You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2020/03/22 22:03:06 UTC

[curator] branch CURATOR-549-zk36-persistent-watcher-bridge updated (038b588 -> d1d7d12)

This is an automated email from the ASF dual-hosted git repository.

randgalt pushed a change to branch CURATOR-549-zk36-persistent-watcher-bridge
in repository https://gitbox.apache.org/repos/asf/curator.git.


 discard 038b588  CURATOR-549
 discard f8dfdb3  CURATOR-549
     add 523998e  CURATOR-549
     new d1d7d12  CURATOR-549

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user force-pushes a change (git push --force) and generates
a repository containing something like this:

 * -- * -- B -- O -- O -- O   (038b588)
            \
             N -- N -- N   refs/heads/CURATOR-549-zk36-persistent-watcher-bridge (d1d7d12)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revision
listed as "add" was already present in the repository and has only
been added to this reference.


Summary of changes:


[curator] 01/01: CURATOR-549

Posted by ra...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

randgalt pushed a commit to branch CURATOR-549-zk36-persistent-watcher-bridge
in repository https://gitbox.apache.org/repos/asf/curator.git

commit d1d7d12872b3f46ccd94b58394785c0d349aedce
Author: randgalt <ra...@apache.org>
AuthorDate: Fri Mar 20 14:48:18 2020 -0500

    CURATOR-549
    
    Creates a simple bridge that, when using ZK 3.6.0, creates a CuratorCache and, for earlier versions, creates a TreeCache. The curator-test-zk35 module ensures that both code paths are tested.
---
 .../cache/CompatibleCuratorCacheBridge.java        | 131 +++++++++++++++++++++
 .../framework/recipes/cache/CuratorCache.java      |  15 +++
 .../recipes/cache/CuratorCacheBridge.java          |  58 +++++++++
 .../recipes/cache/CuratorCacheBridgeBuilder.java   |  57 +++++++++
 .../cache/CuratorCacheBridgeBuilderImpl.java       |  76 ++++++++++++
 .../framework/recipes/cache/CuratorCacheImpl.java  |   9 +-
 .../framework/recipes/nodes/GroupMember.java       |  28 ++---
 .../framework/recipes/nodes/TestGroupMember.java   |   2 +
 .../x/async/modeled/details/ModeledCacheImpl.java  |  65 +++++-----
 .../async/modeled/TestCachedModeledFramework.java  |   2 +
 .../apache/curator/x/discovery/ServiceCache.java   |   3 +
 .../curator/x/discovery/ServiceCacheBuilder.java   |  10 --
 .../x/discovery/ServiceProviderBuilder.java        |  24 ++--
 .../discovery/details/ServiceCacheBuilderImpl.java |  24 +---
 .../x/discovery/details/ServiceCacheImpl.java      |  83 +++++++------
 .../x/discovery/details/ServiceDiscoveryImpl.java  |  66 +++++------
 .../details/ServiceProviderBuilderImpl.java        |   9 +-
 .../x/discovery/details/ServiceProviderImpl.java   |  24 +---
 .../x/discovery/ServiceCacheLeakTester.java        |   3 +
 .../curator/x/discovery/TestServiceCache.java      |   8 ++
 .../x/discovery/details/TestServiceCacheRace.java  |   2 +
 .../x/discovery/details/TestServiceDiscovery.java  |   2 +
 .../details/TestServiceDiscoveryBuilder.java       |   6 +-
 .../x/discovery/details/TestServiceProvider.java   |   2 +
 src/site/confluence/breaking-changes.confluence    |   3 +
 25 files changed, 527 insertions(+), 185 deletions(-)

diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CompatibleCuratorCacheBridge.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CompatibleCuratorCacheBridge.java
new file mode 100644
index 0000000..5dd71fd
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CompatibleCuratorCacheBridge.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.cache;
+
+import com.google.common.collect.Sets;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.listen.Listenable;
+import org.apache.curator.framework.listen.StandardListenerManager;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Stream;
+
+import static org.apache.curator.framework.recipes.cache.CuratorCacheListener.Type.*;
+
+/**
+ * Version of CuratorCacheBridge for pre-ZK 3.6 - uses TreeCache instead of CuratorCache
+ */
+@SuppressWarnings("deprecation")
+class CompatibleCuratorCacheBridge implements CuratorCacheBridge, TreeCacheListener
+{
+    private final TreeCache cache;
+    private final StandardListenerManager<CuratorCacheListener> listenerManager = StandardListenerManager.standard();
+    private final String path;
+
+    CompatibleCuratorCacheBridge(CuratorFramework client, String path, CuratorCache.Options[] optionsArg, ExecutorService executorService, boolean cacheData)
+    {
+        this.path = path;
+        Set<CuratorCache.Options> options = (optionsArg != null) ? Sets.newHashSet(optionsArg) : Collections.emptySet();
+        TreeCache.Builder builder = TreeCache.newBuilder(client, path).setCacheData(cacheData);
+        if ( options.contains(CuratorCache.Options.SINGLE_NODE_CACHE) )
+        {
+            builder.setMaxDepth(0);
+        }
+        if ( options.contains(CuratorCache.Options.COMPRESSED_DATA) )
+        {
+            builder.setDataIsCompressed(true);
+        }
+        if ( executorService != null )
+        {
+            builder.setExecutor(executorService);
+        }
+        cache = builder.build();
+    }
+
+    @Override
+    public void start()
+    {
+        try
+        {
+            cache.getListenable().addListener(this);
+
+            cache.start();
+        }
+        catch ( Exception e )
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void close()
+    {
+        cache.close();
+    }
+
+    @Override
+    public Listenable<CuratorCacheListener> listenable()
+    {
+        return listenerManager;
+    }
+
+    @Override
+    public Stream<ChildData> streamRootChildren()
+    {
+        Map<String, ChildData> currentChildren = cache.getCurrentChildren(path);
+        if ( currentChildren == null )
+        {
+            return Stream.empty();
+        }
+        return currentChildren.values().stream();
+    }
+
+    @Override
+    public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception
+    {
+        switch ( event.getType() )
+        {
+            case NODE_ADDED:
+            {
+                listenerManager.forEach(listener -> listener.event(NODE_CREATED, null, event.getData()));
+                break;
+            }
+
+            case NODE_REMOVED:
+            {
+                listenerManager.forEach(listener -> listener.event(NODE_DELETED, event.getData(), null));
+                break;
+            }
+
+            case NODE_UPDATED:
+            {
+                listenerManager.forEach(listener -> listener.event(NODE_CHANGED, null, event.getData()));
+                break;
+            }
+
+            case INITIALIZED:
+            {
+                listenerManager.forEach(CuratorCacheListener::initialized);
+                break;
+            }
+        }
+    }
+}
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCache.java
index ebe06e0..706f094 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCache.java
@@ -87,6 +87,21 @@ public interface CuratorCache extends Closeable
     }
 
     /**
+     * Start a Curator Cache Bridge builder. A Curator Cache Bridge is
+     * a facade that uses {@link org.apache.curator.framework.recipes.cache.CuratorCache} if
+     * persistent watches are available or {@link org.apache.curator.framework.recipes.cache.TreeCache}
+     * otherwise (i.e. if you are using ZooKeeper 3.5.x).
+     *
+     * @param client Curator client
+     * @param path path to cache
+     * @return bridge builder
+     */
+    static CuratorCacheBridgeBuilder bridgeBuilder(CuratorFramework client, String path)
+    {
+        return new CuratorCacheBridgeBuilderImpl(client, path);
+    }
+
+    /**
      * Start the cache. This will cause a complete refresh from the cache's root node and generate
      * events for all nodes found, etc.
      */
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBridge.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBridge.java
new file mode 100644
index 0000000..e7d8868
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBridge.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.listen.Listenable;
+import java.io.Closeable;
+import java.util.stream.Stream;
+
+/**
+ * A facade that uses {@link org.apache.curator.framework.recipes.cache.CuratorCache} if
+ * persistent watches are available or a {@link org.apache.curator.framework.recipes.cache.TreeCache}
+ * otherwise
+ */
+@SuppressWarnings("deprecation")
+public interface CuratorCacheBridge extends Closeable
+{
+    /**
+     * Start the cache. This will cause a complete refresh from the cache's root node and generate
+     * events for all nodes found, etc.
+     */
+    void start();
+
+    /**
+     * Close the cache, stop responding to events, etc.
+     */
+    @Override
+    void close();
+
+    /**
+     * Return the listener container so that listeners can be registered to be notified of changes to the cache
+     *
+     * @return listener container
+     */
+    Listenable<CuratorCacheListener> listenable();
+
+    /**
+     * Return a stream over the storage entries that are the immediate children of the root node.
+     *
+     * @return stream over root entries
+     */
+    Stream<ChildData> streamRootChildren();
+}
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBridgeBuilder.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBridgeBuilder.java
new file mode 100644
index 0000000..4a1920f
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBridgeBuilder.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+import java.util.concurrent.ExecutorService;
+
+public interface CuratorCacheBridgeBuilder
+{
+    /**
+     * @param options any options
+     * @return this
+     */
+    CuratorCacheBridgeBuilder withOptions(CuratorCache.Options... options);
+
+    /**
+     * The bridge cache will not retain the data bytes, i.e. ChildData objects
+     * returned by the cache will always return {@code null} from {@link ChildData#getData()}.
+     *
+     * @return this
+     */
+    CuratorCacheBridgeBuilder withBytesNotCached();
+
+    /**
+     * If the old {@link org.apache.curator.framework.recipes.cache.TreeCache} is used by the bridge
+     * (i.e. you are using ZooKeeper 3.5.x) then this executor service is passed to {@link org.apache.curator.framework.recipes.cache.TreeCache.Builder#setExecutor(java.util.concurrent.ExecutorService)}.
+     * For {@link org.apache.curator.framework.recipes.cache.CuratorCache} this is not used and will be ignored (a warning will be logged).
+     *
+     * @param executorService executor to use for ZooKeeper 3.5.x
+     * @return this
+     */
+    @SuppressWarnings("deprecation")
+    CuratorCacheBridgeBuilder withExecutorService(ExecutorService executorService);
+
+    /**
+     * Return a new Curator Cache Bridge based on the builder methods that have been called
+     *
+     * @return new Curator Cache Bridge
+     */
+    CuratorCacheBridge build();
+}
\ No newline at end of file
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBridgeBuilderImpl.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBridgeBuilderImpl.java
new file mode 100644
index 0000000..7aebe59
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBridgeBuilderImpl.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.utils.Compatibility;
+import org.slf4j.LoggerFactory;
+import java.util.concurrent.ExecutorService;
+
+class CuratorCacheBridgeBuilderImpl implements CuratorCacheBridgeBuilder
+{
+    private final CuratorFramework client;
+    private final String path;
+    private CuratorCache.Options[] options;
+    private boolean cacheData = true;
+    private ExecutorService executorService = null;
+
+    CuratorCacheBridgeBuilderImpl(CuratorFramework client, String path)
+    {
+        this.client = client;
+        this.path = path;
+    }
+
+    @Override
+    public CuratorCacheBridgeBuilder withOptions(CuratorCache.Options... options)
+    {
+        this.options = options;
+        return this;
+    }
+
+    @Override
+    public CuratorCacheBridgeBuilder withBytesNotCached()
+    {
+        cacheData = false;
+        return this;
+    }
+
+    @Override
+    public CuratorCacheBridgeBuilder withExecutorService(ExecutorService executorService)
+    {
+        this.executorService = executorService;
+        return this;
+    }
+
+    @Override
+    public CuratorCacheBridge build()
+    {
+        if ( Compatibility.hasPersistentWatchers() )
+        {
+            if ( executorService != null )
+            {
+                LoggerFactory.getLogger(getClass()).warn("CuratorCache does not support custom ExecutorService");
+            }
+            CuratorCacheStorage storage = cacheData ? CuratorCacheStorage.standard() : CuratorCacheStorage.bytesNotCached();
+            return new CuratorCacheImpl(client, storage, path, options, null);
+        }
+        return new CompatibleCuratorCacheBridge(client, path, options, executorService, cacheData);
+    }
+}
\ No newline at end of file
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
index 780ec6e..2096049 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
@@ -39,12 +39,13 @@ import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
+import java.util.stream.Stream;
 
 import static org.apache.curator.framework.recipes.cache.CuratorCacheListener.Type.*;
 import static org.apache.zookeeper.KeeperException.Code.NONODE;
 import static org.apache.zookeeper.KeeperException.Code.OK;
 
