You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2019/11/04 01:23:34 UTC

[curator] branch CURATOR-549-zk36-persistent-watcher-bridge updated (ecab5d5 -> 3043edf)

This is an automated email from the ASF dual-hosted git repository.

randgalt pushed a change to branch CURATOR-549-zk36-persistent-watcher-bridge
in repository https://gitbox.apache.org/repos/asf/curator.git.


 discard ecab5d5  CURATOR-549
     new 3043edf  CURATOR-549

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user force-pushes a change (git push --force) and generates
a repository containing something like this:

 * -- * -- B -- O -- O -- O   (ecab5d5)
            \
             N -- N -- N   refs/heads/CURATOR-549-zk36-persistent-watcher-bridge (3043edf)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/curator/framework/recipes/cache/CuratorCacheStorage.java | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)


[curator] 01/01: CURATOR-549

Posted by ra...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

randgalt pushed a commit to branch CURATOR-549-zk36-persistent-watcher-bridge
in repository https://gitbox.apache.org/repos/asf/curator.git

commit 3043edf005406b56951b43e591e9cc5d79496566
Author: randgalt <ra...@apache.org>
AuthorDate: Sun Nov 3 17:53:02 2019 -0500

    CURATOR-549
    
    Creates a simple bridge that, when using ZK 3.6.0, creates a CuratorCache and, for earlier versions, creates a TreeCache. The curator-test-zk35 module ensures that both code paths are tested.
---
 .../cache/CompatibleCuratorCacheBridge.java        | 113 +++++++++++++++++
 .../recipes/cache/CuratorCacheBridge.java          |  33 ++---
 .../recipes/cache/CuratorCacheBuilder.java         |   7 ++
 .../recipes/cache/CuratorCacheBuilderImpl.java     |  19 +++
 .../framework/recipes/cache/CuratorCacheImpl.java  |   2 +-
 .../recipes/cache/CuratorCacheStorage.java         |   6 +-
 .../cache/PathChildrenCacheListenerWrapper.java    |  11 +-
 .../curator/framework/recipes/cache/TreeCache.java |  36 ++++--
 curator-test-zk35/pom.xml                          |  46 +++++++
 curator-x-async/pom.xml                            |  15 +++
 .../x/async/modeled/details/ModeledCacheImpl.java  |  46 +++++--
 .../async/modeled/TestCachedModeledFramework.java  |   2 +
 .../x/async/modeled/TestModeledFrameworkBase.java  |   4 +-
 curator-x-discovery/pom.xml                        |  15 +++
 .../apache/curator/x/discovery/ServiceCache.java   |  17 ++-
 .../x/discovery/details/ServiceCacheImpl.java      | 133 +++++++++++----------
 .../curator/x/discovery/TestServiceCache.java      |   2 +
 pom.xml                                            |  14 +++
 18 files changed, 413 insertions(+), 108 deletions(-)

diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CompatibleCuratorCacheBridge.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CompatibleCuratorCacheBridge.java
new file mode 100644
index 0000000..929c815
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CompatibleCuratorCacheBridge.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.cache;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.listen.Listenable;
+import org.apache.curator.framework.listen.StandardListenerManager;
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.Executor;
+
+import static org.apache.curator.framework.recipes.cache.CuratorCacheListener.Type.*;
+
+class CompatibleCuratorCacheBridge implements CuratorCacheBridge, TreeCacheListener
+{
+    private final TreeCache cache;
+    private final StandardListenerManager<CuratorCacheListener> listenerManager = StandardListenerManager.standard();
+
+    CompatibleCuratorCacheBridge(CuratorFramework client, String path, CuratorCache.Options[] optionsArg, Executor executor)
+    {
+        Set<CuratorCache.Options> options = (optionsArg != null) ? Sets.newHashSet(optionsArg) : Collections.emptySet();
+        TreeCache.Builder builder = TreeCache.newBuilder(client, path);
+        if ( options.contains(CuratorCache.Options.SINGLE_NODE_CACHE) )
+        {
+            builder.setMaxDepth(0);
+        }
+        if ( options.contains(CuratorCache.Options.COMPRESSED_DATA) )
+        {
+            builder.setDataIsCompressed(true);
+        }
+        if ( executor != null )
+        {
+            builder.setExecutor(executor);
+        }
+        cache = builder.build();
+    }
+
+    @Override
+    public void start()
+    {
+        try
+        {
+            cache.getListenable().addListener(this);
+
+            cache.start();
+        }
+        catch ( Exception e )
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void close()
+    {
+        cache.close();
+    }
+
+    @Override
+    public Listenable<CuratorCacheListener> listenable()
+    {
+        return listenerManager;
+    }
+
+    @Override
+    public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception
+    {
+        switch ( event.getType() )
+        {
+            case NODE_ADDED:
+            {
+                listenerManager.forEach(listener -> listener.event(NODE_CREATED, null, event.getData()));
+                break;
+            }
+
+            case NODE_REMOVED:
+            {
+                listenerManager.forEach(listener -> listener.event(NODE_DELETED, event.getData(), null));
+                break;
+            }
+
+            case NODE_UPDATED:
+            {
+                listenerManager.forEach(listener -> listener.event(NODE_CHANGED, null, event.getData()));
+                break;
+            }
+
+            case INITIALIZED:
+            {
+                listenerManager.forEach(CuratorCacheListener::initialized);
+                break;
+            }
+        }
+    }
+}
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBridge.java
similarity index 54%
copy from curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCache.java
copy to curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBridge.java
index a122d69..7103877 100644
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBridge.java
@@ -16,29 +16,34 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.curator.x.discovery;
+package org.apache.curator.framework.recipes.cache;
 
 import org.apache.curator.framework.listen.Listenable;
-import org.apache.curator.x.discovery.details.InstanceProvider;
-import org.apache.curator.x.discovery.details.ServiceCacheListener;
 import java.io.Closeable;
-import java.util.List;
 
-public interface ServiceCache<T> extends Closeable, Listenable<ServiceCacheListener>, InstanceProvider<T>
+/**
+ * A facade that uses {@link org.apache.curator.framework.recipes.cache.CuratorCache} if
+ * persistent watches are available or a {@link org.apache.curator.framework.recipes.cache.TreeCache}
+ * otherwise
+ */
+public interface CuratorCacheBridge extends Closeable
 {
     /**
-     * Return the current list of instances. NOTE: there is no guarantee of freshness. This is
-     * merely the last known list of instances. However, the list is updated via a ZooKeeper watcher
-     * so it should be fresh within a window of a second or two.
-     *
-     * @return the list
+     * Start the cache. This will cause a complete refresh from the cache's root node and generate
+     * events for all nodes found, etc.
+     */
+    void start();
+
+    /**
+     * Close the cache, stop responding to events, etc.
      */
-    public List<ServiceInstance<T>> getInstances();
+    @Override
+    void close();
 
     /**
-     * The cache must be started before use
+     * Return the listener container so that listeners can be registered to be notified of changes to the cache
      *
-     * @throws Exception errors
+     * @return listener container
      */
-    public void start() throws Exception;
+    Listenable<CuratorCacheListener> listenable();
 }
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilder.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilder.java
index 35a5f26..ab80a6f 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilder.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilder.java
@@ -60,4 +60,11 @@ public interface CuratorCacheBuilder
      * @return new Curator Cache
      */
     CuratorCache build();
+
+    /**
+     * Return a new bridge cache based on the builder methods that have been called.
+     *
+     * @return new bridge cache
+     */
+    CuratorCacheBridge buildBridge();
 }
\ No newline at end of file
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilderImpl.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilderImpl.java
index 9f9e03d..6445187 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilderImpl.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilderImpl.java
@@ -19,7 +19,9 @@
 
 package org.apache.curator.framework.recipes.cache;
 
+import com.google.common.base.Preconditions;
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.utils.Compatibility;
 import java.util.concurrent.Executor;
 import java.util.function.Consumer;
 
@@ -69,6 +71,23 @@ class CuratorCacheBuilderImpl implements CuratorCacheBuilder
     @Override
     public CuratorCache build()
     {
+        return internalBuild();
+    }
+
+    @Override
+    public CuratorCacheBridge buildBridge()
+    {
+        if ( Compatibility.hasPersistentWatchers() )
+        {
+            return internalBuild();
+        }
+        Preconditions.checkArgument(exceptionHandler == null, "ExceptionHandler is not supported by the TreeCache bridge");
+        Preconditions.checkArgument(storage == null, "Custom CuratorCacheStorage is not supported by the TreeCache bridge");
+        return new CompatibleCuratorCacheBridge(client, path, options, executor);
+    }
+
+    private CuratorCacheImpl internalBuild()
+    {
         return new CuratorCacheImpl(client, storage, path, options, executor, exceptionHandler);
     }
 }
