You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2017/07/08 03:32:36 UTC

[4/6] curator git commit: Squashed commit of the following:

http://git-wip-us.apache.org/repos/asf/curator/blob/0f5d10da/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/CachedModeledFramework.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/CachedModeledFramework.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/CachedModeledFramework.java
new file mode 100644
index 0000000..8acbebb
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/CachedModeledFramework.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.modeled.cached;
+
+import org.apache.curator.framework.listen.Listenable;
+import org.apache.curator.x.async.AsyncStage;
+import org.apache.curator.x.async.modeled.ModeledFramework;
+import org.apache.curator.x.async.modeled.ZNode;
+import org.apache.curator.x.async.modeled.ZPath;
+import org.apache.zookeeper.data.Stat;
+import java.io.Closeable;
+import java.util.List;
+
+public interface CachedModeledFramework<T> extends ModeledFramework<T>, Closeable
+{
+    /**
+     * Return the cache instance
+     *
+     * @return cache
+     */
+    ModeledCache<T> cache();
+
+    /**
+     * Returns a view of this instance that uses the CachedModeledFramework's executor
+     * for all default async completion operations. i.e. when you use, for example,
+     * {@link java.util.concurrent.CompletionStage#handleAsync(java.util.function.BiFunction)}
+     * this instance's executor is used instead of <code>ForkJoinPool.commonPool()</code>.
+     *
+     * @return view
+     */
+    CachedModeledFramework<T> asyncDefault();
+
+    /**
+     * Start the internally created cache
+     */
+    void start();
+
+    /**
+     * Close/stop the internally created cache
+     */
+    @Override
+    void close();
+
+    /**
+     * Return the listener container so that you can add/remove listeners
+     *
+     * @return listener container
+     */
+    Listenable<ModeledCacheListener<T>> listenable();
+
+    /**
+     * Same as {@link org.apache.curator.x.async.modeled.ModeledFramework#childrenAsZNodes()}
+     * but always reads from cache - i.e. no additional queries to ZooKeeper are made
+     *
+     * @return AsyncStage stage
+     */
+    @Override
+    AsyncStage<List<ZNode<T>>> childrenAsZNodes();
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    CachedModeledFramework<T> child(Object child);
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    CachedModeledFramework<T> withPath(ZPath path);
+
+    /**
+     * Same as {@link #read()} except that if the cache does not have a value
+     * for this path a direct query is made.
+     *
+     * @return AsyncStage
+     * @see org.apache.curator.x.async.AsyncStage
+     */
+    AsyncStage<T> readThrough();
+
+    /**
+     * Same as {@link #read(org.apache.zookeeper.data.Stat)} except that if the cache does not have a value
+     * for this path a direct query is made.
+     *
+     * @param storingStatIn the stat for the new ZNode is stored here
+     * @return AsyncStage
+     * @see org.apache.curator.x.async.AsyncStage
+     */
+    AsyncStage<T> readThrough(Stat storingStatIn);
+
+    /**
+     * Same as {@link #readAsZNode()} except that if the cache does not have a value
+     * for this path a direct query is made.
+     *
+     *
+     * @return AsyncStage
+     * @see org.apache.curator.x.async.AsyncStage
+     */
+    AsyncStage<ZNode<T>> readThroughAsZNode();
+
+    /**
+     * Return the instances of the base path of this cached framework
+     *
+     * @return listing of all models in the base path
+     */
+    AsyncStage<List<T>> list();
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/0f5d10da/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/ModeledCache.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/ModeledCache.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/ModeledCache.java
new file mode 100644
index 0000000..6677268
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/ModeledCache.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.modeled.cached;
+
+import org.apache.curator.x.async.modeled.ZNode;
+import org.apache.curator.x.async.modeled.ZPath;
+import java.util.Map;
+import java.util.Optional;
+
+public interface ModeledCache<T>
+{
+    /**
+     * Return the modeled current data for the given path. There are no guarantees of accuracy. This is
+     * merely the most recent view of the data. If there is no node at the given path,
+     * {@link java.util.Optional#empty()} is returned.
+     *
+     * @param path path to the node to check
+     * @return data if the node is alive, or empty
+     */
+    Optional<ZNode<T>> currentData(ZPath path);
+
+    /**
+     * Return the modeled current set of children at the given path, mapped by child name. There are no
+     * guarantees of accuracy; this is merely the most recent view of the data.
+     *
+     * @param path path to the node to check
+     * @return a possibly-empty map of children if the node is alive
+     */
+    Map<ZPath, ZNode<T>> currentChildren(ZPath path);
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/0f5d10da/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/ModeledCacheListener.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/ModeledCacheListener.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/ModeledCacheListener.java
new file mode 100644
index 0000000..42498c0
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/ModeledCacheListener.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.modeled.cached;
+
+import org.apache.curator.x.async.modeled.ZPath;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.LoggerFactory;
+
+@FunctionalInterface
+public interface ModeledCacheListener<T>
+{
+    enum Type
+    {
+        /**
+         * A child was added to the path
+         */
+        NODE_ADDED,
+
+        /**
+         * A child's data was changed
+         */
+        NODE_UPDATED,
+
+        /**
+         * A child was removed from the path
+         */
+        NODE_REMOVED
+    }
+
+    /**
+     * The given path was added, updated or removed
+     *
+     * @param type action type
+     * @param path the path
+     * @param stat the node's stat (previous stat for removal)
+     * @param model the node's model (previous model for removal)
+     */
+    void accept(Type type, ZPath path, Stat stat, T model);
+
+    /**
+     * The cache has finished initializing
+     */
+    default void initialized()
+    {
+        // NOP
+    }
+
+    /**
+     * Called when there is an exception processing a message from the internal cache. This is most
+     * likely due to a de-serialization problem.
+     *
+     * @param e the exception
+     */
+    default void handleException(Exception e)
+    {
+        LoggerFactory.getLogger(getClass()).error("Could not process cache message", e);
+    }
+
+    /**
+     * Returns a version of this listener that only begins calling
+     * {@link #accept(org.apache.curator.x.async.modeled.cached.ModeledCacheListener.Type, org.apache.curator.x.async.modeled.ZPath, org.apache.zookeeper.data.Stat, Object)}
+     * once {@link #initialized()} has been called. i.e. changes that occur as the cache is initializing are not sent
+     * to the listener
+     *
+     * @return wrapped listener
+     */
+    default ModeledCacheListener<T> postInitializedOnly()
+    {
+        return new ModeledCacheListener<T>()
+        {
+            private volatile boolean isInitialized = false;
+
+            @Override
+            public void accept(Type type, ZPath path, Stat stat, T model)
+            {
+                if ( isInitialized )
+                {
+                    ModeledCacheListener.this.accept(type, path, stat, model);
+                }
+            }
+
+            @Override
+            public void initialized()
+            {
+                isInitialized = true;
+                ModeledCacheListener.this.initialized();
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/0f5d10da/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java
new file mode 100644
index 0000000..2a7fd5f
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java
@@ -0,0 +1,342 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.modeled.details;
+
+import org.apache.curator.framework.api.transaction.CuratorOp;
+import org.apache.curator.framework.api.transaction.CuratorTransactionResult;
+import org.apache.curator.framework.listen.Listenable;
+import org.apache.curator.x.async.AsyncCuratorFramework;
+import org.apache.curator.x.async.AsyncStage;
+import org.apache.curator.x.async.modeled.ModelSpec;
+import org.apache.curator.x.async.modeled.ModeledFramework;
+import org.apache.curator.x.async.modeled.ZNode;
+import org.apache.curator.x.async.modeled.ZPath;
+import org.apache.curator.x.async.modeled.cached.CachedModeledFramework;
+import org.apache.curator.x.async.modeled.cached.ModeledCache;
+import org.apache.curator.x.async.modeled.cached.ModeledCacheListener;
+import org.apache.curator.x.async.modeled.versioned.VersionedModeledFramework;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.server.DataTree;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+class CachedModeledFrameworkImpl<T> implements CachedModeledFramework<T>
+{
+    private final ModeledFramework<T> client;
+    private final ModeledCacheImpl<T> cache;
+    private final Executor executor;
+    private final boolean asyncDefaultMode;
+
+    CachedModeledFrameworkImpl(ModeledFramework<T> client, ExecutorService executor)
+    {
+        this(client, new ModeledCacheImpl<>(client.unwrap().unwrap(), client.modelSpec(), executor), executor, false);
+    }
+
+    private CachedModeledFrameworkImpl(ModeledFramework<T> client, ModeledCacheImpl<T> cache, Executor executor, boolean asyncDefaultMode)
+    {
+        this.client = client;
+        this.cache = cache;
+        this.executor = executor;
+        this.asyncDefaultMode = asyncDefaultMode;
+    }
+
+    @Override
+    public ModeledCache<T> cache()
+    {
+        return cache;
+    }
+
+    @Override
+    public CachedModeledFramework<T> asyncDefault()
+    {
+        return new CachedModeledFrameworkImpl<>(client, cache, executor, true);
+    }
+
+    @Override
+    public void start()
+    {
+        cache.start();
+    }
+
+    @Override
+    public void close()
+    {
+        cache.close();
+    }
+
+    @Override
+    public Listenable<ModeledCacheListener<T>> listenable()
+    {
+        return cache.listenable();
+    }
+
+    @Override
+    public CachedModeledFramework<T> cached()
+    {
+        throw new UnsupportedOperationException("Already a cached instance");
+    }
+
+    @Override
+    public CachedModeledFramework<T> cached(ExecutorService executor)
+    {
+        throw new UnsupportedOperationException("Already a cached instance");
+    }
+
+    @Override
+    public VersionedModeledFramework<T> versioned()
+    {
+        return new VersionedModeledFrameworkImpl<>(this);
+    }
+
+    @Override
+    public AsyncCuratorFramework unwrap()
+    {
+        return client.unwrap();
+    }
+
+    @Override
+    public ModelSpec<T> modelSpec()
+    {
+        return client.modelSpec();
+    }
+
+    @Override
+    public CachedModeledFramework<T> child(Object child)
+    {
+        return new CachedModeledFrameworkImpl<>(client.child(child), cache, executor, asyncDefaultMode);
+    }
+
+    @Override
+    public ModeledFramework<T> parent()
+    {
+        throw new UnsupportedOperationException("Not supported for CachedModeledFramework. Instead, call parent() on the ModeledFramework before calling cached()");
+    }
+
+    @Override
+    public CachedModeledFramework<T> withPath(ZPath path)
+    {
+        return new CachedModeledFrameworkImpl<>(client.withPath(path), cache, executor, asyncDefaultMode);
+    }
+
+    @Override
+    public AsyncStage<String> set(T model)
+    {
+        return client.set(model);
+    }
+
+    @Override
+    public AsyncStage<String> set(T model, Stat storingStatIn)
+    {
+        return client.set(model, storingStatIn);
+    }
+
+    @Override
+    public AsyncStage<String> set(T model, Stat storingStatIn, int version)
+    {
+        return client.set(model, storingStatIn, version);
+    }
+
+    @Override
+    public AsyncStage<String> set(T model, int version)
+    {
+        return client.set(model, version);
+    }
+
+    @Override
+    public AsyncStage<T> read()
+    {
+        return internalRead(ZNode::model, this::exceptionally);
+    }
+
+    @Override
+    public AsyncStage<T> read(Stat storingStatIn)
+    {
+        return internalRead(n -> {
+            if ( storingStatIn != null )
+            {
+                DataTree.copyStat(n.stat(), storingStatIn);
+            }
+            return n.model();
+        }, this::exceptionally);
+    }
+
+    @Override
+    public AsyncStage<ZNode<T>> readAsZNode()
+    {
+        return internalRead(Function.identity(), this::exceptionally);
+    }
+
+    @Override
+    public AsyncStage<T> readThrough()
+    {
+        return internalRead(ZNode::model, client::read);
+    }
+
+    @Override
+    public AsyncStage<T> readThrough(Stat storingStatIn)
+    {
+        return internalRead(ZNode::model, () -> client.read(storingStatIn));
+    }
+
+    @Override
+    public AsyncStage<ZNode<T>> readThroughAsZNode()
+    {
+        return internalRead(Function.identity(), client::readAsZNode);
+    }
+
+    @Override
+    public AsyncStage<List<T>> list()
+    {
+        List<T> children = cache.currentChildren()
+            .values()
+            .stream()
+            .map(ZNode::model)
+            .collect(Collectors.toList());
+        return asyncDefaultMode ? ModelStage.asyncCompleted(children, executor) : ModelStage.completed(children);
+    }
+
+    @Override
+    public AsyncStage<Stat> update(T model)
+    {
+        return client.update(model);
+    }
+
+    @Override
+    public AsyncStage<Stat> update(T model, int version)
+    {
+        return client.update(model, version);
+    }
+
+    @Override
+    public AsyncStage<Void> delete()
+    {
+        return client.delete();
+    }
+
+    @Override
+    public AsyncStage<Void> delete(int version)
+    {
+        return client.delete(version);
+    }
+
+    @Override
+    public AsyncStage<Stat> checkExists()
+    {
+        ZPath path = client.modelSpec().path();
+        Optional<ZNode<T>> data = cache.currentData(path);
+        return data.map(node -> completed(node.stat())).orElseGet(() -> completed(null));
+    }
+
+    @Override
+    public AsyncStage<List<ZPath>> children()
+    {
+        List<ZPath> paths = cache.currentChildren(client.modelSpec().path())
+            .keySet()
+            .stream()
+            .filter(path -> path.equals(cache.basePath()))
+            .collect(Collectors.toList());
+        return completed(paths);
+    }
+
+    @Override
+    public AsyncStage<List<ZNode<T>>> childrenAsZNodes()
+    {
+        List<ZNode<T>> nodes = cache.currentChildren(client.modelSpec().path())
+            .entrySet()
+            .stream()
+            .filter(e -> e.getKey().equals(cache.basePath()))
+            .map(Map.Entry::getValue)
+            .collect(Collectors.toList());
+        return completed(nodes);
+    }
+
+    @Override
+    public CuratorOp createOp(T model)
+    {
+        return client.createOp(model);
+    }
+
+    @Override
+    public CuratorOp updateOp(T model)
+    {
+        return client.updateOp(model);
+    }
+
+    @Override
+    public CuratorOp updateOp(T model, int version)
+    {
+        return client.updateOp(model, version);
+    }
+
+    @Override
+    public CuratorOp deleteOp()
+    {
+        return client.deleteOp();
+    }
+
+    @Override
+    public CuratorOp deleteOp(int version)
+    {
+        return client.deleteOp(version);
+    }
+
+    @Override
+    public CuratorOp checkExistsOp()
+    {
+        return client.checkExistsOp();
+    }
+
+    @Override
+    public CuratorOp checkExistsOp(int version)
+    {
+        return client.checkExistsOp(version);
+    }
+
+    @Override
+    public AsyncStage<List<CuratorTransactionResult>> inTransaction(List<CuratorOp> operations)
+    {
+        return client.inTransaction(operations);
+    }
+
+    private <U> AsyncStage<U> completed(U value)
+    {
+        return asyncDefaultMode ? ModelStage.asyncCompleted(value, executor) : ModelStage.completed(value);
+    }
+
+    private <U> AsyncStage<U> exceptionally()
+    {
+        KeeperException.NoNodeException exception = new KeeperException.NoNodeException(client.modelSpec().path().fullPath());
+        return asyncDefaultMode ? ModelStage.asyncExceptionally(exception, executor) : ModelStage.exceptionally(exception);
+    }
+
+    private <U> AsyncStage<U> internalRead(Function<ZNode<T>, U> resolver, Supplier<AsyncStage<U>> elseProc)
+    {
+        ZPath path = client.modelSpec().path();
+        Optional<ZNode<T>> data = cache.currentData(path);
+        return data.map(node -> completed(resolver.apply(node)))
+            .orElseGet(elseProc);
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/0f5d10da/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModelSpecImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModelSpecImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModelSpecImpl.java
new file mode 100644
index 0000000..58405eb
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModelSpecImpl.java
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.modeled.details;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import org.apache.curator.framework.schema.Schema;
+import org.apache.curator.framework.schema.SchemaValidator;
+import org.apache.curator.framework.schema.SchemaViolation;
+import org.apache.curator.x.async.api.CreateOption;
+import org.apache.curator.x.async.api.DeleteOption;
+import org.apache.curator.x.async.modeled.ModelSerializer;
+import org.apache.curator.x.async.modeled.ModelSpec;
+import org.apache.curator.x.async.modeled.ZPath;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+
+public class ModelSpecImpl<T> implements ModelSpec<T>, SchemaValidator
+{
+    private final ZPath path;
+    private final ModelSerializer<T> serializer;
+    private final CreateMode createMode;
+    private final List<ACL> aclList;
+    private final Set<CreateOption> createOptions;
+    private final Set<DeleteOption> deleteOptions;
+    private final long ttl;
+    private volatile Schema schema = null;
+
+    public ModelSpecImpl(ZPath path, ModelSerializer<T> serializer, CreateMode createMode, List<ACL> aclList, Set<CreateOption> createOptions, Set<DeleteOption> deleteOptions, long ttl)
+    {
+        this.path = Objects.requireNonNull(path, "path cannot be null");
+        this.serializer = Objects.requireNonNull(serializer, "serializer cannot be null");
+        this.createMode = Objects.requireNonNull(createMode, "createMode cannot be null");
+        this.aclList = ImmutableList.copyOf(Objects.requireNonNull(aclList, "aclList cannot be null"));
+        this.createOptions = ImmutableSet.copyOf(Objects.requireNonNull(createOptions, "createOptions cannot be null"));
+        this.deleteOptions = ImmutableSet.copyOf(Objects.requireNonNull(deleteOptions, "deleteOptions cannot be null"));
+        this.ttl = ttl;
+    }
+
+    @Override
+    public ModelSpec<T> child(Object child)
+    {
+        return withPath(path.child(child));
+    }
+
+    @Override
+    public ModelSpec<T> parent()
+    {
+        return withPath(path.parent());
+    }
+
+    @Override
+    public ModelSpec<T> resolved(Object... parameters)
+    {
+        return withPath(path.resolved(parameters));
+    }
+
+    @Override
+    public ModelSpec<T> resolved(List<Object> parameters)
+    {
+        return withPath(path.resolved(parameters));
+    }
+
+    @Override
+    public ModelSpec<T> withPath(ZPath newPath)
+    {
+        return new ModelSpecImpl<>(newPath, serializer, createMode, aclList, createOptions, deleteOptions, ttl);
+    }
+
+    @Override
+    public ZPath path()
+    {
+        return path;
+    }
+
+    @Override
+    public ModelSerializer<T> serializer()
+    {
+        return serializer;
+    }
+
+    @Override
+    public CreateMode createMode()
+    {
+        return createMode;
+    }
+
+    @Override
+    public List<ACL> aclList()
+    {
+        return aclList;
+    }
+
+    @Override
+    public Set<CreateOption> createOptions()
+    {
+        return createOptions;
+    }
+
+    @Override
+    public Set<DeleteOption> deleteOptions()
+    {
+        return deleteOptions;
+    }
+
+    @Override
+    public long ttl()
+    {
+        return ttl;
+    }
+
+    @Override
+    public Schema schema()
+    {
+        if ( schema == null )
+        {
+            schema = Schema.builder(path.toSchemaPathPattern())
+                .dataValidator(this)
+                .ephemeral(createMode.isEphemeral() ? Schema.Allowance.MUST : Schema.Allowance.CANNOT)
+                .canBeDeleted(true)
+                .sequential(createMode.isSequential() ? Schema.Allowance.MUST : Schema.Allowance.CANNOT)
+                .watched(Schema.Allowance.CAN)
+                .build();
+        }
+        return schema;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if ( this == o )
+        {
+            return true;
+        }
+        if ( o == null || getClass() != o.getClass() )
+        {
+            return false;
+        }
+
+        ModelSpecImpl<?> modelSpec = (ModelSpecImpl<?>)o;
+
+        if ( ttl != modelSpec.ttl )
+        {
+            return false;
+        }
+        if ( !path.equals(modelSpec.path) )
+        {
+            return false;
+        }
+        if ( !serializer.equals(modelSpec.serializer) )
+        {
+            return false;
+        }
+        if ( createMode != modelSpec.createMode )
+        {
+            return false;
+        }
+        if ( !aclList.equals(modelSpec.aclList) )
+        {
+            return false;
+        }
+        if ( !createOptions.equals(modelSpec.createOptions) )
+        {
+            return false;
+        }
+        //noinspection SimplifiableIfStatement
+        if ( !deleteOptions.equals(modelSpec.deleteOptions) )
+        {
+            return false;
+        }
+        return schema.equals(modelSpec.schema);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        int result = path.hashCode();
+        result = 31 * result + serializer.hashCode();
+        result = 31 * result + createMode.hashCode();
+        result = 31 * result + aclList.hashCode();
+        result = 31 * result + createOptions.hashCode();
+        result = 31 * result + deleteOptions.hashCode();
+        result = 31 * result + (int)(ttl ^ (ttl >>> 32));
+        result = 31 * result + schema.hashCode();
+        return result;
+    }
+
+    @Override
+    public String toString()
+    {
+        return "ModelSpecImpl{" + "path=" + path + ", serializer=" + serializer + ", createMode=" + createMode + ", aclList=" + aclList + ", createOptions=" + createOptions + ", deleteOptions=" + deleteOptions + ", ttl=" + ttl + ", schema=" + schema + '}';
+    }
+
+    @Override
+    public boolean isValid(Schema schema, String path, byte[] data, List<ACL> acl)
+    {
+        if ( acl != null )
+        {
+            List<ACL> localAclList = (aclList.size() > 0) ? aclList : ZooDefs.Ids.OPEN_ACL_UNSAFE;
+            if ( !acl.equals(localAclList) )
+            {
+                throw new SchemaViolation(schema, new SchemaViolation.ViolatorData(path, data, acl), "ACLs do not match model ACLs");
+            }
+        }
+
+        if ( data != null )
+        {
+            try
+            {
+                serializer.deserialize(data);
+            }
+            catch ( RuntimeException e )
+            {
+                throw new SchemaViolation(schema, new SchemaViolation.ViolatorData(path, data, acl), "Data cannot be deserialized into a model");
+            }
+        }
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/0f5d10da/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModelStage.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModelStage.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModelStage.java
new file mode 100644
index 0000000..27047ec
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModelStage.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.modeled.details;
+
+import org.apache.curator.x.async.AsyncStage;
+import org.apache.zookeeper.WatchedEvent;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.Executor;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+class ModelStage<T> extends CompletableFuture<T> implements AsyncStage<T>
+{
+    private final CompletionStage<WatchedEvent> event;
+
+    static <U> ModelStage<U> make()
+    {
+        return new ModelStage<>(null);
+    }
+
+    static <U> ModelStage<U> make(CompletionStage<WatchedEvent> event)
+    {
+        return new ModelStage<>(event);
+    }
+
+    static <U> ModelStage<U> completed(U value)
+    {
+        ModelStage<U> stage = new ModelStage<>(null);
+        stage.complete(value);
+        return stage;
+    }
+
+    static <U> ModelStage<U> exceptionally(Exception e)
+    {
+        ModelStage<U> stage = new ModelStage<>(null);
+        stage.completeExceptionally(e);
+        return stage;
+    }
+
+    static <U> ModelStage<U> async(Executor executor)
+    {
+        return new AsyncModelStage<>(executor);
+    }
+
+    static <U> ModelStage<U> asyncCompleted(U value, Executor executor)
+    {
+        ModelStage<U> stage = new AsyncModelStage<>(executor);
+        stage.complete(value);
+        return stage;
+    }
+
+    static <U> ModelStage<U> asyncExceptionally(Exception e, Executor executor)
+    {
+        ModelStage<U> stage = new AsyncModelStage<>(executor);
+        stage.completeExceptionally(e);
+        return stage;
+    }
+
+    @Override
+    public CompletionStage<WatchedEvent> event()
+    {
+        return event;
+    }
+
+    private ModelStage(CompletionStage<WatchedEvent> event)
+    {
+        this.event = event;
+    }
+
+    private static class AsyncModelStage<U> extends ModelStage<U>
+    {
+        private final Executor executor;
+
+        public AsyncModelStage(Executor executor)
+        {
+            super(null);
+            this.executor = executor;
+        }
+
+        @Override
+        public <U1> CompletableFuture<U1> thenApplyAsync(Function<? super U, ? extends U1> fn)
+        {
+            return super.thenApplyAsync(fn, executor);
+        }
+
+        @Override
+        public CompletableFuture<Void> thenAcceptAsync(Consumer<? super U> action)
+        {
+            return super.thenAcceptAsync(action, executor);
+        }
+
+        @Override
+        public CompletableFuture<Void> thenRunAsync(Runnable action)
+        {
+            return super.thenRunAsync(action, executor);
+        }
+
+        @Override
+        public <U1, V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U1> other, BiFunction<? super U, ? super U1, ? extends V> fn)
+        {
+            return super.thenCombineAsync(other, fn, executor);
+        }
+
+        @Override
+        public <U1> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U1> other, BiConsumer<? super U, ? super U1> action)
+        {
+            return super.thenAcceptBothAsync(other, action, executor);
+        }
+
+        @Override
+        public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action)
+        {
+            return super.runAfterBothAsync(other, action, executor);
+        }
+
+        @Override
+        public <U1> CompletableFuture<U1> applyToEitherAsync(CompletionStage<? extends U> other, Function<? super U, U1> fn)
+        {
+            return super.applyToEitherAsync(other, fn, executor);
+        }
+
+        @Override
+        public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends U> other, Consumer<? super U> action)
+        {
+            return super.acceptEitherAsync(other, action, executor);
+        }
+
+        @Override
+        public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action)
+        {
+            return super.runAfterEitherAsync(other, action, executor);
+        }
+
+        @Override
+        public <U1> CompletableFuture<U1> thenComposeAsync(Function<? super U, ? extends CompletionStage<U1>> fn)
+        {
+            return super.thenComposeAsync(fn, executor);
+        }
+
+        @Override
+        public CompletableFuture<U> whenCompleteAsync(BiConsumer<? super U, ? super Throwable> action)
+        {
+            return super.whenCompleteAsync(action, executor);
+        }
+
+        @Override
+        public <U1> CompletableFuture<U1> handleAsync(BiFunction<? super U, Throwable, ? extends U1> fn)
+        {
+            return super.handleAsync(fn, executor);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/0f5d10da/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
new file mode 100644
index 0000000..72e6762
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.modeled.details;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.listen.Listenable;
+import org.apache.curator.framework.listen.ListenerContainer;
+import org.apache.curator.framework.recipes.cache.TreeCache;
+import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
+import org.apache.curator.framework.recipes.cache.TreeCacheListener;
+import org.apache.curator.utils.ThreadUtils;
+import org.apache.curator.x.async.api.CreateOption;
+import org.apache.curator.x.async.modeled.ModelSerializer;
+import org.apache.curator.x.async.modeled.ModelSpec;
+import org.apache.curator.x.async.modeled.ZPath;
+import org.apache.curator.x.async.modeled.cached.ModeledCache;
+import org.apache.curator.x.async.modeled.cached.ModeledCacheListener;
+import org.apache.curator.x.async.modeled.ZNode;
+import org.apache.zookeeper.data.Stat;
+import java.util.AbstractMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+
+class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T>
+{
+    private final TreeCache cache;
+    private final Map<ZPath, Entry<T>> entries = new ConcurrentHashMap<>();
+    private final ModelSerializer<T> serializer;
+    private final ListenerContainer<ModeledCacheListener<T>> listenerContainer = new ListenerContainer<>();
+    private final ZPath basePath;
+
+    private static final class Entry<T>
+    {
+        final Stat stat;
+        final T model;
+
+        Entry(Stat stat, T model)
+        {
+            this.stat = stat;
+            this.model = model;
+        }
+    }
+
+    ModeledCacheImpl(CuratorFramework client, ModelSpec<T> modelSpec, ExecutorService executor)
+    {
+        if ( !modelSpec.path().isResolved() && !modelSpec.path().isRoot() && modelSpec.path().parent().isResolved() )
+        {
+            modelSpec = modelSpec.parent(); // i.e. the last item is a parameter
+        }
+
+        basePath = modelSpec.path();
+        this.serializer = modelSpec.serializer();
+        cache = TreeCache.newBuilder(client, basePath.fullPath())
+            .setCacheData(false)
+            .setDataIsCompressed(modelSpec.createOptions().contains(CreateOption.compress))
+            .setExecutor(executor)
+            .setCreateParentNodes(modelSpec.createOptions().contains(CreateOption.createParentsIfNeeded) || modelSpec.createOptions().contains(CreateOption.createParentsAsContainers))
+            .build();
+    }
+
+    public void start()
+    {
+        try
+        {
+            cache.getListenable().addListener(this);
+            cache.start();
+        }
+        catch ( Exception e )
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public void close()
+    {
+        cache.getListenable().removeListener(this);
+        cache.close();
+        entries.clear();
+    }
+
+    @Override
+    public Optional<ZNode<T>> currentData(ZPath path)
+    {
+        Entry<T> entry = entries.remove(path);
+        if ( entry != null )
+        {
+            return Optional.of(new ZNodeImpl<>(path, entry.stat, entry.model));
+        }
+        return Optional.empty();
+    }
+
+    ZPath basePath()
+    {
+        return basePath;
+    }
+
+    Map<ZPath, ZNode<T>> currentChildren()
+    {
+        return currentChildren(basePath);
+    }
+
+    @Override
+    public Map<ZPath, ZNode<T>> currentChildren(ZPath path)
+    {
+        return entries.entrySet()
+            .stream()
+            .filter(entry -> entry.getKey().startsWith(path))
+            .map(entry -> new AbstractMap.SimpleEntry<>(entry.getKey(), new ZNodeImpl<>(entry.getKey(), entry.getValue().stat, entry.getValue().model)))
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+    }
+
+    public Listenable<ModeledCacheListener<T>> listenable()
+    {
+        return listenerContainer;
+    }
+
+    @Override
+    public void childEvent(CuratorFramework client, TreeCacheEvent event)
+    {
+        try
+        {
+            internalChildEvent(event);
+        }
+        catch ( Exception e )
+        {
+            ThreadUtils.checkInterrupted(e);
+
+            listenerContainer.forEach(l -> {
+                l.handleException(e);
+                return null;
+            });
+        }
+    }
+
+    private void internalChildEvent(TreeCacheEvent event) throws Exception
+    {
+        switch ( event.getType() )
+        {
+        case NODE_ADDED:
+        case NODE_UPDATED:
+        {
+            ZPath path = ZPath.parse(event.getData().getPath());
+            if ( !path.equals(basePath) )
+            {
+                byte[] bytes = event.getData().getData();
+                if ( (bytes != null) && (bytes.length > 0) )    // otherwise it's probably just a parent node being created
+                {
+                    T model = serializer.deserialize(bytes);
+                    entries.put(path, new Entry<>(event.getData().getStat(), model));
+                    ModeledCacheListener.Type type = (event.getType() == TreeCacheEvent.Type.NODE_ADDED) ? ModeledCacheListener.Type.NODE_ADDED : ModeledCacheListener.Type.NODE_UPDATED;
+                    accept(type, path, event.getData().getStat(), model);
+                }
+            }
+            break;
+        }
+
+        case NODE_REMOVED:
+        {
+            ZPath path = ZPath.parse(event.getData().getPath());
+            if ( !path.equals(basePath) )
+            {
+                Entry<T> entry = entries.remove(path);
+                T model = (entry != null) ? entry.model : serializer.deserialize(event.getData().getData());
+                Stat stat = (entry != null) ? entry.stat : event.getData().getStat();
+                accept(ModeledCacheListener.Type.NODE_REMOVED, path, stat, model);
+            }
+            break;
+        }
+
+        case INITIALIZED:
+        {
+            listenerContainer.forEach(l -> {
+                l.initialized();
+                return null;
+            });
+            break;
+        }
+
+        default:
+            // ignore
+            break;
+        }
+    }
+
+    private void accept(ModeledCacheListener.Type type, ZPath path, Stat stat, T model)
+    {
+        listenerContainer.forEach(l -> {
+            l.accept(type, path, stat, model);
+            return null;
+        });
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/0f5d10da/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
new file mode 100644
index 0000000..c1d19c4
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
@@ -0,0 +1,450 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.modeled.details;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.UnhandledErrorListener;
+import org.apache.curator.framework.api.transaction.CuratorOp;
+import org.apache.curator.framework.api.transaction.CuratorTransactionResult;
+import org.apache.curator.utils.ThreadUtils;
+import org.apache.curator.x.async.AsyncCuratorFramework;
+import org.apache.curator.x.async.AsyncStage;
+import org.apache.curator.x.async.WatchMode;
+import org.apache.curator.x.async.api.AsyncCuratorFrameworkDsl;
+import org.apache.curator.x.async.api.AsyncPathAndBytesable;
+import org.apache.curator.x.async.api.AsyncPathable;
+import org.apache.curator.x.async.api.AsyncTransactionSetDataBuilder;
+import org.apache.curator.x.async.api.CreateOption;
+import org.apache.curator.x.async.api.WatchableAsyncCuratorFramework;
+import org.apache.curator.x.async.modeled.ModelSpec;
+import org.apache.curator.x.async.modeled.ModeledFramework;
+import org.apache.curator.x.async.modeled.ZNode;
+import org.apache.curator.x.async.modeled.ZPath;
+import org.apache.curator.x.async.modeled.cached.CachedModeledFramework;
+import org.apache.curator.x.async.modeled.versioned.VersionedModeledFramework;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Function;
+import java.util.function.UnaryOperator;
+import java.util.stream.Collectors;
+
+public class ModeledFrameworkImpl<T> implements ModeledFramework<T>
+{
+    private final AsyncCuratorFramework client;
+    private final WatchableAsyncCuratorFramework watchableClient;
+    private final ModelSpec<T> modelSpec;
+    private final WatchMode watchMode;
+    private final UnaryOperator<WatchedEvent> watcherFilter;
+    private final UnhandledErrorListener unhandledErrorListener;
+    private final UnaryOperator<CuratorEvent> resultFilter;
+    private final AsyncCuratorFrameworkDsl dslClient;
+    private final boolean isWatched;
+
+    public static <T> ModeledFrameworkImpl<T> build(AsyncCuratorFramework client, ModelSpec<T> model, WatchMode watchMode, UnaryOperator<WatchedEvent> watcherFilter, UnhandledErrorListener unhandledErrorListener, UnaryOperator<CuratorEvent> resultFilter)
+    {
+        boolean isWatched = (watchMode != null);
+
+        Objects.requireNonNull(client, "client cannot be null");
+        Objects.requireNonNull(model, "model cannot be null");
+
+        watchMode = (watchMode != null) ? watchMode : WatchMode.stateChangeAndSuccess;
+
+        AsyncCuratorFrameworkDsl dslClient = client.with(watchMode, unhandledErrorListener, resultFilter, watcherFilter);
+        WatchableAsyncCuratorFramework watchableClient = isWatched ? dslClient.watched() : dslClient;
+
+        return new ModeledFrameworkImpl<>(
+            client,
+            dslClient,
+            watchableClient,
+            model,
+            watchMode,
+            watcherFilter,
+            unhandledErrorListener,
+            resultFilter,
+            isWatched
+        );
+    }
+
+    private ModeledFrameworkImpl(AsyncCuratorFramework client, AsyncCuratorFrameworkDsl dslClient, WatchableAsyncCuratorFramework watchableClient, ModelSpec<T> modelSpec, WatchMode watchMode, UnaryOperator<WatchedEvent> watcherFilter, UnhandledErrorListener unhandledErrorListener, UnaryOperator<CuratorEvent> resultFilter, boolean isWatched)
+    {
+        this.client = client;
+        this.dslClient = dslClient;
+        this.watchableClient = watchableClient;
+        this.modelSpec = modelSpec;
+        this.watchMode = watchMode;
+        this.watcherFilter = watcherFilter;
+        this.unhandledErrorListener = unhandledErrorListener;
+        this.resultFilter = resultFilter;
+        this.isWatched = isWatched;
+    }
+
+    @Override
+    public CachedModeledFramework<T> cached()
+    {
+        return cached(ThreadUtils.newSingleThreadExecutor("CachedModeledFramework"));
+    }
+
+    @Override
+    public CachedModeledFramework<T> cached(ExecutorService executor)
+    {
+        Preconditions.checkState(!isWatched, "CachedModeledFramework cannot be used with watched instances as the internal cache would bypass the watchers.");
+        return new CachedModeledFrameworkImpl<>(this, Objects.requireNonNull(executor, "executor cannot be null"));
+    }
+
+    @Override
+    public VersionedModeledFramework<T> versioned()
+    {
+        return new VersionedModeledFrameworkImpl<>(this);
+    }
+
+    @Override
+    public ModelSpec<T> modelSpec()
+    {
+        return modelSpec;
+    }
+
+    @Override
+    public AsyncCuratorFramework unwrap()
+    {
+        return client;
+    }
+
+    @Override
+    public AsyncStage<String> set(T item)
+    {
+        return set(item, null, -1);
+    }
+
+    @Override
+    public AsyncStage<String> set(T item, Stat storingStatIn)
+    {
+        return set(item, storingStatIn, -1);
+    }
+
+    @Override
+    public AsyncStage<String> set(T item, int version)
+    {
+        return set(item, null, -1);
+    }
+
+    @Override
+    public AsyncStage<String> set(T item, Stat storingStatIn, int version)
+    {
+        try
+        {
+            byte[] bytes = modelSpec.serializer().serialize(item);
+            return dslClient.create()
+                .withOptions(modelSpec.createOptions(), modelSpec.createMode(), fixAclList(modelSpec.aclList()), storingStatIn, modelSpec.ttl(), version)
+                .forPath(resolveForSet(item), bytes);
+        }
+        catch ( Exception e )
+        {
+            return ModelStage.exceptionally(e);
+        }
+    }
+
+    @Override
+    public AsyncStage<T> read()
+    {
+        return internalRead(ZNode::model, null);
+    }
+
+    @Override
+    public AsyncStage<T> read(Stat storingStatIn)
+    {
+        return internalRead(ZNode::model, storingStatIn);
+    }
+
+    @Override
+    public AsyncStage<ZNode<T>> readAsZNode()
+    {
+        return internalRead(Function.identity(), null);
+    }
+
+    @Override
+    public AsyncStage<Stat> update(T item)
+    {
+        return update(item, -1);
+    }
+
+    @Override
+    public AsyncStage<Stat> update(T item, int version)
+    {
+        try
+        {
+            byte[] bytes = modelSpec.serializer().serialize(item);
+            AsyncPathAndBytesable<AsyncStage<Stat>> next = isCompressed() ? dslClient.setData().compressedWithVersion(version) : dslClient.setData();
+            return next.forPath(resolveForSet(item), bytes);
+        }
+        catch ( Exception e )
+        {
+            return ModelStage.exceptionally(e);
+        }
+    }
+
+    @Override
+    public AsyncStage<Stat> checkExists()
+    {
+        return watchableClient.checkExists().forPath(modelSpec.path().fullPath());
+    }
+
+    @Override
+    public AsyncStage<Void> delete()
+    {
+        return delete(-1);
+    }
+
+    @Override
+    public AsyncStage<Void> delete(int version)
+    {
+        return dslClient.delete().withVersion(-1).forPath(modelSpec.path().fullPath());
+    }
+
+    @Override
+    public AsyncStage<List<ZPath>> children()
+    {
+        return internalGetChildren(modelSpec.path());
+    }
+
+    @Override
+    public AsyncStage<List<ZNode<T>>> childrenAsZNodes()
+    {
+        ModelStage<List<ZNode<T>>> modelStage = ModelStage.make();
+        Preconditions.checkState(!isWatched, "childrenAsZNodes() cannot be used with watched instances.");
+        children().handle((children, e) -> {
+            if ( e != null )
+            {
+                modelStage.completeExceptionally(e);
+            }
+            else
+            {
+                completeChildrenAsZNodes(modelStage, children);
+            }
+            return null;
+        });
+        return modelStage;
+    }
+
+    private void completeChildrenAsZNodes(ModelStage<List<ZNode<T>>> modelStage, List<ZPath> children)
+    {
+        List<ZNode<T>> nodes = Lists.newArrayList();
+        if ( children.size() == 0 )
+        {
+            modelStage.complete(nodes);
+            return;
+        }
+        children.forEach(path -> withPath(path).readAsZNode().handle((node, e) -> {
+            if ( e != null )
+            {
+                modelStage.completeExceptionally(e);
+            }
+            else
+            {
+                nodes.add(node);
+                if ( nodes.size() == children.size() )
+                {
+                    modelStage.complete(nodes);
+                }
+            }
+            return null;
+        }));
+    }
+
+    private AsyncStage<List<ZPath>> internalGetChildren(ZPath path)
+    {
+        AsyncStage<List<String>> asyncStage = watchableClient.getChildren().forPath(path.fullPath());
+        ModelStage<List<ZPath>> modelStage = ModelStage.make(asyncStage.event());
+        asyncStage.whenComplete((children, e) -> {
+            if ( e != null )
+            {
+                modelStage.completeExceptionally(e);
+            }
+            else
+            {
+                modelStage.complete(children.stream().map(path::child).collect(Collectors.toList()));
+            }
+        });
+        return modelStage;
+    }
+
+    @Override
+    public ModeledFramework<T> parent()
+    {
+        ModelSpec<T> newModelSpec = modelSpec.parent();
+        return new ModeledFrameworkImpl<>(
+            client,
+            dslClient,
+            watchableClient,
+            newModelSpec,
+            watchMode,
+            watcherFilter,
+            unhandledErrorListener,
+            resultFilter,
+            isWatched
+        );
+    }
+
+    @Override
+    public ModeledFramework<T> child(Object child)
+    {
+        ModelSpec<T> newModelSpec = modelSpec.child(child);
+        return new ModeledFrameworkImpl<>(
+            client,
+            dslClient,
+            watchableClient,
+            newModelSpec,
+            watchMode,
+            watcherFilter,
+            unhandledErrorListener,
+            resultFilter,
+            isWatched
+        );
+    }
+
+    @Override
+    public ModeledFramework<T> withPath(ZPath path)
+    {
+        ModelSpec<T> newModelSpec = modelSpec.withPath(path);
+        return new ModeledFrameworkImpl<>(
+            client,
+            dslClient,
+            watchableClient,
+            newModelSpec,
+            watchMode,
+            watcherFilter,
+            unhandledErrorListener,
+            resultFilter,
+            isWatched
+        );
+    }
+
+    public static boolean isCompressed(Set<CreateOption> createOptions)
+    {
+        return createOptions.contains(CreateOption.compress);
+    }
+
+    @Override
+    public CuratorOp createOp(T model)
+    {
+        return client.transactionOp()
+            .create()
+            .withOptions(modelSpec.createMode(), fixAclList(modelSpec.aclList()), modelSpec.createOptions().contains(CreateOption.compress), modelSpec.ttl())
+            .forPath(resolveForSet(model), modelSpec.serializer().serialize(model));
+    }
+
+    @Override
+    public CuratorOp updateOp(T model)
+    {
+        return updateOp(model, -1);
+    }
+
+    @Override
+    public CuratorOp updateOp(T model, int version)
+    {
+        AsyncTransactionSetDataBuilder builder = client.transactionOp().setData();
+        if ( isCompressed() )
+        {
+            return builder.withVersionCompressed(version).forPath(resolveForSet(model), modelSpec.serializer().serialize(model));
+        }
+        return builder.withVersion(version).forPath(resolveForSet(model), modelSpec.serializer().serialize(model));
+    }
+
+    @Override
+    public CuratorOp deleteOp()
+    {
+        return deleteOp(-1);
+    }
+
+    @Override
+    public CuratorOp deleteOp(int version)
+    {
+        return client.transactionOp().delete().withVersion(version).forPath(modelSpec.path().fullPath());
+    }
+
+    @Override
+    public CuratorOp checkExistsOp()
+    {
+        return checkExistsOp(-1);
+    }
+
+    @Override
+    public CuratorOp checkExistsOp(int version)
+    {
+        return client.transactionOp().check().withVersion(version).forPath(modelSpec.path().fullPath());
+    }
+
+    @Override
+    public AsyncStage<List<CuratorTransactionResult>> inTransaction(List<CuratorOp> operations)
+    {
+        return client.transaction().forOperations(operations);
+    }
+
+    private boolean isCompressed()
+    {
+        return modelSpec.createOptions().contains(CreateOption.compress);
+    }
+
+    private <U> ModelStage<U> internalRead(Function<ZNode<T>, U> resolver, Stat storingStatIn)
+    {
+        Stat stat = (storingStatIn != null) ? storingStatIn : new Stat();
+        AsyncPathable<AsyncStage<byte[]>> next = isCompressed() ? watchableClient.getData().decompressedStoringStatIn(stat) : watchableClient.getData().storingStatIn(stat);
+        AsyncStage<byte[]> asyncStage = next.forPath(modelSpec.path().fullPath());
+        ModelStage<U> modelStage = ModelStage.make(asyncStage.event());
+        asyncStage.whenComplete((value, e) -> {
+            if ( e != null )
+            {
+                modelStage.completeExceptionally(e);
+            }
+            else
+            {
+                try
+                {
+                    ZNode<T> node = new ZNodeImpl<>(modelSpec.path(), stat, modelSpec.serializer().deserialize(value));
+                    modelStage.complete(resolver.apply(node));
+                }
+                catch ( Exception deserializeException )
+                {
+                    modelStage.completeExceptionally(deserializeException);
+                }
+            }
+        });
+        return modelStage;
+    }
+
+    private String resolveForSet(T model)
+    {
+        if ( modelSpec.path().isResolved() )
+        {
+            return modelSpec.path().fullPath();
+        }
+        return modelSpec.path().resolved(model).fullPath();
+    }
+
+    private List<ACL> fixAclList(List<ACL> aclList)
+    {
+        return (aclList.size() > 0) ? aclList : null;   // workaround for old, bad design. empty list not accepted
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/0f5d10da/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/VersionedModeledFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/VersionedModeledFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/VersionedModeledFrameworkImpl.java
new file mode 100644
index 0000000..89d7615
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/VersionedModeledFrameworkImpl.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.modeled.details;
+
+import org.apache.curator.framework.api.transaction.CuratorOp;
+import org.apache.curator.x.async.AsyncStage;
+import org.apache.curator.x.async.modeled.ModeledFramework;
+import org.apache.curator.x.async.modeled.versioned.Versioned;
+import org.apache.curator.x.async.modeled.versioned.VersionedModeledFramework;
+import org.apache.zookeeper.data.Stat;
+
+class VersionedModeledFrameworkImpl<T> implements VersionedModeledFramework<T>
+{
+    private final ModeledFramework<T> client;
+
+    VersionedModeledFrameworkImpl(ModeledFramework<T> client)
+    {
+        this.client = client;
+    }
+
+    @Override
+    public AsyncStage<String> set(Versioned<T> model)
+    {
+        return client.set(model.model(), model.version());
+    }
+
+    @Override
+    public AsyncStage<String> set(Versioned<T> model, Stat storingStatIn)
+    {
+        return client.set(model.model(), storingStatIn, model.version());
+    }
+
+    @Override
+    public AsyncStage<Versioned<T>> read()
+    {
+        return read(null);
+    }
+
+    @Override
+    public AsyncStage<Versioned<T>> read(Stat storingStatIn)
+    {
+        Stat localStat = (storingStatIn != null) ? storingStatIn : new Stat();
+        AsyncStage<T> stage = client.read(localStat);
+        ModelStage<Versioned<T>> modelStage = ModelStage.make(stage.event());
+        stage.whenComplete((model, e) -> {
+           if ( e != null )
+           {
+               modelStage.completeExceptionally(e);
+           }
+           else
+           {
+               modelStage.complete(Versioned.from(model, localStat.getVersion()));
+           }
+        });
+        return modelStage;
+    }
+
+    @Override
+    public AsyncStage<Stat> update(Versioned<T> model)
+    {
+        return client.update(model.model(), model.version());
+    }
+
+    @Override
+    public CuratorOp updateOp(Versioned<T> model)
+    {
+        return client.updateOp(model.model(), model.version());
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/0f5d10da/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ZNodeImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ZNodeImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ZNodeImpl.java
new file mode 100644
index 0000000..85bedf4
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ZNodeImpl.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.modeled.details;
+
+import org.apache.curator.x.async.modeled.ZNode;
+import org.apache.curator.x.async.modeled.ZPath;
+import org.apache.zookeeper.data.Stat;
+import java.util.Objects;
+
+public class ZNodeImpl<T> implements ZNode<T>
+{
+    private final ZPath path;
+    private final Stat stat;
+    private final T model;
+
+    public ZNodeImpl(ZPath path, Stat stat, T model)
+    {
+        this.path = Objects.requireNonNull(path, "path cannot be null");
+        this.stat = Objects.requireNonNull(stat, "stat cannot be null");
+        this.model = Objects.requireNonNull(model, "model cannot be null");
+    }
+
+    @Override
+    public ZPath path()
+    {
+        return path;
+    }
+
+    @Override
+    public Stat stat()
+    {
+        return stat;
+    }
+
+    @Override
+    public T model()
+    {
+        return model;
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/0f5d10da/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ZPathImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ZPathImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ZPathImpl.java
new file mode 100644
index 0000000..fff742e
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ZPathImpl.java
@@ -0,0 +1,289 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.modeled.details;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.ImmutableList;
+import org.apache.curator.x.async.modeled.NodeName;
+import org.apache.curator.x.async.modeled.ZPath;
+import org.apache.zookeeper.common.PathUtils;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.function.UnaryOperator;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import static org.apache.curator.utils.ZKPaths.PATH_SEPARATOR;
+
+public class ZPathImpl implements ZPath
+{
+    public static final ZPath root = new ZPathImpl(Collections.singletonList(PATH_SEPARATOR), null);
+
+    private final List<String> nodes;
+    private final boolean isResolved;
+    private volatile String fullPath = null;
+    private volatile ZPath parent = null;
+    private volatile Pattern schema = null;
+
+    public static ZPath parse(String fullPath, UnaryOperator<String> nameFilter)
+    {
+        return parseInternal(fullPath, nameFilter);
+    }
+
+    private static ZPathImpl parseInternal(String fullPath, UnaryOperator<String> nameFilter)
+    {
+        List<String> nodes = ImmutableList.<String>builder()
+            .add(PATH_SEPARATOR)
+            .addAll(
+                Splitter.on(PATH_SEPARATOR)
+                    .omitEmptyStrings()
+                    .splitToList(fullPath)
+                    .stream()
+                    .map(nameFilter)
+                    .collect(Collectors.toList())
+             )
+            .build();
+        nodes.forEach(ZPathImpl::validate);
+        return new ZPathImpl(nodes, null);
+    }
+
+    public static ZPath from(String[] names)
+    {
+        return from(null, Arrays.asList(names));
+    }
+
+    public static ZPath from(List<String> names)
+    {
+        return from(null, names);
+    }
+
+    public static ZPath from(ZPath base, String[] names)
+    {
+        return from(base, Arrays.asList(names));
+    }
+
+    public static ZPath from(ZPath base, List<String> names)
+    {
+        names = Objects.requireNonNull(names, "names cannot be null");
+        names.forEach(ZPathImpl::validate);
+        ImmutableList.Builder<String> builder = ImmutableList.builder();
+        if ( base != null )
+        {
+            if ( base instanceof ZPathImpl )
+            {
+                builder.addAll(((ZPathImpl)base).nodes);
+            }
+            else
+            {
+                builder.addAll(Splitter.on(PATH_SEPARATOR).omitEmptyStrings().splitToList(base.fullPath()));
+            }
+        }
+        else
+        {
+            builder.add(PATH_SEPARATOR);
+        }
+        List<String> nodes = builder.addAll(names).build();
+        return new ZPathImpl(nodes, null);
+    }
+
+    @Override
+    public ZPath child(Object child)
+    {
+        return new ZPathImpl(nodes, NodeName.nameFrom(child));
+    }
+
+    @Override
+    public ZPath parent()
+    {
+        checkRootAccess();
+        if ( parent == null )
+        {
+            parent = new ZPathImpl(nodes.subList(0, nodes.size() - 1), null);
+        }
+        return parent;
+    }
+
+    @Override
+    public boolean isRoot()
+    {
+        return nodes.size() == 1;
+    }
+
+    @Override
+    public boolean startsWith(ZPath path)
+    {
+        ZPathImpl rhs;
+        if ( path instanceof ZPathImpl )
+        {
+            rhs = (ZPathImpl)path;
+        }
+        else
+        {
+            rhs = parseInternal(path.fullPath(), s -> s);
+        }
+        return (nodes.size() >= rhs.nodes.size()) && nodes.subList(0, rhs.nodes.size()).equals(rhs.nodes);
+    }
+
+    @Override
+    public Pattern toSchemaPathPattern()
+    {
+        if ( schema == null )
+        {
+            schema = Pattern.compile(buildFullPath(s -> isParameter(s) ? ".*" : s));
+        }
+        return schema;
+    }
+
+    @Override
+    public String fullPath()
+    {
+        checkResolved();
+        if ( fullPath == null )
+        {
+            fullPath = buildFullPath(s -> s);
+        }
+        return fullPath;
+    }
+
+    @Override
+    public String nodeName()
+    {
+        return nodes.get(nodes.size() - 1);
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if ( this == o )
+        {
+            return true;
+        }
+        if ( o == null || getClass() != o.getClass() )
+        {
+            return false;
+        }
+
+        ZPathImpl zPaths = (ZPathImpl)o;
+
+        return nodes.equals(zPaths.nodes);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return nodes.hashCode();
+    }
+
+    @Override
+    public String toString()
+    {
+        return nodes.subList(1, nodes.size())
+            .stream().map(name -> isParameter(name) ? name.substring(1) : name)
+            .collect(Collectors.joining(PATH_SEPARATOR, PATH_SEPARATOR, ""));
+    }
+
+    @Override
+    public ZPath resolved(List<Object> parameters)
+    {
+        Iterator<Object> iterator = parameters.iterator();
+        List<String> nodeNames = nodes.stream()
+            .map(name -> {
+                if ( isParameter(name) && iterator.hasNext() )
+                {
+                    return NodeName.nameFrom(iterator.next());
+                }
+                return name;
+            })
+            .collect(Collectors.toList());
+        return new ZPathImpl(nodeNames, null);
+    }
+
+    @Override
+    public boolean isResolved()
+    {
+        return isResolved;
+    }
+
+    private static boolean isParameter(String name)
+    {
+        return (name.length() > 1) && name.startsWith(PATH_SEPARATOR);
+    }
+
+    private ZPathImpl(List<String> nodes, String child)
+    {
+        ImmutableList.Builder<String> builder = ImmutableList.<String>builder().addAll(nodes);
+        if ( child != null )
+        {
+            validate(child);
+            builder.add(child);
+        }
+        this.nodes = builder.build();
+        isResolved = this.nodes.stream().noneMatch(ZPathImpl::isParameter);
+    }
+
+    private void checkRootAccess()
+    {
+        if ( isRoot() )
+        {
+            throw new NoSuchElementException("The root has no parent");
+        }
+    }
+
+    private void checkResolved()
+    {
+        if ( !isResolved)
+        {
+            throw new IllegalStateException("This ZPath has not been resolved: " + toString());
+        }
+    }
+
+    private static void validate(String nodeName)
+    {
+        if ( isParameter(Objects.requireNonNull(nodeName, "nodeName cannot be null")) )
+        {
+            return;
+        }
+        if ( nodeName.equals(PATH_SEPARATOR) )
+        {
+            return;
+        }
+        PathUtils.validatePath(PATH_SEPARATOR + nodeName);
+    }
+
+    private String buildFullPath(UnaryOperator<String> filter)
+    {
+        boolean addSeparator = false;
+        StringBuilder str = new StringBuilder();
+        int size = nodes.size();
+        int parameterIndex = 0;
+        for ( int i = 0; i < size; ++i )
+        {
+            if ( i > 1 )
+            {
+                str.append(PATH_SEPARATOR);
+            }
+            str.append(filter.apply(nodes.get(i)));
+        }
+        return str.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/0f5d10da/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModelSpec.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModelSpec.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModelSpec.java
new file mode 100644
index 0000000..3fa9831
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModelSpec.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.modeled.typed;
+
+import org.apache.curator.x.async.modeled.ModelSpec;
+import org.apache.curator.x.async.modeled.ModelSpecBuilder;
+
+/**
+ * <p>
+ *     Abstraction that allows the construction of ModelSpecs using strongly typed parameter replacements.
+ *     For example, given a ModelSpec with a path such as "/root/registry/people/{id}" where "id" should
+ *     be <code>PersonId</code>.
+ * </p>
+ *
+ * <p>
+ * <pre><code>
+ * // Step 1. Create a typed ZPath
+ * TypedZPath&lt;PersonId&gt; typedPath = TypedZPath.from("/root/registry/people/{id}");
+ *
+ * // Step 2. Create a ModelSpec builder (do not build at this point)
+ * ModelSpecBuilder&lt;Person&gt; builder = ModelSpec.builder(JacksonModelSerializer.build(Person.class))
+ *
+ * // Step 3. Create a typed ModelSpec using the typed ZPath and ModelSpec builder
+ * TypedModelSpec&lt;Person, PersonId&gt; typedModelSpec = TypedModelSpec.from(builder, path);
+ *
+ * // later on the TypedModelSpec can be resolved into a useable ModelSpec
+ * ModelSpec&lt;Person&gt; modelSpec = typedModelSpec.resolve(personId);
+ * </pre></code>
+ * </p>
+ */
+@FunctionalInterface
+public interface TypedModelSpec<M, P1>
+{
+    /**
+     * Resolve into a ZPath using the given parameter
+     *
+     * @param p1 the parameter
+     * @return ZPath
+     */
+    ModelSpec<M> resolved(P1 p1);
+
+    /**
+     * Return a new TypedModelSpec using the given model spec builder and typed path. When
+     * {@link #resolved(Object)} is called the actual model spec is generated with the
+     * resolved path
+     *
+     * @param builder model spec builder
+     * @param path typed path
+     * @return new TypedModelSpec
+     */
+    static <M, P1> TypedModelSpec<M, P1> from(ModelSpecBuilder<M> builder, TypedZPath<P1> path)
+    {
+        return p1 -> builder.withPath(path.resolved(p1)).build();
+    }
+
+    /**
+     * Return a new TypedModelSpec using the given model spec builder and path. A TypedZPath
+     * is created from the given full path and When
+     * {@link #resolved(Object)} is called the actual model spec is generated with the
+     * resolved path
+     *
+     * @param builder model spec builder
+     * @param pathWithIds typed path
+     * @return new TypedModelSpec
+     */
+    static <M, P1> TypedModelSpec<M, P1> from(ModelSpecBuilder<M> builder, String pathWithIds)
+    {
+        TypedZPath<P1> zPath = TypedZPath.from(pathWithIds);
+        return p1 -> builder.withPath(zPath.resolved(p1)).build();
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/0f5d10da/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModelSpec0.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModelSpec0.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModelSpec0.java
new file mode 100644
index 0000000..dee3506
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModelSpec0.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.modeled.typed;
+
+import org.apache.curator.x.async.modeled.ModelSpec;
+import org.apache.curator.x.async.modeled.ModelSpecBuilder;
+
+/**
+ * Same as {@link TypedModelSpec}, but with 0 parameters
+ */
+@FunctionalInterface
+public interface TypedModelSpec0<M>
+{
+    ModelSpec<M> resolved();
+
+    /**
+     * Return a new TypedModelSpec using the given model spec builder and typed path. When
+     * {@link #resolved()} is called the actual model spec is generated with the
+     * resolved path
+     *
+     * @param builder model spec builder
+     * @param path typed path
+     * @return new TypedModelSpec
+     */
+    static <M> TypedModelSpec0<M> from(ModelSpecBuilder<M> builder, TypedZPath0 path)
+    {
+        return () -> builder.withPath(path.resolved()).build();
+    }
+
+    /**
+     * Return a new TypedModelSpec using the given model spec builder and path. A TypedZPath
+     * is created from the given full path and When
+     * {@link #resolved()} is called the actual model spec is generated with the
+     * resolved path
+     *
+     * @param builder model spec builder
+     * @param pathWithIds typed path
+     * @return new TypedModelSpec
+     */
+    static <M> TypedModelSpec0<M> from(ModelSpecBuilder<M> builder, String pathWithIds)
+    {
+        TypedZPath0 zPath = TypedZPath0.from(pathWithIds);
+        return () -> builder.withPath(zPath.resolved()).build();
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/0f5d10da/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModelSpec10.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModelSpec10.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModelSpec10.java
new file mode 100644
index 0000000..1b00d66
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModelSpec10.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.modeled.typed;
+
+import org.apache.curator.x.async.modeled.ModelSpec;
+import org.apache.curator.x.async.modeled.ModelSpecBuilder;
+
+/**
+ * Same as {@link org.apache.curator.x.async.modeled.typed.TypedModelSpec}, but with 10 parameters
+ */
+@FunctionalInterface
+public interface TypedModelSpec10<M, P1, P2, P3, P4, P5, P6, P7, P8, P9, P10>
+{
+    ModelSpec<M> resolved(P1 p1, P2 p2, P3 p3, P4 p4, P5 p5, P6 p6, P7 p7, P8 p8, P9 p9, P10 p10);
+
+    /**
+     * Return a new TypedModelSpec using the given model spec builder and typed path. When
+     * {@link #resolved(Object, Object, Object, Object, Object, Object, Object, Object, Object, Object)} is called the actual model spec is generated with the
+     * resolved path
+     *
+     * @param builder model spec builder
+     * @param path typed path
+     * @return new TypedModelSpec
+     */
+    static <M, P1, P2, P3, P4, P5, P6, P7, P8, P9, P10> TypedModelSpec10<M, P1, P2, P3, P4, P5, P6, P7, P8, P9, P10> from(ModelSpecBuilder<M> builder, TypedZPath10<P1, P2, P3, P4, P5, P6, P7, P8, P9, P10> path)
+    {
+        return (p1, p2, p3, p4, p5, p6, p7, p8, p9, p10) -> builder.withPath(path.resolved(p1, p2, p3, p4, p5, p6, p7, p8, p9, p10)).build();
+    }
+
+    /**
+     * Return a new TypedModelSpec using the given model spec builder and path. A TypedZPath
+     * is created from the given full path and When
+     * {@link #resolved(Object, Object, Object, Object, Object, Object, Object, Object, Object, Object)} is called the actual model spec is generated with the
+     * resolved path
+     *
+     * @param builder model spec builder
+     * @param pathWithIds typed path
+     * @return new TypedModelSpec
+     */
+    static <M, P1, P2, P3, P4, P5, P6, P7, P8, P9, P10> TypedModelSpec10<M, P1, P2, P3, P4, P5, P6, P7, P8, P9, P10> from(ModelSpecBuilder<M> builder, String pathWithIds)
+    {
+        TypedZPath10<P1, P2, P3, P4, P5, P6, P7, P8, P9, P10> zPath = TypedZPath10.from(pathWithIds);
+        return (p1, p2, p3, p4, p5, p6, p7, p8, p9, p10) -> builder.withPath(zPath.resolved(p1, p2, p3, p4, p5, p6, p7, p8, p9, p10)).build();
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/0f5d10da/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModelSpec2.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModelSpec2.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModelSpec2.java
new file mode 100644
index 0000000..a56e139
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModelSpec2.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.modeled.typed;
+
+import org.apache.curator.x.async.modeled.ModelSpec;
+import org.apache.curator.x.async.modeled.ModelSpecBuilder;
+
+/**
+ * Same as {@link org.apache.curator.x.async.modeled.typed.TypedModelSpec}, but with 2 parameters
+ */
+@FunctionalInterface
+public interface TypedModelSpec2<M, P1, P2>
+{
+    ModelSpec<M> resolved(P1 p1, P2 p2);
+
+    /**
+     * Return a new TypedModelSpec using the given model spec builder and typed path. When
+     * {@link #resolved(Object, Object)} is called the actual model spec is generated with the
+     * resolved path
+     *
+     * @param builder model spec builder
+     * @param path typed path
+     * @return new TypedModelSpec
+     */
+    static <M, P1, P2> TypedModelSpec2<M, P1, P2> from(ModelSpecBuilder<M> builder, TypedZPath2<P1, P2> path)
+    {
+        return (p1, p2) -> builder.withPath(path.resolved(p1, p2)).build();
+    }
+
+    /**
+     * Return a new TypedModelSpec using the given model spec builder and path. A TypedZPath
+     * is created from the given full path and When
+     * {@link #resolved(Object, Object)} is called the actual model spec is generated with the
+     * resolved path
+     *
+     * @param builder model spec builder
+     * @param pathWithIds typed path
+     * @return new TypedModelSpec
+     */
+    static <M, P1, P2> TypedModelSpec2<M, P1, P2> from(ModelSpecBuilder<M> builder, String pathWithIds)
+    {
+        TypedZPath2<P1, P2> zPath = TypedZPath2.from(pathWithIds);
+        return (p1, p2) -> builder.withPath(zPath.resolved(p1, p2)).build();
+    }
+}