-class CuratorCacheImpl implements CuratorCache
+class CuratorCacheImpl implements CuratorCache, CuratorCacheBridge
 {
     private final Logger log = LoggerFactory.getLogger(getClass());
     private final AtomicReference<State> state = new AtomicReference<>(State.LATENT);
@@ -110,6 +111,12 @@ class CuratorCacheImpl implements CuratorCache
     }
 
     @Override
+    public Stream<ChildData> streamRootChildren()
+    {
+        return storage.streamImmediateChildren(path);
+    }
+
+    @Override
     public Listenable<CuratorCacheListener> listenable()
     {
         return listenerManager;
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/GroupMember.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/GroupMember.java
index 8cd1f65..954b78d 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/GroupMember.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/GroupMember.java
@@ -24,11 +24,14 @@ import com.google.common.collect.ImmutableMap;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.recipes.cache.ChildData;
-import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.framework.recipes.cache.CuratorCache;
+import org.apache.curator.framework.recipes.cache.CuratorCacheBridge;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.utils.ThreadUtils;
 import org.apache.curator.utils.ZKPaths;
+import org.apache.zookeeper.CreateMode;
 import java.io.Closeable;
+import java.util.Iterator;
 import java.util.Map;
 
 /**
@@ -37,8 +40,8 @@ import java.util.Map;
  */
 public class GroupMember implements Closeable
 {
-    private final PersistentEphemeralNode pen;
-    private final PathChildrenCache cache;
+    private final PersistentNode pen;
+    private final CuratorCacheBridge cache;
     private final String thisId;
 
     /**
@@ -61,8 +64,8 @@ public class GroupMember implements Closeable
     {
         this.thisId = Preconditions.checkNotNull(thisId, "thisId cannot be null");
 
-        cache = newPathChildrenCache(client, membershipPath);
-        pen = newPersistentEphemeralNode(client, membershipPath, thisId, payload);
+        cache = CuratorCache.bridgeBuilder(client, membershipPath).build();
+        pen = new PersistentNode(client, CreateMode.EPHEMERAL, false, ZKPaths.makePath(membershipPath, thisId), payload);
     }
 
     /**
@@ -121,8 +124,11 @@ public class GroupMember implements Closeable
     {
         ImmutableMap.Builder<String, byte[]> builder = ImmutableMap.builder();
         boolean thisIdAdded = false;
-        for ( ChildData data : cache.getCurrentData() )
+
+        Iterator<ChildData> iterator = cache.streamRootChildren().iterator();
+        while ( iterator.hasNext() )
         {
+            ChildData data = iterator.next();
             String id = idFromPath(data.getPath());
             thisIdAdded = thisIdAdded || id.equals(thisId);
             builder.put(id, data.getData());
@@ -144,14 +150,4 @@ public class GroupMember implements Closeable
     {
         return ZKPaths.getNodeFromPath(path);
     }
-
-    protected PersistentEphemeralNode newPersistentEphemeralNode(CuratorFramework client, String membershipPath, String thisId, byte[] payload)
-    {
-        return new PersistentEphemeralNode(client, PersistentEphemeralNode.Mode.EPHEMERAL, ZKPaths.makePath(membershipPath, thisId), payload);
-    }
-
-    protected PathChildrenCache newPathChildrenCache(CuratorFramework client, String membershipPath)
-    {
-        return new PathChildrenCache(client, membershipPath, true);
-    }
 }
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestGroupMember.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestGroupMember.java
index 2da051f..4395b36 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestGroupMember.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestGroupMember.java
@@ -25,11 +25,13 @@ import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.Timing;
+import org.apache.curator.test.compatibility.CuratorTestBase;
 import org.apache.curator.utils.CloseableUtils;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 import java.util.Map;
 
+@Test(groups = CuratorTestBase.zk35TestCompatibilityGroup)
 public class TestGroupMember extends BaseClassForTests
 {
     // NOTE - don't need many tests as this class is just a wrapper around two existing recipes
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
index 5c015f4..e324ac4 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
@@ -19,9 +19,13 @@
 package org.apache.curator.x.async.modeled.details;
 
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.EnsureContainers;
 import org.apache.curator.framework.listen.Listenable;
 import org.apache.curator.framework.listen.StandardListenerManager;
-import org.apache.curator.framework.recipes.cache.TreeCache;
+import org.apache.curator.framework.recipes.cache.CuratorCache;
+import org.apache.curator.framework.recipes.cache.CuratorCacheBridge;
+import org.apache.curator.framework.recipes.cache.CuratorCacheBridgeBuilder;
+import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
 import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
 import org.apache.curator.framework.recipes.cache.TreeCacheListener;
 import org.apache.curator.utils.ThreadUtils;
@@ -42,11 +46,12 @@ import java.util.stream.Collectors;
 
 class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T>
 {
-    private final TreeCache cache;
+    private final CuratorCacheBridge cache;
     private final Map<ZPath, Entry<T>> entries = new ConcurrentHashMap<>();
     private final ModelSerializer<T> serializer;
     private final StandardListenerManager<ModeledCacheListener<T>> listenerContainer = StandardListenerManager.standard();
     private final ZPath basePath;
+    private final EnsureContainers ensureContainers;
 
     private static final class Entry<T>
     {
@@ -69,19 +74,32 @@ class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T>
 
         basePath = modelSpec.path();
         this.serializer = modelSpec.serializer();
-        cache = TreeCache.newBuilder(client, basePath.fullPath())
-            .setCacheData(false)
-            .setDataIsCompressed(modelSpec.createOptions().contains(CreateOption.compress))
-            .setExecutor(executor)
-            .setCreateParentNodes(modelSpec.createOptions().contains(CreateOption.createParentsIfNeeded) || modelSpec.createOptions().contains(CreateOption.createParentsAsContainers))
-            .build();
+        CuratorCacheBridgeBuilder bridgeBuilder = CuratorCache.bridgeBuilder(client, basePath.fullPath()).withBytesNotCached().withExecutorService(executor);
+        if ( modelSpec.createOptions().contains(CreateOption.compress) )
+        {
+            bridgeBuilder = bridgeBuilder.withOptions(CuratorCache.Options.COMPRESSED_DATA);
+        }
+        cache = bridgeBuilder.build();
+        cache.listenable().addListener(CuratorCacheListener.builder().forTreeCache(client, this).build());
+
+        if ( modelSpec.createOptions().contains(CreateOption.createParentsIfNeeded) || modelSpec.createOptions().contains(CreateOption.createParentsAsContainers) )
+        {
+            ensureContainers = new EnsureContainers(client, basePath.fullPath());
+        }
+        else
+        {
+            ensureContainers = null;
+        }
     }
 
     public void start()
     {
         try
         {
-            cache.getListenable().addListener(this);
+            if ( ensureContainers != null )
+            {
+                ensureContainers.ensure();
+            }
             cache.start();
         }
         catch ( Exception e )
@@ -92,7 +110,6 @@ class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T>
 
     public void close()
     {
-        cache.getListenable().removeListener(this);
         cache.close();
         entries.clear();
     }
@@ -148,7 +165,7 @@ class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T>
         }
     }
 
-    private void internalChildEvent(TreeCacheEvent event) throws Exception
+    private void internalChildEvent(TreeCacheEvent event)
     {
         switch ( event.getType() )
         {
@@ -156,16 +173,13 @@ class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T>
         case NODE_UPDATED:
         {
             ZPath path = ZPath.parse(event.getData().getPath());
-            if ( !path.equals(basePath) )
+            byte[] bytes = event.getData().getData();
+            if ( (bytes != null) && (bytes.length > 0) )    // otherwise it's probably just a parent node being created
             {
-                byte[] bytes = event.getData().getData();
-                if ( (bytes != null) && (bytes.length > 0) )    // otherwise it's probably just a parent node being created
-                {
-                    T model = serializer.deserialize(bytes);
-                    entries.put(path, new Entry<>(event.getData().getStat(), model));
-                    ModeledCacheListener.Type type = (event.getType() == TreeCacheEvent.Type.NODE_ADDED) ? ModeledCacheListener.Type.NODE_ADDED : ModeledCacheListener.Type.NODE_UPDATED;
-                    accept(type, path, event.getData().getStat(), model);
-                }
+                T model = serializer.deserialize(bytes);
+                entries.put(path, new Entry<>(event.getData().getStat(), model));
+                ModeledCacheListener.Type type = (event.getType() == TreeCacheEvent.Type.NODE_ADDED) ? ModeledCacheListener.Type.NODE_ADDED : ModeledCacheListener.Type.NODE_UPDATED;
+                accept(type, path, event.getData().getStat(), model);
             }
             break;
         }
@@ -173,13 +187,10 @@ class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T>
         case NODE_REMOVED:
         {
             ZPath path = ZPath.parse(event.getData().getPath());
-            if ( !path.equals(basePath) )
-            {
-                Entry<T> entry = entries.remove(path);
-                T model = (entry != null) ? entry.model : serializer.deserialize(event.getData().getData());
-                Stat stat = (entry != null) ? entry.stat : event.getData().getStat();
-                accept(ModeledCacheListener.Type.NODE_REMOVED, path, stat, model);
-            }
+            Entry<T> entry = entries.remove(path);
+            T model = (entry != null) ? entry.model : serializer.deserialize(event.getData().getData());
+            Stat stat = (entry != null) ? entry.stat : event.getData().getStat();
+            accept(ModeledCacheListener.Type.NODE_REMOVED, path, stat, model);
             break;
         }
 
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java
index 2d33c13..70f9c66 100644
--- a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java
@@ -21,6 +21,7 @@ package org.apache.curator.x.async.modeled;
 import com.google.common.collect.Sets;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.test.Timing;
+import org.apache.curator.test.compatibility.CuratorTestBase;
 import org.apache.curator.x.async.modeled.cached.CachedModeledFramework;
 import org.apache.curator.x.async.modeled.cached.ModeledCacheListener;
 import org.apache.curator.x.async.modeled.models.TestModel;
@@ -38,6 +39,7 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+@Test(groups = CuratorTestBase.zk35TestCompatibilityGroup)
 public class TestCachedModeledFramework extends TestModeledFrameworkBase
 {
     @Test
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCache.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCache.java
index a122d69..d409f99 100644
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCache.java
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCache.java
@@ -23,6 +23,7 @@ import org.apache.curator.x.discovery.details.InstanceProvider;
 import org.apache.curator.x.discovery.details.ServiceCacheListener;
 import java.io.Closeable;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
 
 public interface ServiceCache<T> extends Closeable, Listenable<ServiceCacheListener>, InstanceProvider<T>
 {
@@ -41,4 +42,6 @@ public interface ServiceCache<T> extends Closeable, Listenable<ServiceCacheListe
      * @throws Exception errors
      */
     public void start() throws Exception;
+
+    CountDownLatch startImmediate() throws Exception;
 }
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCacheBuilder.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCacheBuilder.java
index 326a16c..cfec2c4 100644
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCacheBuilder.java
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCacheBuilder.java
@@ -18,7 +18,6 @@
  */
 package org.apache.curator.x.discovery;
 
-import org.apache.curator.utils.CloseableExecutorService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ThreadFactory;
 
@@ -59,13 +58,4 @@ public interface ServiceCacheBuilder<T>
      * @return this
      */
     public ServiceCacheBuilder<T> executorService(ExecutorService executorService);
-
-    /**
-     * Optional CloseableExecutorService to use for the cache's background thread. The specified ExecutorService
-     * overrides any prior ThreadFactory or ExecutorService set on the ServiceCacheBuilder.
-     *
-     * @param executorService an instance of CloseableExecutorService
-     * @return this
-     */
-    public ServiceCacheBuilder<T> executorService(CloseableExecutorService executorService);
 }
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceProviderBuilder.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceProviderBuilder.java
index 02948a3..4c0544d 100644
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceProviderBuilder.java
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceProviderBuilder.java
@@ -18,7 +18,6 @@
  */
 package org.apache.curator.x.discovery;
 