\ No newline at end of file
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
index 8916399..1e62a39 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
@@ -44,7 +44,7 @@ import static org.apache.curator.framework.recipes.cache.CuratorCacheListener.Ty
 import static org.apache.zookeeper.KeeperException.Code.NONODE;
 import static org.apache.zookeeper.KeeperException.Code.OK;
 
-class CuratorCacheImpl implements CuratorCache
+class CuratorCacheImpl implements CuratorCache, CuratorCacheBridge
 {
     private final Logger log = LoggerFactory.getLogger(getClass());
     private final AtomicReference<State> state = new AtomicReference<>(State.LATENT);
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheStorage.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheStorage.java
index f3d870d..e809263 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheStorage.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheStorage.java
@@ -34,7 +34,8 @@ public interface CuratorCacheStorage
      *
      * @return storage instance
      */
-    static CuratorCacheStorage standard() {
+    static CuratorCacheStorage standard()
+    {
         return new StandardCuratorCacheStorage(true);
     }
 
@@ -44,7 +45,8 @@ public interface CuratorCacheStorage
      *
      * @return storage instance that does not retain data bytes
      */
-    static CuratorCacheStorage bytesNotCached() {
+    static CuratorCacheStorage bytesNotCached()
+    {
         return new StandardCuratorCacheStorage(false);
     }
 
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheListenerWrapper.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheListenerWrapper.java
index a9123c1..7e9730c 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheListenerWrapper.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheListenerWrapper.java
@@ -39,19 +39,19 @@ class PathChildrenCacheListenerWrapper implements CuratorCacheListener
         {
             case NODE_CREATED:
             {
-                sendEvent(data, PathChildrenCacheEvent.Type.CHILD_ADDED);
+                sendEvent(new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CHILD_ADDED, data));
                 break;
             }
 
             case NODE_CHANGED:
             {
-                sendEvent(data, PathChildrenCacheEvent.Type.CHILD_UPDATED);
+                sendEvent(new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CHILD_UPDATED, data));
                 break;
             }
 
             case NODE_DELETED:
             {
-                sendEvent(oldData, PathChildrenCacheEvent.Type.CHILD_REMOVED);
+                sendEvent(new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CHILD_REMOVED, oldData));
                 break;
             }
         }
@@ -60,12 +60,11 @@ class PathChildrenCacheListenerWrapper implements CuratorCacheListener
     @Override
     public void initialized()
     {
-        sendEvent(null, PathChildrenCacheEvent.Type.INITIALIZED);
+        sendEvent(new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.INITIALIZED, null));
     }
 