-import org.apache.curator.utils.CloseableExecutorService;
 import org.apache.curator.x.discovery.strategies.RoundRobinStrategy;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ThreadFactory;
@@ -30,7 +29,7 @@ public interface ServiceProviderBuilder<T>
      *
      * @return provider
      */
-    public ServiceProvider<T> build();
+    ServiceProvider<T> build();
 
     /**
      * required - set the name of the service to be provided
@@ -38,7 +37,7 @@ public interface ServiceProviderBuilder<T>
      * @param serviceName the name of the service
      * @return this
      */
-    public ServiceProviderBuilder<T> serviceName(String serviceName);
+    ServiceProviderBuilder<T> serviceName(String serviceName);
 
     /**
      * optional - set the provider strategy. The default is {@link RoundRobinStrategy}
@@ -46,7 +45,7 @@ public interface ServiceProviderBuilder<T>
      * @param providerStrategy strategy to use
      * @return this
      */
-    public ServiceProviderBuilder<T> providerStrategy(ProviderStrategy<T> providerStrategy);
+    ServiceProviderBuilder<T> providerStrategy(ProviderStrategy<T> providerStrategy);
 
     /**
      * optional - the thread factory to use for creating internal threads. The specified ThreadFactory overrides
@@ -57,7 +56,7 @@ public interface ServiceProviderBuilder<T>
      * @deprecated use {@link #executorService(ExecutorService)} instead
      */
     @Deprecated
-    public ServiceProviderBuilder<T> threadFactory(ThreadFactory threadFactory);
+    ServiceProviderBuilder<T> threadFactory(ThreadFactory threadFactory);
 
     /**
      * Set the down instance policy
@@ -65,7 +64,7 @@ public interface ServiceProviderBuilder<T>
      * @param downInstancePolicy new policy
      * @return this
      */
-    public ServiceProviderBuilder<T> downInstancePolicy(DownInstancePolicy downInstancePolicy);
+    ServiceProviderBuilder<T> downInstancePolicy(DownInstancePolicy downInstancePolicy);
 
     /**
      * Add an instance filter. NOTE: this does not remove previously added filters. i.e.
@@ -75,7 +74,7 @@ public interface ServiceProviderBuilder<T>
      * @param filter filter to add
      * @return this
      */
-    public ServiceProviderBuilder<T> additionalFilter(InstanceFilter<T> filter);
+    ServiceProviderBuilder<T> additionalFilter(InstanceFilter<T> filter);
 
     /**
      * Optional ExecutorService to use for the cache's background thread. The specified ExecutorService
@@ -85,14 +84,5 @@ public interface ServiceProviderBuilder<T>
      * @param executorService executor service
      * @return this
      */
-    public ServiceProviderBuilder<T> executorService(ExecutorService executorService);
-
-    /**
-     * Optional CloseableExecutorService to use for the cache's background thread. The specified CloseableExecutorService
-     * overrides any prior ThreadFactory or CloseableExecutorService set on the ServiceProviderBuilder.
-     *
-     * @param executorService an instance of CloseableExecutorService
-     * @return this
-     */
-    public ServiceProviderBuilder<T> executorService(CloseableExecutorService executorService);
+    ServiceProviderBuilder<T> executorService(ExecutorService executorService);
 }
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheBuilderImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheBuilderImpl.java
index 7647c0f..cf7f468 100644
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheBuilderImpl.java
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheBuilderImpl.java
@@ -18,7 +18,6 @@
  */
 package org.apache.curator.x.discovery.details;
 
-import org.apache.curator.utils.CloseableExecutorService;
 import org.apache.curator.x.discovery.ServiceCache;
 import org.apache.curator.x.discovery.ServiceCacheBuilder;
 import java.util.concurrent.ExecutorService;
@@ -32,7 +31,7 @@ class ServiceCacheBuilderImpl<T> implements ServiceCacheBuilder<T>
     private ServiceDiscoveryImpl<T> discovery;
     private String name;
     private ThreadFactory threadFactory;