-    private void sendEvent(ChildData node, PathChildrenCacheEvent.Type type)
+    private void sendEvent(PathChildrenCacheEvent event)
     {
-        PathChildrenCacheEvent event = new PathChildrenCacheEvent(type, node);
         try
         {
             listener.childEvent(client, event);
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
index f42c1d5..4badb30 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
@@ -52,6 +52,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
@@ -75,6 +76,7 @@ import static org.apache.curator.utils.PathUtils.validatePath;
 public class TreeCache implements Closeable
 {
     private static final Logger LOG = LoggerFactory.getLogger(TreeCache.class);
+    private final Executor executor;
     private final boolean createParentNodes;
     private final boolean disableZkWatches;
     private final TreeCacheSelector selector;
@@ -86,6 +88,7 @@ public class TreeCache implements Closeable
         private boolean cacheData = true;
         private boolean dataIsCompressed = false;
         private ExecutorService executorService = null;
+        private Executor executor = null;
         private int maxDepth = Integer.MAX_VALUE;
         private boolean createParentNodes = false;
         private boolean disableZkWatches = false;
@@ -102,12 +105,12 @@ public class TreeCache implements Closeable
          */
         public TreeCache build()
         {
-            ExecutorService executor = executorService;
-            if ( executor == null )
+            ExecutorService localExecutorService = executorService;
+            if ( (localExecutorService == null) && (executor == null) )
             {
-                executor = Executors.newSingleThreadExecutor(defaultThreadFactory);
+                localExecutorService = Executors.newSingleThreadExecutor(defaultThreadFactory);
             }
-            return new TreeCache(client, path, cacheData, dataIsCompressed, maxDepth, executor, createParentNodes, disableZkWatches, selector);
+            return new TreeCache(client, path, cacheData, dataIsCompressed, maxDepth, localExecutorService, executor, createParentNodes, disableZkWatches, selector);
         }
 
         /**
@@ -146,6 +149,15 @@ public class TreeCache implements Closeable
         }
 
         /**
+         * Sets the executor to publish events; a default executor will be created if not specified.
+         */
+        public Builder setExecutor(Executor executor)
+        {
+            this.executor = checkNotNull(executor);
+            return this;
+        }
+
+        /**
          * Sets the maximum depth to explore/watch.  A {@code maxDepth} of {@code 0} will watch only
          * the root node (like {@link NodeCache}); a {@code maxDepth} of {@code 1} will watch the
          * root node and its immediate children (kind of like {@link PathChildrenCache}.
@@ -561,7 +573,7 @@ public class TreeCache implements Closeable
      */
     public TreeCache(CuratorFramework client, String path)
     {
-        this(client, path, true, false, Integer.MAX_VALUE, Executors.newSingleThreadExecutor(defaultThreadFactory), false, false, new DefaultTreeCacheSelector());
+        this(client, path, true, false, Integer.MAX_VALUE, Executors.newSingleThreadExecutor(defaultThreadFactory), null, false, false, new DefaultTreeCacheSelector());
     }
 
     /**
@@ -570,12 +582,14 @@ public class TreeCache implements Closeable
      * @param cacheData        if true, node contents are cached in addition to the stat
      * @param dataIsCompressed if true, data in the path is compressed
      * @param executorService  Closeable ExecutorService to use for the TreeCache's background thread
+     * @param executor          executor to use for the TreeCache's background thread
      * @param createParentNodes true to create parent nodes as containers
      * @param disableZkWatches true to disable Zookeeper watches
      * @param selector         the selector to use
      */
-    TreeCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, int maxDepth, final ExecutorService executorService, boolean createParentNodes, boolean disableZkWatches, TreeCacheSelector selector)
+    TreeCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, int maxDepth, final ExecutorService executorService, final Executor executor, boolean createParentNodes, boolean disableZkWatches, TreeCacheSelector selector)
     {
+        this.executor = executor;
         this.createParentNodes = createParentNodes;
         this.selector = Preconditions.checkNotNull(selector, "selector cannot be null");
         this.root = new TreeNode(validatePath(path), null);
@@ -585,7 +599,7 @@ public class TreeCache implements Closeable
         this.dataIsCompressed = dataIsCompressed;
         this.maxDepth = maxDepth;
         this.disableZkWatches = disableZkWatches;
-        this.executorService = Preconditions.checkNotNull(executorService, "executorService cannot be null");
+        this.executorService = executorService;
     }
 
     /**
@@ -620,7 +634,10 @@ public class TreeCache implements Closeable
             client.removeWatchers();
             client.getConnectionStateListenable().removeListener(connectionStateListener);
             listeners.clear();
-            executorService.shutdown();
+            if ( executorService != null )
+            {
+                executorService.shutdown();
+            }
             try
             {
                 root.wasDeleted();
@@ -854,8 +871,9 @@ public class TreeCache implements Closeable
     {
         if ( treeState.get() != TreeState.CLOSED )
         {
+            Executor localExecutor = (executorService != null) ? executorService : executor;
             LOG.debug("publishEvent: {}", event);
-            executorService.submit(new Runnable()
+            localExecutor.execute(new Runnable()
             {
                 @Override
                 public void run()
diff --git a/curator-test-zk35/pom.xml b/curator-test-zk35/pom.xml
index 5803893..4ca8356 100644
--- a/curator-test-zk35/pom.xml
+++ b/curator-test-zk35/pom.xml
@@ -89,6 +89,18 @@
 
         <dependency>
             <groupId>org.apache.curator</groupId>
+            <artifactId>curator-x-discovery</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.zookeeper</groupId>
+                    <artifactId>zookeeper</artifactId>
+                </exclusion>
+            </exclusions>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.curator</groupId>
             <artifactId>curator-recipes</artifactId>
             <exclusions>
                 <exclusion>
@@ -114,6 +126,32 @@
         </dependency>
 
         <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-x-async</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.zookeeper</groupId>
+                    <artifactId>zookeeper</artifactId>
+                </exclusion>
+            </exclusions>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-x-discovery</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.zookeeper</groupId>
+                    <artifactId>zookeeper</artifactId>
+                </exclusion>
+            </exclusions>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
             <groupId>org.testng</groupId>
             <artifactId>testng</artifactId>
             <scope>test</scope>
@@ -124,6 +162,12 @@
             <artifactId>slf4j-log4j12</artifactId>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
@@ -151,6 +195,8 @@
                     <dependenciesToScan>
                         <dependency>org.apache.curator:curator-framework</dependency>
                         <dependency>org.apache.curator:curator-recipes</dependency>
+                        <dependency>org.apache.curator:curator-x-async</dependency>
+                        <dependency>org.apache.curator:curator-x-discovery</dependency>
                     </dependenciesToScan>
                     <groups>zk35,zk35Compatibility</groups>
                     <excludedGroups>zk36</excludedGroups>
diff --git a/curator-x-async/pom.xml b/curator-x-async/pom.xml
index 5ffd774..a32dbd3 100644
--- a/curator-x-async/pom.xml
+++ b/curator-x-async/pom.xml
@@ -49,4 +49,19 @@
             <scope>test</scope>
         </dependency>
     </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <artifactId>maven-jar-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
 </project>
\ No newline at end of file
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
index b95e92d..d79d66c 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
@@ -19,34 +19,42 @@
 package org.apache.curator.x.async.modeled.details;
 
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.EnsureContainers;
 import org.apache.curator.framework.listen.Listenable;
 import org.apache.curator.framework.listen.ListenerContainer;
-import org.apache.curator.framework.recipes.cache.TreeCache;
+import org.apache.curator.framework.recipes.cache.CuratorCache;
+import org.apache.curator.framework.recipes.cache.CuratorCacheBridge;
+import org.apache.curator.framework.recipes.cache.CuratorCacheBuilder;
+import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
 import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
 import org.apache.curator.framework.recipes.cache.TreeCacheListener;
 import org.apache.curator.utils.ThreadUtils;
+import org.apache.curator.utils.ZKPaths;
 import org.apache.curator.x.async.api.CreateOption;
 import org.apache.curator.x.async.modeled.ModelSerializer;
 import org.apache.curator.x.async.modeled.ModelSpec;
+import org.apache.curator.x.async.modeled.ZNode;
 import org.apache.curator.x.async.modeled.ZPath;
 import org.apache.curator.x.async.modeled.cached.ModeledCache;
 import org.apache.curator.x.async.modeled.cached.ModeledCacheListener;
-import org.apache.curator.x.async.modeled.ZNode;
 import org.apache.zookeeper.data.Stat;
 import java.util.AbstractMap;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.stream.Collectors;
 
 class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T>
 {
-    private final TreeCache cache;
+    private final CuratorCacheBridge cache;
     private final Map<ZPath, Entry<T>> entries = new ConcurrentHashMap<>();
     private final ModelSerializer<T> serializer;
     private final ListenerContainer<ModeledCacheListener<T>> listenerContainer = new ListenerContainer<>();
     private final ZPath basePath;
+    private final CuratorFramework client;
+    private final Set<CreateOption> options;
 
     private static final class Entry<T>
     {
@@ -62,6 +70,7 @@ class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T>
 
     ModeledCacheImpl(CuratorFramework client, ModelSpec<T> modelSpec, ExecutorService executor)
     {
+        this.client = client;
         if ( !modelSpec.path().isResolved() && !modelSpec.path().isRoot() && modelSpec.path().parent().isResolved() )
         {
             modelSpec = modelSpec.parent(); // i.e. the last item is a parameter
@@ -69,19 +78,35 @@ class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T>
 
         basePath = modelSpec.path();
         this.serializer = modelSpec.serializer();
-        cache = TreeCache.newBuilder(client, basePath.fullPath())
-            .setCacheData(false)
-            .setDataIsCompressed(modelSpec.createOptions().contains(CreateOption.compress))
-            .setExecutor(executor)
-            .setCreateParentNodes(modelSpec.createOptions().contains(CreateOption.createParentsIfNeeded) || modelSpec.createOptions().contains(CreateOption.createParentsAsContainers))
-            .build();
+        options = modelSpec.createOptions();
+        CuratorCacheBuilder builder = CuratorCache.builder(client, basePath.fullPath());
+        if ( modelSpec.createOptions().contains(CreateOption.compress) )
+        {
+            builder.withOptions(CuratorCache.Options.COMPRESSED_DATA);
+        }
+        cache = builder.withExecutor(executor).buildBridge();
     }
 
     public void start()
     {
+        CuratorCacheListener listener = CuratorCacheListener.builder()
+            .forTreeCache(client, this)
+            .build();
         try
         {
-            cache.getListenable().addListener(this);
+            if ( options.contains(CreateOption.createParentsIfNeeded) )
+            {
+                if ( options.contains(CreateOption.createParentsAsContainers) )
+                {
+                    new EnsureContainers(client, basePath.fullPath()).ensure();
+                }
+                else
+                {
+                    ZKPaths.mkdirs(client.getZookeeperClient().getZooKeeper(), basePath.fullPath(), false, null, false);
+                }
+            }
+
+            cache.listenable().addListener(listener);
             cache.start();
         }
         catch ( Exception e )
@@ -92,7 +117,6 @@ class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T>
 
     public void close()
     {
-        cache.getListenable().removeListener(this);
         cache.close();
         entries.clear();
     }
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java
index 2d33c13..830ee2b 100644
--- a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java
@@ -21,6 +21,7 @@ package org.apache.curator.x.async.modeled;
 import com.google.common.collect.Sets;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.test.Timing;
+import org.apache.curator.test.compatibility.CuratorTestBase;
 import org.apache.curator.x.async.modeled.cached.CachedModeledFramework;
 import org.apache.curator.x.async.modeled.cached.ModeledCacheListener;
 import org.apache.curator.x.async.modeled.models.TestModel;
@@ -38,6 +39,7 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+@Test(groups = CuratorTestBase.zk35CompatibilityGroup)
 public class TestCachedModeledFramework extends TestModeledFrameworkBase
 {
     @Test
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFrameworkBase.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFrameworkBase.java
index 61a4570..5660539 100644
--- a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFrameworkBase.java
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFrameworkBase.java
@@ -37,7 +37,7 @@ public class TestModeledFrameworkBase extends CompletableBaseClassForTests
     protected ModelSpec<TestNewerModel> newModelSpec;
     protected AsyncCuratorFramework async;
 
-    @BeforeMethod
+    @BeforeMethod(alwaysRun = true)
     @Override
     public void setup() throws Exception
     {
@@ -54,7 +54,7 @@ public class TestModeledFrameworkBase extends CompletableBaseClassForTests
         newModelSpec = ModelSpec.builder(path, newSerializer).build();
     }
 
-    @AfterMethod
+    @AfterMethod(alwaysRun = true)
     @Override
     public void teardown() throws Exception
     {
diff --git a/curator-x-discovery/pom.xml b/curator-x-discovery/pom.xml
index 824231d..50b2dea 100644
--- a/curator-x-discovery/pom.xml
+++ b/curator-x-discovery/pom.xml
@@ -80,4 +80,19 @@
             <scope>test</scope>
         </dependency>
     </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <artifactId>maven-jar-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
 </project>
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCache.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCache.java
index a122d69..ddae689 100644
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCache.java
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCache.java
@@ -23,6 +23,7 @@ import org.apache.curator.x.discovery.details.InstanceProvider;
 import org.apache.curator.x.discovery.details.ServiceCacheListener;
 import java.io.Closeable;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
 
 public interface ServiceCache<T> extends Closeable, Listenable<ServiceCacheListener>, InstanceProvider<T>
 {
@@ -33,12 +34,22 @@ public interface ServiceCache<T> extends Closeable, Listenable<ServiceCacheListe
      *
      * @return the list
      */
-    public List<ServiceInstance<T>> getInstances();
+    List<ServiceInstance<T>> getInstances();
 
     /**
-     * The cache must be started before use
+     * The cache must be started before use. This method blocks until the internal
+     * cache is loaded.
      *
      * @throws Exception errors
      */
-    public void start() throws Exception;
+    void start() throws Exception;
+
+    /**
+     * The cache must be started before use. This version returns immediately.
+     * Use the returned latch to block until the cache is loaded.
+     *
+     * @return a latch that can be used to block until the cache is loaded
+     * @throws Exception errors
+     */
+    CountDownLatch startImmediate() throws Exception;
 }
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java
index d1a31ad..c108713 100644
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.curator.x.discovery.details;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -23,14 +24,18 @@ import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import org.apache.curator.utils.CloseableExecutorService;
-import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.EnsureContainers;
 import org.apache.curator.framework.listen.ListenerContainer;
 import org.apache.curator.framework.recipes.cache.ChildData;