-    private CloseableExecutorService executorService;
+    private ExecutorService executorService;
 
     ServiceCacheBuilderImpl(ServiceDiscoveryImpl<T> discovery)
     {
@@ -47,13 +46,13 @@ class ServiceCacheBuilderImpl<T> implements ServiceCacheBuilder<T>
     @Override
     public ServiceCache<T> build()
     {
-        if (executorService != null)
+        if (threadFactory != null)
         {
-            return new ServiceCacheImpl<T>(discovery, name, executorService);
+            return new ServiceCacheImpl<T>(discovery, name, threadFactory);
         }
         else
         {
-            return new ServiceCacheImpl<T>(discovery, name, threadFactory);
+            return new ServiceCacheImpl<T>(discovery, name, executorService);
         }
     }
 
@@ -92,20 +91,9 @@ class ServiceCacheBuilderImpl<T> implements ServiceCacheBuilder<T>
      * @return this
      */
     @Override
-    public ServiceCacheBuilder<T> executorService(ExecutorService executorService) {
-        return executorService(new CloseableExecutorService(executorService, false));
-    }
-
-    /**
-     * Optional CloseableExecutorService to use for the cache's background thread
-     *
-     * @param executorService an instance of CloseableExecutorService
-     * @return this
-     */
-    @Override
-    public ServiceCacheBuilder<T> executorService(CloseableExecutorService executorService) {
+    public ServiceCacheBuilder<T> executorService(ExecutorService executorService)
+    {
         this.executorService = executorService;
-        this.threadFactory = null;
         return this;
     }
 }
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java
index c154836..f0316ac 100644
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java
@@ -20,26 +20,27 @@
 package org.apache.curator.x.discovery.details;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.EnsureContainers;
 import org.apache.curator.framework.listen.StandardListenerManager;
 import org.apache.curator.framework.recipes.cache.ChildData;
-import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.framework.recipes.cache.CuratorCache;
+import org.apache.curator.framework.recipes.cache.CuratorCacheBridge;
+import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
-import org.apache.curator.utils.CloseableExecutorService;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.curator.x.discovery.ServiceCache;
 import org.apache.curator.x.discovery.ServiceInstance;
-import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicReference;
@@ -48,19 +49,21 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi
 {
     private final StandardListenerManager<ServiceCacheListener> listenerContainer = StandardListenerManager.standard();
     private final ServiceDiscoveryImpl<T> discovery;
-    private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
-    private final PathChildrenCache cache;
+    private final AtomicReference<State> state = new AtomicReference<>(State.LATENT);
+    private final CuratorCacheBridge cache;
     private final ConcurrentMap<String, ServiceInstance<T>> instances = Maps.newConcurrentMap();
+    private final EnsureContainers ensureContainers;
+    private final CountDownLatch initializedLatch = new CountDownLatch(1);
 
     private enum State
     {
         LATENT, STARTED, STOPPED
     }
 
-    private static CloseableExecutorService convertThreadFactory(ThreadFactory threadFactory)
+    private static ExecutorService convertThreadFactory(ThreadFactory threadFactory)
     {
         Preconditions.checkNotNull(threadFactory, "threadFactory cannot be null");
-        return new CloseableExecutorService(Executors.newSingleThreadExecutor(threadFactory), true);
+        return Executors.newSingleThreadExecutor(threadFactory);
     }
 
     ServiceCacheImpl(ServiceDiscoveryImpl<T> discovery, String name, ThreadFactory threadFactory)
@@ -68,16 +71,24 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi
         this(discovery, name, convertThreadFactory(threadFactory));
     }
 
-    ServiceCacheImpl(ServiceDiscoveryImpl<T> discovery, String name, CloseableExecutorService executorService)
+    ServiceCacheImpl(ServiceDiscoveryImpl<T> discovery, String name, ExecutorService executorService)
     {
         Preconditions.checkNotNull(discovery, "discovery cannot be null");
         Preconditions.checkNotNull(name, "name cannot be null");
-        Preconditions.checkNotNull(executorService, "executorService cannot be null");
 
         this.discovery = discovery;
 
-        cache = new PathChildrenCache(discovery.getClient(), discovery.pathForName(name), true, false, executorService);
-        cache.getListenable().addListener(this);
+        String path = discovery.pathForName(name);
+        cache = CuratorCache.bridgeBuilder(discovery.getClient(), path)
+            .withExecutorService(executorService)
+            .withBytesNotCached()
+            .build();
+        CuratorCacheListener listener = CuratorCacheListener.builder()
+            .forPathChildrenCache(discovery.getClient(), this)
+            .forInitialized(this::initialized)
+            .build();
+        cache.listenable().addListener(listener);
+        ensureContainers = new EnsureContainers(discovery.getClient(), path);
     }
 
     @Override
@@ -93,11 +104,19 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi
     @Override
     public void start() throws Exception
     {
+        startImmediate().await();
+    }
+
+    @Override
+    public CountDownLatch startImmediate() throws Exception
+    {
         Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
 
-        cache.start(true);
+        ensureContainers.ensure();
+        cache.start();
         if ( debugStartLatch != null )
         {
+            initializedLatch.await();
             debugStartLatch.countDown();
             debugStartLatch = null;
         }
@@ -107,18 +126,11 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi
             debugStartWaitLatch = null;
         }
 
-        for ( ChildData childData : cache.getCurrentData() )
-        {
-            if ( childData.getData() != null )  // else already processed by the cache listener
-            {
-                addInstance(childData, true);
-            }
-        }
-        discovery.cacheOpened(this);
+        return initializedLatch;
     }
 
     @Override
-    public void close() throws IOException
+    public void close()
     {
         Preconditions.checkState(state.compareAndSet(State.STARTED, State.STOPPED), "Already closed or has not been started");
 
@@ -152,7 +164,7 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi
     }
 
     @Override
-    public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
+    public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
     {
         boolean notifyListeners = false;
         switch ( event.getType() )
@@ -160,7 +172,7 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi
         case CHILD_ADDED:
         case CHILD_UPDATED:
         {
-            addInstance(event.getData(), false);
+            addInstance(event.getData());
             notifyListeners = true;
             break;
         }
@@ -173,7 +185,7 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi
         }
         }
 
-        if ( notifyListeners )
+        if ( notifyListeners && (initializedLatch.getCount() == 0) )
         {
             listenerContainer.forEach(ServiceCacheListener::cacheChanged);
         }
@@ -184,18 +196,23 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi
         return ZKPaths.getNodeFromPath(childData.getPath());
     }
 
-    private void addInstance(ChildData childData, boolean onlyIfAbsent) throws Exception
+    private void addInstance(ChildData childData)
     {
-        String instanceId = instanceIdFromData(childData);
-        ServiceInstance<T> serviceInstance = discovery.getSerializer().deserialize(childData.getData());
-        if ( onlyIfAbsent )
+        try
         {
-            instances.putIfAbsent(instanceId, serviceInstance);
+            String instanceId = instanceIdFromData(childData);
+            ServiceInstance<T> serviceInstance = discovery.getSerializer().deserialize(childData.getData());
+            instances.put(instanceId, serviceInstance);
         }
-        else
+        catch ( Exception e )
         {
-            instances.put(instanceId, serviceInstance);
+            throw new RuntimeException(e);
         }
-        cache.clearDataBytes(childData.getPath(), childData.getStat().getVersion());
+    }
+
+    private void initialized()
+    {
+        discovery.cacheOpened(this);
+        initializedLatch.countDown();
     }
 }
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
index 476705c..c072213 100644
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
@@ -26,8 +26,9 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.recipes.cache.NodeCache;
-import org.apache.curator.framework.recipes.cache.NodeCacheListener;
+import org.apache.curator.framework.recipes.cache.CuratorCache;
+import org.apache.curator.framework.recipes.cache.CuratorCacheBridge;
+import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.utils.CloseableUtils;
@@ -92,7 +93,7 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
     private static class Entry<T>
     {
         private volatile ServiceInstance<T> service;
-        private volatile NodeCache cache;
+        private volatile CuratorCacheBridge cache;
 
         private Entry(ServiceInstance<T> service)
         {
@@ -277,8 +278,7 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
     @Override
     public ServiceCacheBuilder<T> serviceCacheBuilder()
     {
-        return new ServiceCacheBuilderImpl<T>(this)
-            .threadFactory(ThreadUtils.newThreadFactory("ServiceCache"));
+        return new ServiceCacheBuilderImpl<T>(this);
     }
 
     /**
@@ -458,52 +458,48 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
         }
     }
 
-    private NodeCache makeNodeCache(final ServiceInstance<T> instance)
+    private CuratorCacheBridge makeNodeCache(final ServiceInstance<T> instance)
     {
         if ( !watchInstances )
         {
             return null;
         }
 
-        final NodeCache nodeCache = new NodeCache(client, pathForInstance(instance.getName(), instance.getId()));
-        try
-        {
-            nodeCache.start(true);
-        }
-        catch ( InterruptedException e)
-        {
-            Thread.currentThread().interrupt();
-            return null;
-        }
-        catch ( Exception e )
-        {
-            log.error("Could not start node cache for: " + instance, e);
-        }
-        NodeCacheListener listener = new NodeCacheListener()
-        {
-            @Override
-            public void nodeChanged() throws Exception
-            {
-                if ( nodeCache.getCurrentData() != null )
+        CuratorCacheBridge cache = CuratorCache.bridgeBuilder(client, pathForInstance(instance.getName(), instance.getId()))
+            .withOptions(CuratorCache.Options.SINGLE_NODE_CACHE)
+            .withBytesNotCached()
+            .build();
+        CuratorCacheListener listener = CuratorCacheListener.builder()
+            .afterInitialized()
+            .forAll((__, ___, data) -> {
+                if ( data != null )
                 {
-                    ServiceInstance<T> newInstance = serializer.deserialize(nodeCache.getCurrentData().getData());
-                    Entry<T> entry = services.get(newInstance.getId());
-                    if ( entry != null )
+                    try
                     {
-                        synchronized(entry)
+                        ServiceInstance<T> newInstance = serializer.deserialize(data.getData());
+                        Entry<T> entry = services.get(newInstance.getId());
+                        if ( entry != null )
                         {
-                            entry.service = newInstance;
+                            synchronized(entry)
+                            {
+                                entry.service = newInstance;
+                            }
                         }
                     }
+                    catch ( Exception e )
+                    {
+                        log.debug("Could not deserialize: " + data.getPath());
+                    }
                 }
                 else
                 {
                     log.warn("Instance data has been deleted for: " + instance);
                 }
-            }
-        };
-        nodeCache.getListenable().addListener(listener);
-        return nodeCache;
+            })
+            .build();
+        cache.listenable().addListener(listener);
+        cache.start();
+        return cache;
     }
 
     private void internalUnregisterService(final Entry<T> entry) throws Exception
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderBuilderImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderBuilderImpl.java
index e36700b..2f0bbb2 100644
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderBuilderImpl.java
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderBuilderImpl.java
@@ -19,7 +19,6 @@
 package org.apache.curator.x.discovery.details;
 
 import com.google.common.collect.Lists;
-import org.apache.curator.utils.CloseableExecutorService;
 import org.apache.curator.x.discovery.DownInstancePolicy;
 import org.apache.curator.x.discovery.InstanceFilter;
 import org.apache.curator.x.discovery.ProviderStrategy;
@@ -39,7 +38,7 @@ class ServiceProviderBuilderImpl<T> implements ServiceProviderBuilder<T>
     private String serviceName;
     private ProviderStrategy<T> providerStrategy;
     private ThreadFactory threadFactory;
-    private CloseableExecutorService executorService;
+    private ExecutorService executorService;
     private List<InstanceFilter<T>> filters = Lists.newArrayList();
     private DownInstancePolicy downInstancePolicy = new DownInstancePolicy();
 
@@ -111,12 +110,6 @@ class ServiceProviderBuilderImpl<T> implements ServiceProviderBuilder<T>
     @Override
     public ServiceProviderBuilder<T> executorService(ExecutorService executorService)
     {
-        return executorService(new CloseableExecutorService(executorService));
-    }
-
-    @Override
-    public ServiceProviderBuilder<T> executorService(CloseableExecutorService executorService)
-    {
         this.executorService = executorService;
         this.threadFactory = null;
         return this;
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderImpl.java
index d9787e4..45b4b9f 100644
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderImpl.java
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderImpl.java
@@ -19,7 +19,6 @@
 package org.apache.curator.x.discovery.details;
 
 import com.google.common.collect.Lists;
-import org.apache.curator.utils.CloseableExecutorService;
 import org.apache.curator.x.discovery.DownInstancePolicy;
 import org.apache.curator.x.discovery.InstanceFilter;
 import org.apache.curator.x.discovery.ProviderStrategy;
@@ -51,38 +50,27 @@ public class ServiceProviderImpl<T> implements ServiceProvider<T>
         this(discovery, serviceName, providerStrategy, threadFactory, null, filters, downInstancePolicy);
     }
 
-    public ServiceProviderImpl(ServiceDiscoveryImpl<T> discovery, String serviceName, ProviderStrategy<T> providerStrategy, CloseableExecutorService executorService, List<InstanceFilter<T>> filters, DownInstancePolicy downInstancePolicy)
-    {
-        this(discovery, serviceName, providerStrategy, null, executorService, filters, downInstancePolicy);
-    }
-
-    protected ServiceProviderImpl(ServiceDiscoveryImpl<T> discovery, String serviceName, ProviderStrategy<T> providerStrategy, ThreadFactory threadFactory, CloseableExecutorService executorService, List<InstanceFilter<T>> filters, DownInstancePolicy downInstancePolicy)
+    protected ServiceProviderImpl(ServiceDiscoveryImpl<T> discovery, String serviceName, ProviderStrategy<T> providerStrategy, ThreadFactory threadFactory, ExecutorService executorService, List<InstanceFilter<T>> filters, DownInstancePolicy downInstancePolicy)
     {
         this.discovery = discovery;
         this.providerStrategy = providerStrategy;
 
-        downInstanceManager = new DownInstanceManager<T>(downInstancePolicy);
-        final ServiceCacheBuilder builder = discovery.serviceCacheBuilder().name(serviceName);
+        downInstanceManager = new DownInstanceManager<>(downInstancePolicy);
+        final ServiceCacheBuilder<T> builder = discovery.serviceCacheBuilder().name(serviceName);
         if (executorService != null)
         {
             builder.executorService(executorService);
         } else
         {
+            //noinspection deprecation
             builder.threadFactory(threadFactory);
         }
         cache = builder.build();
 
         ArrayList<InstanceFilter<T>> localFilters = Lists.newArrayList(filters);
         localFilters.add(downInstanceManager);
-        localFilters.add(new InstanceFilter<T>()
-        {
-            @Override
-            public boolean apply(ServiceInstance<T> instance)
-            {
-                return instance.isEnabled();
-            }
-        });
-        instanceProvider = new FilteredInstanceProvider<T>(cache, localFilters);
+        localFilters.add(ServiceInstance::isEnabled);
+        instanceProvider = new FilteredInstanceProvider<>(cache, localFilters);
     }
 
     /**
diff --git a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/ServiceCacheLeakTester.java b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/ServiceCacheLeakTester.java
index 590c55c..3d357f6 100644
--- a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/ServiceCacheLeakTester.java
+++ b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/ServiceCacheLeakTester.java
@@ -22,9 +22,12 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.TestingServer;
+import org.apache.curator.test.compatibility.CuratorTestBase;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.x.discovery.strategies.RandomStrategy;
+import org.testng.annotations.Test;
 
+@Test(groups = CuratorTestBase.zk35TestCompatibilityGroup)
 public class ServiceCacheLeakTester
 {
     public static void main(String[] args) throws Exception
diff --git a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java
index fda5c26..6d24586 100644
--- a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java
+++ b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java
@@ -27,7 +27,9 @@ import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.ExecuteCalledWatchingExecutorService;
 import org.apache.curator.test.Timing;
+import org.apache.curator.test.compatibility.CuratorTestBase;
 import org.apache.curator.utils.CloseableUtils;
+import org.apache.curator.utils.Compatibility;
 import org.apache.curator.x.discovery.details.ServiceCacheListener;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -40,6 +42,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 
+@Test(groups = CuratorTestBase.zk35TestCompatibilityGroup)
 public class TestServiceCache extends BaseClassForTests
 {
     @Test
@@ -261,6 +264,11 @@ public class TestServiceCache extends BaseClassForTests
     @Test
     public void testExecutorServiceIsInvoked() throws Exception
     {
+        if ( Compatibility.hasPersistentWatchers() )
+        {
+            return; // for ZK 3.6 the underlying cache ignores the executor
+        }
+
         List<Closeable> closeables = Lists.newArrayList();
         try
         {
diff --git a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceCacheRace.java b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceCacheRace.java
index 06d63b9..9604c19 100644
--- a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceCacheRace.java
+++ b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceCacheRace.java
@@ -25,6 +25,7 @@ import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.Timing;
+import org.apache.curator.test.compatibility.CuratorTestBase;
 import org.apache.curator.utils.CloseableExecutorService;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.x.discovery.ServiceCache;
@@ -40,6 +41,7 @@ import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
 
+@Test(groups = CuratorTestBase.zk35TestCompatibilityGroup)
 public class TestServiceCacheRace extends BaseClassForTests
 {
     private final Timing timing = new Timing();
diff --git a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java
index a1c8cfe..7a23e16 100644
--- a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java
+++ b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java
@@ -26,6 +26,7 @@ import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.Timing;
+import org.apache.curator.test.compatibility.CuratorTestBase;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.x.discovery.ServiceDiscovery;
 import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
@@ -38,6 +39,7 @@ import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Semaphore;
 
+@Test(groups = CuratorTestBase.zk35TestCompatibilityGroup)
 public class TestServiceDiscovery extends BaseClassForTests
 {
     private static final Comparator<ServiceInstance<Void>> comparator = new Comparator<ServiceInstance<Void>>()
diff --git a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscoveryBuilder.java b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscoveryBuilder.java
index 312c884..fd0c438 100644
--- a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscoveryBuilder.java
+++ b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscoveryBuilder.java
@@ -22,15 +22,17 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.compatibility.CuratorTestBase;
 import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
 import org.apache.curator.x.discovery.ServiceInstance;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+@Test(groups = CuratorTestBase.zk35TestCompatibilityGroup)
 public class TestServiceDiscoveryBuilder extends BaseClassForTests
 {
     @Test
-    public void testDefaultSerializer() throws Exception
+    public void testDefaultSerializer()
     {        
         CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
         ServiceDiscoveryBuilder<Object> builder = ServiceDiscoveryBuilder.builder(Object.class).client(client);
@@ -41,7 +43,7 @@ public class TestServiceDiscoveryBuilder extends BaseClassForTests
     }
 
     @Test
-    public void testSetSerializer() throws Exception
+    public void testSetSerializer()
     {
         CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
         ServiceDiscoveryBuilder<Object> builder = ServiceDiscoveryBuilder.builder(Object.class).client(client);
diff --git a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceProvider.java b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceProvider.java
index d7358fe..b743e07 100644
--- a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceProvider.java
+++ b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceProvider.java
@@ -26,6 +26,7 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.compatibility.CuratorTestBase;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.x.discovery.ServiceDiscovery;
 import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
@@ -36,6 +37,7 @@ import org.testng.annotations.Test;
 
 import com.google.common.collect.Lists;
 
+@Test(groups = CuratorTestBase.zk35TestCompatibilityGroup)
 public class TestServiceProvider extends BaseClassForTests
 {
 
diff --git a/src/site/confluence/breaking-changes.confluence b/src/site/confluence/breaking-changes.confluence
index 3170a16..af0dc04 100644
--- a/src/site/confluence/breaking-changes.confluence
+++ b/src/site/confluence/breaking-changes.confluence
@@ -8,3 +8,6 @@ need to use Curator with ZooKeeper 3.4.x you will need to use a previous version
 * Exhibitor support has been removed.
 * {{ConnectionHandlingPolicy}} and related classes have been removed.
 * The {{Reaper}} and {{ChildReaper}} classes/recipes have been removed. You should use ZooKeeper container nodes instead.
+* {{newPersistentEphemeralNode()}} and {{newPathChildrenCache()}} were removed from {{GroupMember}}.
+* {{ServiceCacheBuilder<T> executorService(CloseableExecutorService executorService)}} was removed from {{ServiceCacheBuilder}}.
+* {{ServiceProviderBuilder<T> executorService(CloseableExecutorService executorService)}} was removed from {{ServiceProviderBuilder}}.