-import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.framework.recipes.cache.CuratorCache;
+import org.apache.curator.framework.recipes.cache.CuratorCacheBridge;
+import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
+import org.apache.curator.framework.recipes.cache.CuratorCacheStorage;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
+import org.apache.curator.utils.CloseableExecutorService;
+import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.curator.x.discovery.ServiceCache;
 import org.apache.curator.x.discovery.ServiceInstance;
@@ -45,17 +50,17 @@ import java.util.concurrent.atomic.AtomicReference;
 
 public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheListener
 {
-    private final ListenerContainer<ServiceCacheListener>           listenerContainer = new ListenerContainer<ServiceCacheListener>();
-    private final ServiceDiscoveryImpl<T>                           discovery;
-    private final AtomicReference<State>                            state = new AtomicReference<State>(State.LATENT);
-    private final PathChildrenCache                                 cache;
-    private final ConcurrentMap<String, ServiceInstance<T>>         instances = Maps.newConcurrentMap();
+    private final ListenerContainer<ServiceCacheListener> listenerContainer = new ListenerContainer<ServiceCacheListener>();
+    private final ServiceDiscoveryImpl<T> discovery;
+    private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
+    private final CuratorCacheBridge cache;
+    private final ConcurrentMap<String, ServiceInstance<T>> instances = Maps.newConcurrentMap();
+    private final CountDownLatch initializedLatch = new CountDownLatch(1);
+    private String path;
 
     private enum State
     {
-        LATENT,
-        STARTED,
-        STOPPED
+        LATENT, STARTED, STOPPED
     }
 
     private static CloseableExecutorService convertThreadFactory(ThreadFactory threadFactory)
@@ -77,8 +82,10 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi
 
         this.discovery = discovery;
 
-        cache = new PathChildrenCache(discovery.getClient(), discovery.pathForName(name), true, false, executorService);
-        cache.getListenable().addListener(this);
+        path = discovery.pathForName(name);
+        cache = CuratorCache.builder(discovery.getClient(), path).withExecutor(executorService::submit).withStorage(CuratorCacheStorage.bytesNotCached()).buildBridge();
+        CuratorCacheListener listener = CuratorCacheListener.builder().forPathChildrenCache(discovery.getClient(), this).forInitialized(this::initialized).build();
+        cache.listenable().addListener(listener);
     }
 
     @Override
@@ -94,9 +101,17 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi
     @Override
     public void start() throws Exception
     {
+        startImmediate().await();
+    }
+
+    @Override
+    public CountDownLatch startImmediate() throws Exception
+    {
         Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
 
-        cache.start(true);
+        new EnsureContainers(discovery.getClient(), path).ensure();
+
+        cache.start();
         if ( debugStartLatch != null )
         {
             debugStartLatch.countDown();
@@ -108,14 +123,7 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi
             debugStartWaitLatch = null;
         }
 
-        for ( ChildData childData : cache.getCurrentData() )
-        {
-            if ( childData.getData() != null )  // else already processed by the cache listener
-            {
-                addInstance(childData, true);
-            }
-        }
-        discovery.cacheOpened(this);
+        return initializedLatch;
     }
 
     @Override
@@ -123,18 +131,15 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi
     {
         Preconditions.checkState(state.compareAndSet(State.STARTED, State.STOPPED), "Already closed or has not been started");
 
-        listenerContainer.forEach
-            (
-                new Function<ServiceCacheListener, Void>()
-                {
-                    @Override
-                    public Void apply(ServiceCacheListener listener)
-                    {
-                        discovery.getClient().getConnectionStateListenable().removeListener(listener);
-                        return null;
-                    }
-                }
-            );
+        listenerContainer.forEach(new Function<ServiceCacheListener, Void>()
+        {
+            @Override
+            public Void apply(ServiceCacheListener listener)
+            {
+                discovery.getClient().getConnectionStateListenable().removeListener(listener);
+                return null;
+            }
+        });
         listenerContainer.clear();
 
         CloseableUtils.closeQuietly(cache);
@@ -166,39 +171,42 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi
     @Override
     public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
     {
-        boolean         notifyListeners = false;
+        boolean notifyListeners = false;
         switch ( event.getType() )
         {
-            case CHILD_ADDED:
-            case CHILD_UPDATED:
+        case CHILD_ADDED:
+        case CHILD_UPDATED:
+        {
+            if ( !event.getData().getPath().equals(path) )
             {
-                addInstance(event.getData(), false);
+                addInstance(event.getData());
                 notifyListeners = true;
-                break;
             }
+            break;
+        }
 
-            case CHILD_REMOVED:
+        case CHILD_REMOVED:
+        {
+            if ( !event.getData().getPath().equals(path) )
             {
                 instances.remove(instanceIdFromData(event.getData()));
                 notifyListeners = true;
-                break;
             }
+            break;
+        }
         }
 
-        if ( notifyListeners )
+        if ( notifyListeners && (initializedLatch.getCount() == 0) )
         {
-            listenerContainer.forEach
-            (
-                new Function<ServiceCacheListener, Void>()
+            listenerContainer.forEach(new Function<ServiceCacheListener, Void>()
+            {
+                @Override
+                public Void apply(ServiceCacheListener listener)
                 {
-                    @Override
-                    public Void apply(ServiceCacheListener listener)
-                    {
-                        listener.cacheChanged();
-                        return null;
-                    }
+                    listener.cacheChanged();
+                    return null;
                 }
-            );
+            });
         }
     }
 
@@ -207,18 +215,23 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi
         return ZKPaths.getNodeFromPath(childData.getPath());
     }
 
-    private void addInstance(ChildData childData, boolean onlyIfAbsent) throws Exception
+    private void addInstance(ChildData childData)
     {
-        String                  instanceId = instanceIdFromData(childData);
-        ServiceInstance<T>      serviceInstance = discovery.getSerializer().deserialize(childData.getData());
-        if ( onlyIfAbsent )
+        try
         {
-            instances.putIfAbsent(instanceId, serviceInstance);
+            String instanceId = instanceIdFromData(childData);
+            ServiceInstance<T> serviceInstance = discovery.getSerializer().deserialize(childData.getData());
+            instances.put(instanceId, serviceInstance);
         }
-        else
+        catch ( Exception e )
         {
-            instances.put(instanceId, serviceInstance);
+            throw new RuntimeException(e);
         }
-        cache.clearDataBytes(childData.getPath(), childData.getStat().getVersion());
+    }
+
+    private void initialized()
+    {
+        discovery.cacheOpened(this);
+        initializedLatch.countDown();
     }
 }
diff --git a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java
index fda5c26..2a9d2d8 100644
--- a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java
+++ b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java
@@ -27,6 +27,7 @@ import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.ExecuteCalledWatchingExecutorService;
 import org.apache.curator.test.Timing;
+import org.apache.curator.test.compatibility.CuratorTestBase;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.x.discovery.details.ServiceCacheListener;
 import org.testng.Assert;
@@ -40,6 +41,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 
+@Test(groups = CuratorTestBase.zk35CompatibilityGroup)
 public class TestServiceCache extends BaseClassForTests
 {
     @Test
diff --git a/pom.xml b/pom.xml
index e223de6..00083c4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -385,6 +385,13 @@
                 <groupId>org.apache.curator</groupId>
                 <artifactId>curator-x-discovery</artifactId>
                 <version>${project.version}</version>
+                <type>test-jar</type>
+            </dependency>
+
+            <dependency>
+                <groupId>org.apache.curator</groupId>
+                <artifactId>curator-x-discovery</artifactId>
+                <version>${project.version}</version>
             </dependency>
 
             <dependency>
@@ -400,6 +407,13 @@
             </dependency>
 
             <dependency>
+                <groupId>org.apache.curator</groupId>
+                <artifactId>curator-x-async</artifactId>
+                <version>${project.version}</version>
+                <type>test-jar</type>
+            </dependency>
+
+            <dependency>
                 <groupId>org.apache.commons</groupId>
                 <artifactId>commons-math</artifactId>
                 <version>${commons-math-version}</version>