You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2017/07/08 03:32:36 UTC
[4/6] curator git commit: Squashed commit of the following:
http://git-wip-us.apache.org/repos/asf/curator/blob/0f5d10da/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/CachedModeledFramework.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/CachedModeledFramework.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/CachedModeledFramework.java
new file mode 100644
index 0000000..8acbebb
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/CachedModeledFramework.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.modeled.cached;
+
+import org.apache.curator.framework.listen.Listenable;
+import org.apache.curator.x.async.AsyncStage;
+import org.apache.curator.x.async.modeled.ModeledFramework;
+import org.apache.curator.x.async.modeled.ZNode;
+import org.apache.curator.x.async.modeled.ZPath;
+import org.apache.zookeeper.data.Stat;
+import java.io.Closeable;
+import java.util.List;
+
+/**
+ * A {@link ModeledFramework} that is paired with a {@link ModeledCache}. On top of the
+ * {@link ModeledFramework} operations it exposes the cache itself, the cache lifecycle
+ * ({@link #start()} / {@link #close()}), the cache listeners and "read through" variants
+ * of the read methods.
+ *
+ * @param <T> the model type
+ */
+public interface CachedModeledFramework<T> extends ModeledFramework<T>, Closeable
+{
+ /**
+ * Return the cache instance
+ *
+ * @return cache
+ */
+ ModeledCache<T> cache();
+
+ /**
+ * Returns a view of this instance that uses the CachedModeledFramework's executor
+ * for all default async completion operations. i.e. when you use, for example,
+ * {@link java.util.concurrent.CompletionStage#handleAsync(java.util.function.BiFunction)}
+ * this instance's executor is used instead of {@code ForkJoinPool.commonPool()}.
+ *
+ * @return view
+ */
+ CachedModeledFramework<T> asyncDefault();
+
+ /**
+ * Start the internally created cache
+ */
+ void start();
+
+ /**
+ * Close/stop the internally created cache. Declared here without a checked
+ * exception, narrowing {@link Closeable#close()}.
+ */
+ @Override
+ void close();
+
+ /**
+ * Return the listener container so that you can add/remove listeners
+ *
+ * @return listener container
+ */
+ Listenable<ModeledCacheListener<T>> listenable();
+
+ /**
+ * Same as {@link org.apache.curator.x.async.modeled.ModeledFramework#childrenAsZNodes()}
+ * but always reads from cache - i.e. no additional queries to ZooKeeper are made
+ *
+ * @return AsyncStage stage
+ */
+ @Override
+ AsyncStage<List<ZNode<T>>> childrenAsZNodes();
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ CachedModeledFramework<T> child(Object child);
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ CachedModeledFramework<T> withPath(ZPath path);
+
+ /**
+ * Same as {@link #read()} except that if the cache does not have a value
+ * for this path a direct query is made.
+ *
+ * @return AsyncStage
+ * @see org.apache.curator.x.async.AsyncStage
+ */
+ AsyncStage<T> readThrough();
+
+ /**
+ * Same as {@link #read(org.apache.zookeeper.data.Stat)} except that if the cache does not have a value
+ * for this path a direct query is made.
+ *
+ * @param storingStatIn the stat of the node that is read is stored here
+ * @return AsyncStage
+ * @see org.apache.curator.x.async.AsyncStage
+ */
+ AsyncStage<T> readThrough(Stat storingStatIn);
+
+ /**
+ * Same as {@link #readAsZNode()} except that if the cache does not have a value
+ * for this path a direct query is made.
+ *
+ * @return AsyncStage
+ * @see org.apache.curator.x.async.AsyncStage
+ */
+ AsyncStage<ZNode<T>> readThroughAsZNode();
+
+ /**
+ * Return the model instances currently held for the base path of this cached framework
+ *
+ * @return listing of all models in the base path
+ */
+ AsyncStage<List<T>> list();
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/0f5d10da/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/ModeledCache.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/ModeledCache.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/ModeledCache.java
new file mode 100644
index 0000000..6677268
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/ModeledCache.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.modeled.cached;
+
+import org.apache.curator.x.async.modeled.ZNode;
+import org.apache.curator.x.async.modeled.ZPath;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Read-only view of the locally cached nodes, exposed as deserialized models.
+ *
+ * @param <T> the model type
+ */
+public interface ModeledCache<T>
+{
+ /**
+ * Return the modeled current data for the given path. There are no guarantees of accuracy. This is
+ * merely the most recent view of the data. If there is no node at the given path,
+ * {@link java.util.Optional#empty()} is returned.
+ *
+ * @param path path to the node to check
+ * @return data if the node is alive, or empty
+ */
+ Optional<ZNode<T>> currentData(ZPath path);
+
+ /**
+ * Return the modeled current set of children at the given path, mapped by each child's
+ * {@link ZPath}. There are no guarantees of accuracy; this is merely the most recent view of the data.
+ *
+ * @param path path to the node to check
+ * @return a possibly-empty map of children if the node is alive
+ */
+ Map<ZPath, ZNode<T>> currentChildren(ZPath path);
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/0f5d10da/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/ModeledCacheListener.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/ModeledCacheListener.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/ModeledCacheListener.java
new file mode 100644
index 0000000..42498c0
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/ModeledCacheListener.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.modeled.cached;
+
+import org.apache.curator.x.async.modeled.ZPath;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Listener for changes to the nodes held by a modeled cache. Only
+ * {@link #accept(Type, ZPath, Stat, Object)} must be implemented; the remaining
+ * callbacks have default implementations.
+ *
+ * @param <T> the model type
+ */
+@FunctionalInterface
+public interface ModeledCacheListener<T>
+{
+    enum Type
+    {
+        /**
+         * A child was added to the path
+         */
+        NODE_ADDED,
+
+        /**
+         * A child's data was changed
+         */
+        NODE_UPDATED,
+
+        /**
+         * A child was removed from the path
+         */
+        NODE_REMOVED
+    }
+
+    /**
+     * The given path was added, updated or removed
+     *
+     * @param type action type
+     * @param path the path
+     * @param stat the node's stat (previous stat for removal)
+     * @param model the node's model (previous model for removal)
+     */
+    void accept(Type type, ZPath path, Stat stat, T model);
+
+    /**
+     * The cache has finished initializing
+     */
+    default void initialized()
+    {
+        // NOP
+    }
+
+    /**
+     * Called when there is an exception processing a message from the internal cache. This is most
+     * likely due to a de-serialization problem. The default implementation logs the exception.
+     *
+     * @param e the exception
+     */
+    default void handleException(Exception e)
+    {
+        LoggerFactory.getLogger(getClass()).error("Could not process cache message", e);
+    }
+
+    /**
+     * Returns a version of this listener that only begins calling
+     * {@link #accept(org.apache.curator.x.async.modeled.cached.ModeledCacheListener.Type, org.apache.curator.x.async.modeled.ZPath, org.apache.zookeeper.data.Stat, Object)}
+     * once {@link #initialized()} has been called. i.e. changes that occur as the cache is initializing are not sent
+     * to the listener. {@link #initialized()} and {@link #handleException(Exception)} are always
+     * forwarded to this listener.
+     *
+     * @return wrapped listener
+     */
+    default ModeledCacheListener<T> postInitializedOnly()
+    {
+        return new ModeledCacheListener<T>()
+        {
+            // volatile: written by initialized() and read by accept(), which may run on different threads
+            private volatile boolean isInitialized = false;
+
+            @Override
+            public void accept(Type type, ZPath path, Stat stat, T model)
+            {
+                if ( isInitialized )
+                {
+                    ModeledCacheListener.this.accept(type, path, stat, model);
+                }
+            }
+
+            @Override
+            public void initialized()
+            {
+                isInitialized = true;
+                ModeledCacheListener.this.initialized();
+            }
+
+            @Override
+            public void handleException(Exception e)
+            {
+                // without this override the wrapper would fall back to the default (logging)
+                // implementation and bypass any custom handling of the wrapped listener
+                ModeledCacheListener.this.handleException(e);
+            }
+        };
+    }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/0f5d10da/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java
new file mode 100644
index 0000000..2a7fd5f
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java
@@ -0,0 +1,342 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.modeled.details;
+
+import org.apache.curator.framework.api.transaction.CuratorOp;
+import org.apache.curator.framework.api.transaction.CuratorTransactionResult;
+import org.apache.curator.framework.listen.Listenable;
+import org.apache.curator.x.async.AsyncCuratorFramework;
+import org.apache.curator.x.async.AsyncStage;
+import org.apache.curator.x.async.modeled.ModelSpec;
+import org.apache.curator.x.async.modeled.ModeledFramework;
+import org.apache.curator.x.async.modeled.ZNode;
+import org.apache.curator.x.async.modeled.ZPath;
+import org.apache.curator.x.async.modeled.cached.CachedModeledFramework;
+import org.apache.curator.x.async.modeled.cached.ModeledCache;
+import org.apache.curator.x.async.modeled.cached.ModeledCacheListener;
+import org.apache.curator.x.async.modeled.versioned.VersionedModeledFramework;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.server.DataTree;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+/**
+ * {@link CachedModeledFramework} implementation. Read-style operations ({@code read*},
+ * {@code checkExists}, {@code children*}, {@code list}) are answered from the
+ * {@link ModeledCacheImpl}; all other operations are delegated unchanged to the wrapped
+ * {@link ModeledFramework}.
+ *
+ * @param <T> the model type
+ */
+class CachedModeledFrameworkImpl<T> implements CachedModeledFramework<T>
+{
+    private final ModeledFramework<T> client;
+    private final ModeledCacheImpl<T> cache;
+    private final Executor executor;
+    private final boolean asyncDefaultMode;
+
+    /**
+     * Creates a cached view of {@code client} backed by a newly created cache.
+     *
+     * @param client the framework to wrap
+     * @param executor executor handed to the cache and used for "async default" stages
+     */
+    CachedModeledFrameworkImpl(ModeledFramework<T> client, ExecutorService executor)
+    {
+        this(client, new ModeledCacheImpl<>(client.unwrap().unwrap(), client.modelSpec(), executor), executor, false);
+    }
+
+    // used for derived views (child/withPath/asyncDefault) that share an existing cache
+    private CachedModeledFrameworkImpl(ModeledFramework<T> client, ModeledCacheImpl<T> cache, Executor executor, boolean asyncDefaultMode)
+    {
+        this.client = client;
+        this.cache = cache;
+        this.executor = executor;
+        this.asyncDefaultMode = asyncDefaultMode;
+    }
+
+    @Override
+    public ModeledCache<T> cache()
+    {
+        return cache;
+    }
+
+    @Override
+    public CachedModeledFramework<T> asyncDefault()
+    {
+        return new CachedModeledFrameworkImpl<>(client, cache, executor, true);
+    }
+
+    @Override
+    public void start()
+    {
+        cache.start();
+    }
+
+    @Override
+    public void close()
+    {
+        cache.close();
+    }
+
+    @Override
+    public Listenable<ModeledCacheListener<T>> listenable()
+    {
+        return cache.listenable();
+    }
+
+    @Override
+    public CachedModeledFramework<T> cached()
+    {
+        throw new UnsupportedOperationException("Already a cached instance");
+    }
+
+    @Override
+    public CachedModeledFramework<T> cached(ExecutorService executor)
+    {
+        throw new UnsupportedOperationException("Already a cached instance");
+    }
+
+    @Override
+    public VersionedModeledFramework<T> versioned()
+    {
+        return new VersionedModeledFrameworkImpl<>(this);
+    }
+
+    @Override
+    public AsyncCuratorFramework unwrap()
+    {
+        return client.unwrap();
+    }
+
+    @Override
+    public ModelSpec<T> modelSpec()
+    {
+        return client.modelSpec();
+    }
+
+    @Override
+    public CachedModeledFramework<T> child(Object child)
+    {
+        return new CachedModeledFrameworkImpl<>(client.child(child), cache, executor, asyncDefaultMode);
+    }
+
+    @Override
+    public ModeledFramework<T> parent()
+    {
+        throw new UnsupportedOperationException("Not supported for CachedModeledFramework. Instead, call parent() on the ModeledFramework before calling cached()");
+    }
+
+    @Override
+    public CachedModeledFramework<T> withPath(ZPath path)
+    {
+        return new CachedModeledFrameworkImpl<>(client.withPath(path), cache, executor, asyncDefaultMode);
+    }
+
+    @Override
+    public AsyncStage<String> set(T model)
+    {
+        return client.set(model);
+    }
+
+    @Override
+    public AsyncStage<String> set(T model, Stat storingStatIn)
+    {
+        return client.set(model, storingStatIn);
+    }
+
+    @Override
+    public AsyncStage<String> set(T model, Stat storingStatIn, int version)
+    {
+        return client.set(model, storingStatIn, version);
+    }
+
+    @Override
+    public AsyncStage<String> set(T model, int version)
+    {
+        return client.set(model, version);
+    }
+
+    /**
+     * Reads the model from the cache only; completes exceptionally with a
+     * {@link KeeperException.NoNodeException} when the path is not cached.
+     */
+    @Override
+    public AsyncStage<T> read()
+    {
+        return internalRead(ZNode::model, this::exceptionally);
+    }
+
+    /**
+     * Same as {@link #read()} but also copies the cached stat into {@code storingStatIn}
+     * (when it is not {@code null}).
+     */
+    @Override
+    public AsyncStage<T> read(Stat storingStatIn)
+    {
+        return internalRead(node -> modelWithStat(node, storingStatIn), this::exceptionally);
+    }
+
+    @Override
+    public AsyncStage<ZNode<T>> readAsZNode()
+    {
+        return internalRead(Function.identity(), this::exceptionally);
+    }
+
+    @Override
+    public AsyncStage<T> readThrough()
+    {
+        return internalRead(ZNode::model, client::read);
+    }
+
+    /**
+     * Reads from the cache and falls back to the wrapped client on a cache miss. In both
+     * cases {@code storingStatIn} is populated: from the cached node on a hit, by the
+     * wrapped client's {@code read(Stat)} on a miss.
+     */
+    @Override
+    public AsyncStage<T> readThrough(Stat storingStatIn)
+    {
+        // a cache hit must fill the stat too, otherwise callers get an untouched Stat
+        return internalRead(node -> modelWithStat(node, storingStatIn), () -> client.read(storingStatIn));
+    }
+
+    @Override
+    public AsyncStage<ZNode<T>> readThroughAsZNode()
+    {
+        return internalRead(Function.identity(), client::readAsZNode);
+    }
+
+    @Override
+    public AsyncStage<List<T>> list()
+    {
+        List<T> children = cache.currentChildren()
+            .values()
+            .stream()
+            .map(ZNode::model)
+            .collect(Collectors.toList());
+        return completed(children);
+    }
+
+    @Override
+    public AsyncStage<Stat> update(T model)
+    {
+        return client.update(model);
+    }
+
+    @Override
+    public AsyncStage<Stat> update(T model, int version)
+    {
+        return client.update(model, version);
+    }
+
+    @Override
+    public AsyncStage<Void> delete()
+    {
+        return client.delete();
+    }
+
+    @Override
+    public AsyncStage<Void> delete(int version)
+    {
+        return client.delete(version);
+    }
+
+    /**
+     * Answers from the cache: completes with the cached stat, or with {@code null}
+     * when the path is not cached.
+     */
+    @Override
+    public AsyncStage<Stat> checkExists()
+    {
+        ZPath path = client.modelSpec().path();
+        Optional<ZNode<T>> data = cache.currentData(path);
+        return data.map(node -> completed(node.stat())).orElseGet(() -> completed(null));
+    }
+
+    @Override
+    public AsyncStage<List<ZPath>> children()
+    {
+        // NOTE(review): presumably the cache can report its own base path among the entries
+        // (ModeledCacheImpl is not visible here) - the base path is not a child, so drop it
+        List<ZPath> paths = cache.currentChildren(client.modelSpec().path())
+            .keySet()
+            .stream()
+            .filter(path -> !path.equals(cache.basePath()))
+            .collect(Collectors.toList());
+        return completed(paths);
+    }
+
+    @Override
+    public AsyncStage<List<ZNode<T>>> childrenAsZNodes()
+    {
+        // same exclusion of the base path entry as in children()
+        List<ZNode<T>> nodes = cache.currentChildren(client.modelSpec().path())
+            .entrySet()
+            .stream()
+            .filter(e -> !e.getKey().equals(cache.basePath()))
+            .map(Map.Entry::getValue)
+            .collect(Collectors.toList());
+        return completed(nodes);
+    }
+
+    @Override
+    public CuratorOp createOp(T model)
+    {
+        return client.createOp(model);
+    }
+
+    @Override
+    public CuratorOp updateOp(T model)
+    {
+        return client.updateOp(model);
+    }
+
+    @Override
+    public CuratorOp updateOp(T model, int version)
+    {
+        return client.updateOp(model, version);
+    }
+
+    @Override
+    public CuratorOp deleteOp()
+    {
+        return client.deleteOp();
+    }
+
+    @Override
+    public CuratorOp deleteOp(int version)
+    {
+        return client.deleteOp(version);
+    }
+
+    @Override
+    public CuratorOp checkExistsOp()
+    {
+        return client.checkExistsOp();
+    }
+
+    @Override
+    public CuratorOp checkExistsOp(int version)
+    {
+        return client.checkExistsOp(version);
+    }
+
+    @Override
+    public AsyncStage<List<CuratorTransactionResult>> inTransaction(List<CuratorOp> operations)
+    {
+        return client.inTransaction(operations);
+    }
+
+    // returns the node's model, copying the node's stat into storingStatIn when one was supplied
+    private T modelWithStat(ZNode<T> node, Stat storingStatIn)
+    {
+        if ( storingStatIn != null )
+        {
+            DataTree.copyStat(node.stat(), storingStatIn);
+        }
+        return node.model();
+    }
+
+    // already-completed stage; in async default mode its *Async methods use this instance's executor
+    private <U> AsyncStage<U> completed(U value)
+    {
+        return asyncDefaultMode ? ModelStage.asyncCompleted(value, executor) : ModelStage.completed(value);
+    }
+
+    // stage that is already failed with NoNodeException for this instance's path
+    private <U> AsyncStage<U> exceptionally()
+    {
+        KeeperException.NoNodeException exception = new KeeperException.NoNodeException(client.modelSpec().path().fullPath());
+        return asyncDefaultMode ? ModelStage.asyncExceptionally(exception, executor) : ModelStage.exceptionally(exception);
+    }
+
+    // cache lookup for this instance's path: resolver maps a hit, elseProc supplies the result on a miss
+    private <U> AsyncStage<U> internalRead(Function<ZNode<T>, U> resolver, Supplier<AsyncStage<U>> elseProc)
+    {
+        ZPath path = client.modelSpec().path();
+        Optional<ZNode<T>> data = cache.currentData(path);
+        return data.map(node -> completed(resolver.apply(node)))
+            .orElseGet(elseProc);
+    }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/0f5d10da/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModelSpecImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModelSpecImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModelSpecImpl.java
new file mode 100644
index 0000000..58405eb
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModelSpecImpl.java
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.modeled.details;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import org.apache.curator.framework.schema.Schema;
+import org.apache.curator.framework.schema.SchemaValidator;
+import org.apache.curator.framework.schema.SchemaViolation;
+import org.apache.curator.x.async.api.CreateOption;
+import org.apache.curator.x.async.api.DeleteOption;
+import org.apache.curator.x.async.modeled.ModelSerializer;
+import org.apache.curator.x.async.modeled.ModelSpec;
+import org.apache.curator.x.async.modeled.ZPath;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * {@link ModelSpec} implementation. The instance also serves as the {@link SchemaValidator}
+ * ("data validator") of the {@link Schema} it builds in {@link #schema()}.
+ *
+ * @param <T> the model type
+ */
+public class ModelSpecImpl<T> implements ModelSpec<T>, SchemaValidator
+{
+    private final ZPath path;
+    private final ModelSerializer<T> serializer;
+    private final CreateMode createMode;
+    private final List<ACL> aclList;
+    private final Set<CreateOption> createOptions;
+    private final Set<DeleteOption> deleteOptions;
+    private final long ttl;
+    // built lazily by schema(); derived from path/createMode, so not part of equals()/hashCode()
+    private volatile Schema schema = null;
+
+    /**
+     * @param path path of the model, cannot be null
+     * @param serializer model serializer, cannot be null
+     * @param createMode create mode, cannot be null
+     * @param aclList ACLs, cannot be null (copied)
+     * @param createOptions create options, cannot be null (copied)
+     * @param deleteOptions delete options, cannot be null (copied)
+     * @param ttl the TTL value
+     * @throws NullPointerException if any reference argument is null
+     */
+    public ModelSpecImpl(ZPath path, ModelSerializer<T> serializer, CreateMode createMode, List<ACL> aclList, Set<CreateOption> createOptions, Set<DeleteOption> deleteOptions, long ttl)
+    {
+        this.path = Objects.requireNonNull(path, "path cannot be null");
+        this.serializer = Objects.requireNonNull(serializer, "serializer cannot be null");
+        this.createMode = Objects.requireNonNull(createMode, "createMode cannot be null");
+        this.aclList = ImmutableList.copyOf(Objects.requireNonNull(aclList, "aclList cannot be null"));
+        this.createOptions = ImmutableSet.copyOf(Objects.requireNonNull(createOptions, "createOptions cannot be null"));
+        this.deleteOptions = ImmutableSet.copyOf(Objects.requireNonNull(deleteOptions, "deleteOptions cannot be null"));
+        this.ttl = ttl;
+    }
+
+    @Override
+    public ModelSpec<T> child(Object child)
+    {
+        return withPath(path.child(child));
+    }
+
+    @Override
+    public ModelSpec<T> parent()
+    {
+        return withPath(path.parent());
+    }
+
+    @Override
+    public ModelSpec<T> resolved(Object... parameters)
+    {
+        return withPath(path.resolved(parameters));
+    }
+
+    @Override
+    public ModelSpec<T> resolved(List<Object> parameters)
+    {
+        return withPath(path.resolved(parameters));
+    }
+
+    /**
+     * Returns a new spec that differs from this one only in its path.
+     */
+    @Override
+    public ModelSpec<T> withPath(ZPath newPath)
+    {
+        return new ModelSpecImpl<>(newPath, serializer, createMode, aclList, createOptions, deleteOptions, ttl);
+    }
+
+    @Override
+    public ZPath path()
+    {
+        return path;
+    }
+
+    @Override
+    public ModelSerializer<T> serializer()
+    {
+        return serializer;
+    }
+
+    @Override
+    public CreateMode createMode()
+    {
+        return createMode;
+    }
+
+    @Override
+    public List<ACL> aclList()
+    {
+        return aclList;
+    }
+
+    @Override
+    public Set<CreateOption> createOptions()
+    {
+        return createOptions;
+    }
+
+    @Override
+    public Set<DeleteOption> deleteOptions()
+    {
+        return deleteOptions;
+    }
+
+    @Override
+    public long ttl()
+    {
+        return ttl;
+    }
+
+    /**
+     * Returns the schema for this spec, building it on first use. There is no locking, so
+     * concurrent first calls may each build a schema; every one is built from the same fields.
+     */
+    @Override
+    public Schema schema()
+    {
+        Schema localSchema = schema;
+        if ( localSchema == null )
+        {
+            localSchema = Schema.builder(path.toSchemaPathPattern())
+                .dataValidator(this)
+                .ephemeral(createMode.isEphemeral() ? Schema.Allowance.MUST : Schema.Allowance.CANNOT)
+                .canBeDeleted(true)
+                .sequential(createMode.isSequential() ? Schema.Allowance.MUST : Schema.Allowance.CANNOT)
+                .watched(Schema.Allowance.CAN)
+                .build();
+            schema = localSchema;
+        }
+        return localSchema;
+    }
+
+    /**
+     * Two specs are equal when all of their constructor-supplied values are equal. The lazily
+     * built schema is deliberately ignored: it is {@code null} until {@link #schema()} is first
+     * called (comparing it used to throw {@link NullPointerException}) and it is derived from
+     * the compared fields anyway.
+     */
+    @Override
+    public boolean equals(Object o)
+    {
+        if ( this == o )
+        {
+            return true;
+        }
+        if ( o == null || getClass() != o.getClass() )
+        {
+            return false;
+        }
+
+        ModelSpecImpl<?> modelSpec = (ModelSpecImpl<?>)o;
+        return (ttl == modelSpec.ttl)
+            && path.equals(modelSpec.path)
+            && serializer.equals(modelSpec.serializer)
+            && (createMode == modelSpec.createMode)
+            && aclList.equals(modelSpec.aclList)
+            && createOptions.equals(modelSpec.createOptions)
+            && deleteOptions.equals(modelSpec.deleteOptions);
+    }
+
+    /**
+     * Hash over the same fields as {@link #equals(Object)}; the lazily built schema is excluded
+     * so the hash is stable and never dereferences a {@code null} schema.
+     */
+    @Override
+    public int hashCode()
+    {
+        int result = path.hashCode();
+        result = 31 * result + serializer.hashCode();
+        result = 31 * result + createMode.hashCode();
+        result = 31 * result + aclList.hashCode();
+        result = 31 * result + createOptions.hashCode();
+        result = 31 * result + deleteOptions.hashCode();
+        result = 31 * result + Long.hashCode(ttl);
+        return result;
+    }
+
+    @Override
+    public String toString()
+    {
+        return "ModelSpecImpl{" + "path=" + path + ", serializer=" + serializer + ", createMode=" + createMode + ", aclList=" + aclList + ", createOptions=" + createOptions + ", deleteOptions=" + deleteOptions + ", ttl=" + ttl + ", schema=" + schema + '}';
+    }
+
+    /**
+     * Schema data validator. When {@code acl} is given it must equal this spec's ACL list (or
+     * {@code OPEN_ACL_UNSAFE} when the spec has no ACLs); when {@code data} is given it must be
+     * deserializable by this spec's serializer.
+     *
+     * @return always {@code true} - failures are reported by throwing
+     * @throws SchemaViolation if the ACLs do not match or the data cannot be deserialized
+     */
+    @Override
+    public boolean isValid(Schema schema, String path, byte[] data, List<ACL> acl)
+    {
+        if ( acl != null )
+        {
+            List<ACL> localAclList = aclList.isEmpty() ? ZooDefs.Ids.OPEN_ACL_UNSAFE : aclList;
+            if ( !acl.equals(localAclList) )
+            {
+                throw new SchemaViolation(schema, new SchemaViolation.ViolatorData(path, data, acl), "ACLs do not match model ACLs");
+            }
+        }
+
+        if ( data != null )
+        {
+            try
+            {
+                serializer.deserialize(data);
+            }
+            catch ( RuntimeException e )
+            {
+                SchemaViolation violation = new SchemaViolation(schema, new SchemaViolation.ViolatorData(path, data, acl), "Data cannot be deserialized into a model");
+                // the constructor used here takes no cause; keep the serializer's failure for diagnostics
+                violation.addSuppressed(e);
+                throw violation;
+            }
+        }
+        return true;
+    }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/0f5d10da/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModelStage.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModelStage.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModelStage.java
new file mode 100644
index 0000000..27047ec
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModelStage.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.modeled.details;
+
+import org.apache.curator.x.async.AsyncStage;
+import org.apache.zookeeper.WatchedEvent;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.Executor;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+/**
+ * {@link CompletableFuture} based {@link AsyncStage}. Instances come from the static factories:
+ * the plain ones behave like an ordinary {@link CompletableFuture}, the {@code async*} ones
+ * route every executor-less {@code *Async} call to a supplied {@link Executor}.
+ *
+ * @param <T> the result type
+ */
+class ModelStage<T> extends CompletableFuture<T> implements AsyncStage<T>
+{
+    private final CompletionStage<WatchedEvent> event;
+
+    private ModelStage(CompletionStage<WatchedEvent> event)
+    {
+        this.event = event;
+    }
+
+    /**
+     * @return a new, incomplete stage without a watch event
+     */
+    static <U> ModelStage<U> make()
+    {
+        return make(null);
+    }
+
+    /**
+     * @param event the stage returned from {@link #event()}
+     * @return a new, incomplete stage
+     */
+    static <U> ModelStage<U> make(CompletionStage<WatchedEvent> event)
+    {
+        return new ModelStage<>(event);
+    }
+
+    /**
+     * @return a stage that is already completed with {@code value}
+     */
+    static <U> ModelStage<U> completed(U value)
+    {
+        ModelStage<U> done = make();
+        done.complete(value);
+        return done;
+    }
+
+    /**
+     * @return a stage that is already completed exceptionally with {@code e}
+     */
+    static <U> ModelStage<U> exceptionally(Exception e)
+    {
+        ModelStage<U> failed = make();
+        failed.completeExceptionally(e);
+        return failed;
+    }
+
+    /**
+     * @return a new, incomplete stage whose executor-less {@code *Async} methods run on {@code executor}
+     */
+    static <U> ModelStage<U> async(Executor executor)
+    {
+        return new AsyncModelStage<>(executor);
+    }
+
+    /**
+     * Executor-bound variant of {@link #completed(Object)}
+     */
+    static <U> ModelStage<U> asyncCompleted(U value, Executor executor)
+    {
+        ModelStage<U> done = async(executor);
+        done.complete(value);
+        return done;
+    }
+
+    /**
+     * Executor-bound variant of {@link #exceptionally(Exception)}
+     */
+    static <U> ModelStage<U> asyncExceptionally(Exception e, Executor executor)
+    {
+        ModelStage<U> failed = async(executor);
+        failed.completeExceptionally(e);
+        return failed;
+    }
+
+    @Override
+    public CompletionStage<WatchedEvent> event()
+    {
+        return event;
+    }
+
+    /**
+     * Stage that redirects each {@code *Async} overload lacking an executor argument to the
+     * overload taking one, passing the executor given at construction.
+     */
+    private static class AsyncModelStage<R> extends ModelStage<R>
+    {
+        private final Executor executor;
+
+        AsyncModelStage(Executor executor)
+        {
+            super(null);
+            this.executor = executor;
+        }
+
+        @Override
+        public <N> CompletableFuture<N> thenApplyAsync(Function<? super R, ? extends N> fn)
+        {
+            return thenApplyAsync(fn, executor);
+        }
+
+        @Override
+        public CompletableFuture<Void> thenAcceptAsync(Consumer<? super R> action)
+        {
+            return thenAcceptAsync(action, executor);
+        }
+
+        @Override
+        public CompletableFuture<Void> thenRunAsync(Runnable action)
+        {
+            return thenRunAsync(action, executor);
+        }
+
+        @Override
+        public <O, V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends O> other, BiFunction<? super R, ? super O, ? extends V> fn)
+        {
+            return thenCombineAsync(other, fn, executor);
+        }
+
+        @Override
+        public <O> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends O> other, BiConsumer<? super R, ? super O> action)
+        {
+            return thenAcceptBothAsync(other, action, executor);
+        }
+
+        @Override
+        public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action)
+        {
+            return runAfterBothAsync(other, action, executor);
+        }
+
+        @Override
+        public <N> CompletableFuture<N> applyToEitherAsync(CompletionStage<? extends R> other, Function<? super R, N> fn)
+        {
+            return applyToEitherAsync(other, fn, executor);
+        }
+
+        @Override
+        public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends R> other, Consumer<? super R> action)
+        {
+            return acceptEitherAsync(other, action, executor);
+        }
+
+        @Override
+        public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action)
+        {
+            return runAfterEitherAsync(other, action, executor);
+        }
+
+        @Override
+        public <N> CompletableFuture<N> thenComposeAsync(Function<? super R, ? extends CompletionStage<N>> fn)
+        {
+            return thenComposeAsync(fn, executor);
+        }
+
+        @Override
+        public CompletableFuture<R> whenCompleteAsync(BiConsumer<? super R, ? super Throwable> action)
+        {
+            return whenCompleteAsync(action, executor);
+        }
+
+        @Override
+        public <N> CompletableFuture<N> handleAsync(BiFunction<? super R, Throwable, ? extends N> fn)
+        {
+            return handleAsync(fn, executor);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/0f5d10da/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
new file mode 100644
index 0000000..72e6762
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.modeled.details;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.listen.Listenable;
+import org.apache.curator.framework.listen.ListenerContainer;
+import org.apache.curator.framework.recipes.cache.TreeCache;
+import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
+import org.apache.curator.framework.recipes.cache.TreeCacheListener;
+import org.apache.curator.utils.ThreadUtils;
+import org.apache.curator.x.async.api.CreateOption;
+import org.apache.curator.x.async.modeled.ModelSerializer;
+import org.apache.curator.x.async.modeled.ModelSpec;
+import org.apache.curator.x.async.modeled.ZPath;
+import org.apache.curator.x.async.modeled.cached.ModeledCache;
+import org.apache.curator.x.async.modeled.cached.ModeledCacheListener;
+import org.apache.curator.x.async.modeled.ZNode;
+import org.apache.zookeeper.data.Stat;
+import java.util.AbstractMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+
+/**
+ * {@link ModeledCache} implementation that listens to a {@link TreeCache} rooted at the
+ * model spec's path, deserializes node data into models and keeps them in a local map
+ * keyed by path. Registered {@link ModeledCacheListener}s are notified of every change.
+ */
+class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T>
+{
+    private final TreeCache cache;
+    // deserialized models by path; the TreeCache itself is built with setCacheData(false)
+    private final Map<ZPath, Entry<T>> entries = new ConcurrentHashMap<>();
+    private final ModelSerializer<T> serializer;
+    private final ListenerContainer<ModeledCacheListener<T>> listenerContainer = new ListenerContainer<>();
+    private final ZPath basePath;
+
+    /**
+     * Pairing of a node's stat with its deserialized model.
+     */
+    private static final class Entry<T>
+    {
+        final Stat stat;
+        final T model;
+
+        Entry(Stat stat, T model)
+        {
+            this.stat = stat;
+            this.model = model;
+        }
+    }
+
+    /**
+     * Builds (but does not start) the underlying tree cache.
+     *
+     * @param client    Curator client handed to the tree cache
+     * @param modelSpec spec supplying the base path, serializer and create options. If only
+     *                  the last node of its path is an unresolved parameter, the parent spec
+     *                  is used as the cache root instead.
+     * @param executor  executor handed to the tree cache
+     */
+    ModeledCacheImpl(CuratorFramework client, ModelSpec<T> modelSpec, ExecutorService executor)
+    {
+        if ( !modelSpec.path().isResolved() && !modelSpec.path().isRoot() && modelSpec.path().parent().isResolved() )
+        {
+            modelSpec = modelSpec.parent(); // i.e. the last item is a parameter
+        }
+
+        basePath = modelSpec.path();
+        this.serializer = modelSpec.serializer();
+        cache = TreeCache.newBuilder(client, basePath.fullPath())
+            .setCacheData(false)
+            .setDataIsCompressed(modelSpec.createOptions().contains(CreateOption.compress))
+            .setExecutor(executor)
+            .setCreateParentNodes(modelSpec.createOptions().contains(CreateOption.createParentsIfNeeded) || modelSpec.createOptions().contains(CreateOption.createParentsAsContainers))
+            .build();
+    }
+
+    /**
+     * Registers this instance as the tree cache's listener and starts the cache.
+     *
+     * @throws RuntimeException wrapping any exception thrown while starting
+     */
+    public void start()
+    {
+        try
+        {
+            cache.getListenable().addListener(this);
+            cache.start();
+        }
+        catch ( Exception e )
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Unregisters from the tree cache, closes it and drops all cached models.
+     */
+    public void close()
+    {
+        cache.getListenable().removeListener(this);
+        cache.close();
+        entries.clear();
+    }
+
+    /**
+     * Looks up the cached node for the given path.
+     *
+     * @param path path of the node
+     * @return the cached node, or an empty Optional when nothing is cached at that path
+     */
+    @Override
+    public Optional<ZNode<T>> currentData(ZPath path)
+    {
+        // a lookup must not mutate the cache: use get(), not remove()
+        Entry<T> entry = entries.get(path);
+        if ( entry != null )
+        {
+            return Optional.of(new ZNodeImpl<>(path, entry.stat, entry.model));
+        }
+        return Optional.empty();
+    }
+
+    /**
+     * @return the path the tree cache is rooted at
+     */
+    ZPath basePath()
+    {
+        return basePath;
+    }
+
+    /**
+     * @return all cached nodes whose path starts with the base path
+     */
+    Map<ZPath, ZNode<T>> currentChildren()
+    {
+        return currentChildren(basePath);
+    }
+
+    /**
+     * Collects the cached nodes whose path starts with the given path.
+     *
+     * @param path prefix to match
+     * @return map of path to node built from the current cache contents
+     */
+    @Override
+    public Map<ZPath, ZNode<T>> currentChildren(ZPath path)
+    {
+        return entries.entrySet()
+            .stream()
+            .filter(entry -> entry.getKey().startsWith(path))
+            .map(entry -> new AbstractMap.SimpleEntry<>(entry.getKey(), new ZNodeImpl<>(entry.getKey(), entry.getValue().stat, entry.getValue().model)))
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+    }
+
+    /**
+     * @return the container used to add and remove {@link ModeledCacheListener}s
+     */
+    public Listenable<ModeledCacheListener<T>> listenable()
+    {
+        return listenerContainer;
+    }
+
+    /**
+     * Tree cache callback. Any exception raised while processing the event is checked for
+     * interruption and then handed to every listener's {@code handleException}.
+     */
+    @Override
+    public void childEvent(CuratorFramework client, TreeCacheEvent event)
+    {
+        try
+        {
+            internalChildEvent(event);
+        }
+        catch ( Exception e )
+        {
+            ThreadUtils.checkInterrupted(e);
+
+            listenerContainer.forEach(l -> {
+                l.handleException(e);
+                return null;
+            });
+        }
+    }
+
+    /**
+     * Applies a tree cache event to the local map and notifies listeners. Events for the
+     * base path itself are ignored, as are add/update events carrying no data.
+     */
+    private void internalChildEvent(TreeCacheEvent event) throws Exception
+    {
+        switch ( event.getType() )
+        {
+            case NODE_ADDED:
+            case NODE_UPDATED:
+            {
+                ZPath path = ZPath.parse(event.getData().getPath());
+                if ( !path.equals(basePath) )
+                {
+                    byte[] bytes = event.getData().getData();
+                    if ( (bytes != null) && (bytes.length > 0) )    // otherwise it's probably just a parent node being created
+                    {
+                        T model = serializer.deserialize(bytes);
+                        entries.put(path, new Entry<>(event.getData().getStat(), model));
+                        ModeledCacheListener.Type type = (event.getType() == TreeCacheEvent.Type.NODE_ADDED) ? ModeledCacheListener.Type.NODE_ADDED : ModeledCacheListener.Type.NODE_UPDATED;
+                        accept(type, path, event.getData().getStat(), model);
+                    }
+                }
+                break;
+            }
+
+            case NODE_REMOVED:
+            {
+                ZPath path = ZPath.parse(event.getData().getPath());
+                if ( !path.equals(basePath) )
+                {
+                    // prefer the locally cached model/stat; fall back to the event's payload
+                    Entry<T> entry = entries.remove(path);
+                    T model = (entry != null) ? entry.model : serializer.deserialize(event.getData().getData());
+                    Stat stat = (entry != null) ? entry.stat : event.getData().getStat();
+                    accept(ModeledCacheListener.Type.NODE_REMOVED, path, stat, model);
+                }
+                break;
+            }
+
+            case INITIALIZED:
+            {
+                listenerContainer.forEach(l -> {
+                    l.initialized();
+                    return null;
+                });
+                break;
+            }
+
+            default:
+                // ignore
+                break;
+        }
+    }
+
+    /**
+     * Forwards a change notification to every registered listener.
+     */
+    private void accept(ModeledCacheListener.Type type, ZPath path, Stat stat, T model)
+    {
+        listenerContainer.forEach(l -> {
+            l.accept(type, path, stat, model);
+            return null;
+        });
+    }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/0f5d10da/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
new file mode 100644
index 0000000..c1d19c4
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
@@ -0,0 +1,450 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.modeled.details;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.UnhandledErrorListener;
+import org.apache.curator.framework.api.transaction.CuratorOp;
+import org.apache.curator.framework.api.transaction.CuratorTransactionResult;
+import org.apache.curator.utils.ThreadUtils;
+import org.apache.curator.x.async.AsyncCuratorFramework;
+import org.apache.curator.x.async.AsyncStage;
+import org.apache.curator.x.async.WatchMode;
+import org.apache.curator.x.async.api.AsyncCuratorFrameworkDsl;
+import org.apache.curator.x.async.api.AsyncPathAndBytesable;
+import org.apache.curator.x.async.api.AsyncPathable;
+import org.apache.curator.x.async.api.AsyncTransactionSetDataBuilder;
+import org.apache.curator.x.async.api.CreateOption;
+import org.apache.curator.x.async.api.WatchableAsyncCuratorFramework;
+import org.apache.curator.x.async.modeled.ModelSpec;
+import org.apache.curator.x.async.modeled.ModeledFramework;
+import org.apache.curator.x.async.modeled.ZNode;
+import org.apache.curator.x.async.modeled.ZPath;
+import org.apache.curator.x.async.modeled.cached.CachedModeledFramework;
+import org.apache.curator.x.async.modeled.versioned.VersionedModeledFramework;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Function;
+import java.util.function.UnaryOperator;
+import java.util.stream.Collectors;
+
+/**
+ * Default {@link ModeledFramework} implementation. Serializes and deserializes models with
+ * the {@link ModelSpec}'s serializer and issues the corresponding operations through an
+ * {@link AsyncCuratorFramework}.
+ */
+public class ModeledFrameworkImpl<T> implements ModeledFramework<T>
+{
+    private final AsyncCuratorFramework client;
+    private final WatchableAsyncCuratorFramework watchableClient;
+    private final ModelSpec<T> modelSpec;
+    private final WatchMode watchMode;
+    private final UnaryOperator<WatchedEvent> watcherFilter;
+    private final UnhandledErrorListener unhandledErrorListener;
+    private final UnaryOperator<CuratorEvent> resultFilter;
+    private final AsyncCuratorFrameworkDsl dslClient;
+    private final boolean isWatched;
+
+    /**
+     * Factory method.
+     *
+     * @param client                 async client, must not be null
+     * @param model                  model spec, must not be null
+     * @param watchMode              watch mode; a non-null value makes the instance "watched",
+     *                               {@code null} falls back to {@link WatchMode#stateChangeAndSuccess}
+     *                               without watching
+     * @param watcherFilter          passed through to {@code client.with(...)}
+     * @param unhandledErrorListener passed through to {@code client.with(...)}
+     * @param resultFilter           passed through to {@code client.with(...)}
+     * @return new instance
+     */
+    public static <T> ModeledFrameworkImpl<T> build(AsyncCuratorFramework client, ModelSpec<T> model, WatchMode watchMode, UnaryOperator<WatchedEvent> watcherFilter, UnhandledErrorListener unhandledErrorListener, UnaryOperator<CuratorEvent> resultFilter)
+    {
+        boolean isWatched = (watchMode != null);
+
+        Objects.requireNonNull(client, "client cannot be null");
+        Objects.requireNonNull(model, "model cannot be null");
+
+        watchMode = (watchMode != null) ? watchMode : WatchMode.stateChangeAndSuccess;
+
+        AsyncCuratorFrameworkDsl dslClient = client.with(watchMode, unhandledErrorListener, resultFilter, watcherFilter);
+        WatchableAsyncCuratorFramework watchableClient = isWatched ? dslClient.watched() : dslClient;
+
+        return new ModeledFrameworkImpl<>(
+            client,
+            dslClient,
+            watchableClient,
+            model,
+            watchMode,
+            watcherFilter,
+            unhandledErrorListener,
+            resultFilter,
+            isWatched
+        );
+    }
+
+    private ModeledFrameworkImpl(AsyncCuratorFramework client, AsyncCuratorFrameworkDsl dslClient, WatchableAsyncCuratorFramework watchableClient, ModelSpec<T> modelSpec, WatchMode watchMode, UnaryOperator<WatchedEvent> watcherFilter, UnhandledErrorListener unhandledErrorListener, UnaryOperator<CuratorEvent> resultFilter, boolean isWatched)
+    {
+        this.client = client;
+        this.dslClient = dslClient;
+        this.watchableClient = watchableClient;
+        this.modelSpec = modelSpec;
+        this.watchMode = watchMode;
+        this.watcherFilter = watcherFilter;
+        this.unhandledErrorListener = unhandledErrorListener;
+        this.resultFilter = resultFilter;
+        this.isWatched = isWatched;
+    }
+
+    /**
+     * Same as {@link #cached(ExecutorService)} using a new single thread executor.
+     */
+    @Override
+    public CachedModeledFramework<T> cached()
+    {
+        return cached(ThreadUtils.newSingleThreadExecutor("CachedModeledFramework"));
+    }
+
+    /**
+     * Wraps this instance in a cached version.
+     *
+     * @param executor executor for the cache, must not be null
+     * @throws IllegalStateException if this instance is watched
+     */
+    @Override
+    public CachedModeledFramework<T> cached(ExecutorService executor)
+    {
+        Preconditions.checkState(!isWatched, "CachedModeledFramework cannot be used with watched instances as the internal cache would bypass the watchers.");
+        return new CachedModeledFrameworkImpl<>(this, Objects.requireNonNull(executor, "executor cannot be null"));
+    }
+
+    @Override
+    public VersionedModeledFramework<T> versioned()
+    {
+        return new VersionedModeledFrameworkImpl<>(this);
+    }
+
+    @Override
+    public ModelSpec<T> modelSpec()
+    {
+        return modelSpec;
+    }
+
+    @Override
+    public AsyncCuratorFramework unwrap()
+    {
+        return client;
+    }
+
+    @Override
+    public AsyncStage<String> set(T item)
+    {
+        return set(item, null, -1);
+    }
+
+    @Override
+    public AsyncStage<String> set(T item, Stat storingStatIn)
+    {
+        return set(item, storingStatIn, -1);
+    }
+
+    @Override
+    public AsyncStage<String> set(T item, int version)
+    {
+        // forward the caller's version; previously -1 was passed and the version was lost
+        return set(item, null, version);
+    }
+
+    /**
+     * Serializes the item and issues a create with the spec's options, mode, ACLs and TTL.
+     *
+     * @param item          model to store
+     * @param storingStatIn stat passed to the create builder, may be null
+     * @param version       version passed to the create builder
+     * @return stage for the create; an exceptionally completed stage if serialization or
+     *         path resolution throws
+     */
+    @Override
+    public AsyncStage<String> set(T item, Stat storingStatIn, int version)
+    {
+        try
+        {
+            byte[] bytes = modelSpec.serializer().serialize(item);
+            return dslClient.create()
+                .withOptions(modelSpec.createOptions(), modelSpec.createMode(), fixAclList(modelSpec.aclList()), storingStatIn, modelSpec.ttl(), version)
+                .forPath(resolveForSet(item), bytes);
+        }
+        catch ( Exception e )
+        {
+            return ModelStage.exceptionally(e);
+        }
+    }
+
+    @Override
+    public AsyncStage<T> read()
+    {
+        return internalRead(ZNode::model, null);
+    }
+
+    @Override
+    public AsyncStage<T> read(Stat storingStatIn)
+    {
+        return internalRead(ZNode::model, storingStatIn);
+    }
+
+    @Override
+    public AsyncStage<ZNode<T>> readAsZNode()
+    {
+        return internalRead(Function.identity(), null);
+    }
+
+    @Override
+    public AsyncStage<Stat> update(T item)
+    {
+        return update(item, -1);
+    }
+
+    /**
+     * Serializes the item and issues a setData at the given version.
+     *
+     * @param item    model to store
+     * @param version version passed to the setData builder
+     * @return stage for the setData; an exceptionally completed stage if serialization or
+     *         path resolution throws
+     */
+    @Override
+    public AsyncStage<Stat> update(T item, int version)
+    {
+        try
+        {
+            byte[] bytes = modelSpec.serializer().serialize(item);
+            // the version must be applied on both branches, not only the compressed one
+            AsyncPathAndBytesable<AsyncStage<Stat>> next = isCompressed() ? dslClient.setData().compressedWithVersion(version) : dslClient.setData().withVersion(version);
+            return next.forPath(resolveForSet(item), bytes);
+        }
+        catch ( Exception e )
+        {
+            return ModelStage.exceptionally(e);
+        }
+    }
+
+    @Override
+    public AsyncStage<Stat> checkExists()
+    {
+        return watchableClient.checkExists().forPath(modelSpec.path().fullPath());
+    }
+
+    @Override
+    public AsyncStage<Void> delete()
+    {
+        return delete(-1);
+    }
+
+    /**
+     * Deletes the node at the spec's path using the given version.
+     *
+     * @param version version passed to the delete builder
+     */
+    @Override
+    public AsyncStage<Void> delete(int version)
+    {
+        // forward the caller's version; previously -1 was hard-coded here
+        return dslClient.delete().withVersion(version).forPath(modelSpec.path().fullPath());
+    }
+
+    @Override
+    public AsyncStage<List<ZPath>> children()
+    {
+        return internalGetChildren(modelSpec.path());
+    }
+
+    /**
+     * Reads the children of the spec's path and then each child as a {@link ZNode}.
+     *
+     * @throws IllegalStateException if this instance is watched
+     */
+    @Override
+    public AsyncStage<List<ZNode<T>>> childrenAsZNodes()
+    {
+        ModelStage<List<ZNode<T>>> modelStage = ModelStage.make();
+        Preconditions.checkState(!isWatched, "childrenAsZNodes() cannot be used with watched instances.");
+        children().handle((children, e) -> {
+            if ( e != null )
+            {
+                modelStage.completeExceptionally(e);
+            }
+            else
+            {
+                completeChildrenAsZNodes(modelStage, children);
+            }
+            return null;
+        });
+        return modelStage;
+    }
+
+    /**
+     * Reads every child path and completes the stage once all reads have succeeded, or
+     * exceptionally on a failed read.
+     */
+    private void completeChildrenAsZNodes(ModelStage<List<ZNode<T>>> modelStage, List<ZPath> children)
+    {
+        List<ZNode<T>> nodes = Lists.newArrayList();
+        if ( children.size() == 0 )
+        {
+            modelStage.complete(nodes);
+            return;
+        }
+        children.forEach(path -> withPath(path).readAsZNode().handle((node, e) -> {
+            if ( e != null )
+            {
+                modelStage.completeExceptionally(e);
+            }
+            else
+            {
+                nodes.add(node);
+                if ( nodes.size() == children.size() )
+                {
+                    modelStage.complete(nodes);
+                }
+            }
+            return null;
+        }));
+    }
+
+    /**
+     * Fetches the child names of the given path and maps each one to a child {@link ZPath}.
+     */
+    private AsyncStage<List<ZPath>> internalGetChildren(ZPath path)
+    {
+        AsyncStage<List<String>> asyncStage = watchableClient.getChildren().forPath(path.fullPath());
+        ModelStage<List<ZPath>> modelStage = ModelStage.make(asyncStage.event());
+        asyncStage.whenComplete((children, e) -> {
+            if ( e != null )
+            {
+                modelStage.completeExceptionally(e);
+            }
+            else
+            {
+                modelStage.complete(children.stream().map(path::child).collect(Collectors.toList()));
+            }
+        });
+        return modelStage;
+    }
+
+    @Override
+    public ModeledFramework<T> parent()
+    {
+        ModelSpec<T> newModelSpec = modelSpec.parent();
+        return new ModeledFrameworkImpl<>(
+            client,
+            dslClient,
+            watchableClient,
+            newModelSpec,
+            watchMode,
+            watcherFilter,
+            unhandledErrorListener,
+            resultFilter,
+            isWatched
+        );
+    }
+
+    @Override
+    public ModeledFramework<T> child(Object child)
+    {
+        ModelSpec<T> newModelSpec = modelSpec.child(child);
+        return new ModeledFrameworkImpl<>(
+            client,
+            dslClient,
+            watchableClient,
+            newModelSpec,
+            watchMode,
+            watcherFilter,
+            unhandledErrorListener,
+            resultFilter,
+            isWatched
+        );
+    }
+
+    @Override
+    public ModeledFramework<T> withPath(ZPath path)
+    {
+        ModelSpec<T> newModelSpec = modelSpec.withPath(path);
+        return new ModeledFrameworkImpl<>(
+            client,
+            dslClient,
+            watchableClient,
+            newModelSpec,
+            watchMode,
+            watcherFilter,
+            unhandledErrorListener,
+            resultFilter,
+            isWatched
+        );
+    }
+
+    /**
+     * @param createOptions options to inspect
+     * @return true if the options contain {@link CreateOption#compress}
+     */
+    public static boolean isCompressed(Set<CreateOption> createOptions)
+    {
+        return createOptions.contains(CreateOption.compress);
+    }
+
+    @Override
+    public CuratorOp createOp(T model)
+    {
+        return client.transactionOp()
+            .create()
+            .withOptions(modelSpec.createMode(), fixAclList(modelSpec.aclList()), isCompressed(), modelSpec.ttl())
+            .forPath(resolveForSet(model), modelSpec.serializer().serialize(model));
+    }
+
+    @Override
+    public CuratorOp updateOp(T model)
+    {
+        return updateOp(model, -1);
+    }
+
+    @Override
+    public CuratorOp updateOp(T model, int version)
+    {
+        AsyncTransactionSetDataBuilder builder = client.transactionOp().setData();
+        if ( isCompressed() )
+        {
+            return builder.withVersionCompressed(version).forPath(resolveForSet(model), modelSpec.serializer().serialize(model));
+        }
+        return builder.withVersion(version).forPath(resolveForSet(model), modelSpec.serializer().serialize(model));
+    }
+
+    @Override
+    public CuratorOp deleteOp()
+    {
+        return deleteOp(-1);
+    }
+
+    @Override
+    public CuratorOp deleteOp(int version)
+    {
+        return client.transactionOp().delete().withVersion(version).forPath(modelSpec.path().fullPath());
+    }
+
+    @Override
+    public CuratorOp checkExistsOp()
+    {
+        return checkExistsOp(-1);
+    }
+
+    @Override
+    public CuratorOp checkExistsOp(int version)
+    {
+        return client.transactionOp().check().withVersion(version).forPath(modelSpec.path().fullPath());
+    }
+
+    @Override
+    public AsyncStage<List<CuratorTransactionResult>> inTransaction(List<CuratorOp> operations)
+    {
+        return client.transaction().forOperations(operations);
+    }
+
+    /**
+     * @return true if this instance's model spec requests compression
+     */
+    private boolean isCompressed()
+    {
+        return modelSpec.createOptions().contains(CreateOption.compress);
+    }
+
+    /**
+     * Reads the node at the spec's path, deserializes it and maps the resulting
+     * {@link ZNode} through the resolver.
+     *
+     * @param resolver      converts the read node into the stage's value
+     * @param storingStatIn stat to fill in; a fresh one is used when null
+     * @return stage completed with the resolved value, or exceptionally if the read or the
+     *         deserialization fails
+     */
+    private <U> ModelStage<U> internalRead(Function<ZNode<T>, U> resolver, Stat storingStatIn)
+    {
+        Stat stat = (storingStatIn != null) ? storingStatIn : new Stat();
+        AsyncPathable<AsyncStage<byte[]>> next = isCompressed() ? watchableClient.getData().decompressedStoringStatIn(stat) : watchableClient.getData().storingStatIn(stat);
+        AsyncStage<byte[]> asyncStage = next.forPath(modelSpec.path().fullPath());
+        ModelStage<U> modelStage = ModelStage.make(asyncStage.event());
+        asyncStage.whenComplete((value, e) -> {
+            if ( e != null )
+            {
+                modelStage.completeExceptionally(e);
+            }
+            else
+            {
+                try
+                {
+                    ZNode<T> node = new ZNodeImpl<>(modelSpec.path(), stat, modelSpec.serializer().deserialize(value));
+                    modelStage.complete(resolver.apply(node));
+                }
+                catch ( Exception deserializeException )
+                {
+                    modelStage.completeExceptionally(deserializeException);
+                }
+            }
+        });
+        return modelStage;
+    }
+
+    /**
+     * @return the spec's full path, first resolving it against the model if it is unresolved
+     */
+    private String resolveForSet(T model)
+    {
+        if ( modelSpec.path().isResolved() )
+        {
+            return modelSpec.path().fullPath();
+        }
+        return modelSpec.path().resolved(model).fullPath();
+    }
+
+    private List<ACL> fixAclList(List<ACL> aclList)
+    {
+        return (aclList.size() > 0) ? aclList : null;   // workaround for old, bad design. empty list not accepted
+    }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/0f5d10da/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/VersionedModeledFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/VersionedModeledFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/VersionedModeledFrameworkImpl.java
new file mode 100644
index 0000000..89d7615
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/VersionedModeledFrameworkImpl.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.modeled.details;
+
+import org.apache.curator.framework.api.transaction.CuratorOp;
+import org.apache.curator.x.async.AsyncStage;
+import org.apache.curator.x.async.modeled.ModeledFramework;
+import org.apache.curator.x.async.modeled.versioned.Versioned;
+import org.apache.curator.x.async.modeled.versioned.VersionedModeledFramework;
+import org.apache.zookeeper.data.Stat;
+
+class VersionedModeledFrameworkImpl<T> implements VersionedModeledFramework<T>
+{
+ private final ModeledFramework<T> client;
+
+ VersionedModeledFrameworkImpl(ModeledFramework<T> client)
+ {
+ this.client = client;
+ }
+
+ @Override
+ public AsyncStage<String> set(Versioned<T> model)
+ {
+ return client.set(model.model(), model.version());
+ }
+
+ @Override
+ public AsyncStage<String> set(Versioned<T> model, Stat storingStatIn)
+ {
+ return client.set(model.model(), storingStatIn, model.version());
+ }
+
+ @Override
+ public AsyncStage<Versioned<T>> read()
+ {
+ return read(null);
+ }
+
+ @Override
+ public AsyncStage<Versioned<T>> read(Stat storingStatIn)
+ {
+ Stat localStat = (storingStatIn != null) ? storingStatIn : new Stat();
+ AsyncStage<T> stage = client.read(localStat);
+ ModelStage<Versioned<T>> modelStage = ModelStage.make(stage.event());
+ stage.whenComplete((model, e) -> {
+ if ( e != null )
+ {
+ modelStage.completeExceptionally(e);
+ }
+ else
+ {
+ modelStage.complete(Versioned.from(model, localStat.getVersion()));
+ }
+ });
+ return modelStage;
+ }
+
+ @Override
+ public AsyncStage<Stat> update(Versioned<T> model)
+ {
+ return client.update(model.model(), model.version());
+ }
+
+ @Override
+ public CuratorOp updateOp(Versioned<T> model)
+ {
+ return client.updateOp(model.model(), model.version());
+ }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/0f5d10da/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ZNodeImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ZNodeImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ZNodeImpl.java
new file mode 100644
index 0000000..85bedf4
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ZNodeImpl.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.modeled.details;
+
+import org.apache.curator.x.async.modeled.ZNode;
+import org.apache.curator.x.async.modeled.ZPath;
+import org.apache.zookeeper.data.Stat;
+import java.util.Objects;
+
+/**
+ * Simple value holder implementing {@link ZNode}: a path, its stat and its model.
+ */
+public class ZNodeImpl<T> implements ZNode<T>
+{
+    private final ZPath path;
+    private final Stat stat;
+    private final T model;
+
+    /**
+     * @param path  node path, must not be null
+     * @param stat  node stat, must not be null
+     * @param model node model, must not be null
+     * @throws NullPointerException if any argument is null
+     */
+    public ZNodeImpl(ZPath path, Stat stat, T model)
+    {
+        // validate in declaration order, then assign
+        Objects.requireNonNull(path, "path cannot be null");
+        Objects.requireNonNull(stat, "stat cannot be null");
+        Objects.requireNonNull(model, "model cannot be null");
+
+        this.path = path;
+        this.stat = stat;
+        this.model = model;
+    }
+
+    @Override
+    public ZPath path()
+    {
+        return this.path;
+    }
+
+    @Override
+    public Stat stat()
+    {
+        return this.stat;
+    }
+
+    @Override
+    public T model()
+    {
+        return this.model;
+    }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/0f5d10da/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ZPathImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ZPathImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ZPathImpl.java
new file mode 100644
index 0000000..fff742e
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ZPathImpl.java
@@ -0,0 +1,289 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.modeled.details;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.ImmutableList;
+import org.apache.curator.x.async.modeled.NodeName;
+import org.apache.curator.x.async.modeled.ZPath;
+import org.apache.zookeeper.common.PathUtils;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.function.UnaryOperator;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import static org.apache.curator.utils.ZKPaths.PATH_SEPARATOR;
+
+/**
+ * {@link ZPath} implementation backed by an immutable list of node names. The first
+ * element of the list is always the path separator (the root); the remaining elements are
+ * node names or parameter placeholders.
+ */
+public class ZPathImpl implements ZPath
+{
+    public static final ZPath root = new ZPathImpl(Collections.singletonList(PATH_SEPARATOR), null);
+
+    private final List<String> nodes;
+    private final boolean isResolved;
+    // lazily computed caches
+    private volatile String fullPath = null;
+    private volatile ZPath parent = null;
+    private volatile Pattern schema = null;
+
+    /**
+     * Parses a full path string into a ZPath.
+     *
+     * @param fullPath   path to split on the path separator
+     * @param nameFilter applied to every node name before validation
+     * @return the parsed path
+     */
+    public static ZPath parse(String fullPath, UnaryOperator<String> nameFilter)
+    {
+        return parseInternal(fullPath, nameFilter);
+    }
+
+    private static ZPathImpl parseInternal(String fullPath, UnaryOperator<String> nameFilter)
+    {
+        List<String> nodes = ImmutableList.<String>builder()
+            .add(PATH_SEPARATOR)
+            .addAll(
+                Splitter.on(PATH_SEPARATOR)
+                    .omitEmptyStrings()
+                    .splitToList(fullPath)
+                    .stream()
+                    .map(nameFilter)
+                    .collect(Collectors.toList())
+             )
+            .build();
+        nodes.forEach(ZPathImpl::validate);
+        return new ZPathImpl(nodes, null);
+    }
+
+    public static ZPath from(String[] names)
+    {
+        return from(null, Arrays.asList(names));
+    }
+
+    public static ZPath from(List<String> names)
+    {
+        return from(null, names);
+    }
+
+    public static ZPath from(ZPath base, String[] names)
+    {
+        return from(base, Arrays.asList(names));
+    }
+
+    /**
+     * Builds a path from a base path plus additional node names.
+     *
+     * @param base  path to start from, or null to start from the root
+     * @param names node names to append, must not be null; each name is validated
+     * @return the combined path
+     */
+    public static ZPath from(ZPath base, List<String> names)
+    {
+        names = Objects.requireNonNull(names, "names cannot be null");
+        names.forEach(ZPathImpl::validate);
+        ImmutableList.Builder<String> builder = ImmutableList.builder();
+        if ( base != null )
+        {
+            if ( base instanceof ZPathImpl )
+            {
+                builder.addAll(((ZPathImpl)base).nodes);
+            }
+            else
+            {
+                // the node list must always begin with the separator (root) element, as in
+                // parseInternal(); omitting it would produce a malformed path
+                builder.add(PATH_SEPARATOR);
+                builder.addAll(Splitter.on(PATH_SEPARATOR).omitEmptyStrings().splitToList(base.fullPath()));
+            }
+        }
+        else
+        {
+            builder.add(PATH_SEPARATOR);
+        }
+        List<String> nodes = builder.addAll(names).build();
+        return new ZPathImpl(nodes, null);
+    }
+
+    @Override
+    public ZPath child(Object child)
+    {
+        return new ZPathImpl(nodes, NodeName.nameFrom(child));
+    }
+
+    /**
+     * @return this path without its last node
+     * @throws NoSuchElementException if this path is the root
+     */
+    @Override
+    public ZPath parent()
+    {
+        checkRootAccess();
+        if ( parent == null )
+        {
+            parent = new ZPathImpl(nodes.subList(0, nodes.size() - 1), null);
+        }
+        return parent;
+    }
+
+    @Override
+    public boolean isRoot()
+    {
+        return nodes.size() == 1;
+    }
+
+    /**
+     * @param path candidate prefix
+     * @return true if the leading nodes of this path equal all nodes of the given path
+     */
+    @Override
+    public boolean startsWith(ZPath path)
+    {
+        ZPathImpl rhs;
+        if ( path instanceof ZPathImpl )
+        {
+            rhs = (ZPathImpl)path;
+        }
+        else
+        {
+            rhs = parseInternal(path.fullPath(), s -> s);
+        }
+        return (nodes.size() >= rhs.nodes.size()) && nodes.subList(0, rhs.nodes.size()).equals(rhs.nodes);
+    }
+
+    /**
+     * @return pattern built from the path with every parameter node replaced by {@code .*}
+     */
+    @Override
+    public Pattern toSchemaPathPattern()
+    {
+        if ( schema == null )
+        {
+            schema = Pattern.compile(buildFullPath(s -> isParameter(s) ? ".*" : s));
+        }
+        return schema;
+    }
+
+    /**
+     * @return the path as a string
+     * @throws IllegalStateException if the path still contains parameters
+     */
+    @Override
+    public String fullPath()
+    {
+        checkResolved();
+        if ( fullPath == null )
+        {
+            fullPath = buildFullPath(s -> s);
+        }
+        return fullPath;
+    }
+
+    @Override
+    public String nodeName()
+    {
+        return nodes.get(nodes.size() - 1);
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if ( this == o )
+        {
+            return true;
+        }
+        if ( o == null || getClass() != o.getClass() )
+        {
+            return false;
+        }
+
+        ZPathImpl zPaths = (ZPathImpl)o;
+
+        return nodes.equals(zPaths.nodes);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return nodes.hashCode();
+    }
+
+    /**
+     * Unlike {@link #fullPath()} this works on unresolved paths: parameter nodes are
+     * rendered without their leading separator.
+     */
+    @Override
+    public String toString()
+    {
+        return nodes.subList(1, nodes.size())
+            .stream().map(name -> isParameter(name) ? name.substring(1) : name)
+            .collect(Collectors.joining(PATH_SEPARATOR, PATH_SEPARATOR, ""));
+    }
+
+    /**
+     * Replaces parameter nodes, in order, with names derived from the given values.
+     * Parameters left over once the values run out are kept as-is.
+     *
+     * @param parameters replacement values
+     * @return new path with the replacements applied
+     */
+    @Override
+    public ZPath resolved(List<Object> parameters)
+    {
+        Iterator<Object> iterator = parameters.iterator();
+        List<String> nodeNames = nodes.stream()
+            .map(name -> {
+                if ( isParameter(name) && iterator.hasNext() )
+                {
+                    return NodeName.nameFrom(iterator.next());
+                }
+                return name;
+            })
+            .collect(Collectors.toList());
+        return new ZPathImpl(nodeNames, null);
+    }
+
+    @Override
+    public boolean isResolved()
+    {
+        return isResolved;
+    }
+
+    /**
+     * A node is a parameter when it is longer than one character and starts with the
+     * path separator.
+     */
+    private static boolean isParameter(String name)
+    {
+        return (name.length() > 1) && name.startsWith(PATH_SEPARATOR);
+    }
+
+    /**
+     * @param nodes existing nodes, copied into an immutable list
+     * @param child optional extra node to validate and append, may be null
+     */
+    private ZPathImpl(List<String> nodes, String child)
+    {
+        ImmutableList.Builder<String> builder = ImmutableList.<String>builder().addAll(nodes);
+        if ( child != null )
+        {
+            validate(child);
+            builder.add(child);
+        }
+        this.nodes = builder.build();
+        isResolved = this.nodes.stream().noneMatch(ZPathImpl::isParameter);
+    }
+
+    private void checkRootAccess()
+    {
+        if ( isRoot() )
+        {
+            throw new NoSuchElementException("The root has no parent");
+        }
+    }
+
+    private void checkResolved()
+    {
+        if ( !isResolved)
+        {
+            throw new IllegalStateException("This ZPath has not been resolved: " + toString());
+        }
+    }
+
+    /**
+     * Accepts parameters and the bare separator unchecked; every other name is validated
+     * as a single-node path.
+     */
+    private static void validate(String nodeName)
+    {
+        if ( isParameter(Objects.requireNonNull(nodeName, "nodeName cannot be null")) )
+        {
+            return;
+        }
+        if ( nodeName.equals(PATH_SEPARATOR) )
+        {
+            return;
+        }
+        PathUtils.validatePath(PATH_SEPARATOR + nodeName);
+    }
+
+    /**
+     * Joins the nodes into a path string, passing each node through the filter first.
+     */
+    private String buildFullPath(UnaryOperator<String> filter)
+    {
+        StringBuilder str = new StringBuilder();
+        int size = nodes.size();
+        for ( int i = 0; i < size; ++i )
+        {
+            // node 0 is the separator itself, so separators are only needed from node 2 on
+            if ( i > 1 )
+            {
+                str.append(PATH_SEPARATOR);
+            }
+            str.append(filter.apply(nodes.get(i)));
+        }
+        return str.toString();
+    }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/0f5d10da/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModelSpec.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModelSpec.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModelSpec.java
new file mode 100644
index 0000000..3fa9831
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModelSpec.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.modeled.typed;
+
+import org.apache.curator.x.async.modeled.ModelSpec;
+import org.apache.curator.x.async.modeled.ModelSpecBuilder;
+
+/**
+ * <p>
+ * Abstraction that allows the construction of ModelSpecs using strongly typed parameter replacements.
+ * For example, given a ModelSpec with a path such as "/root/registry/people/{id}" where "id" should
+ * be <code>PersonId</code>.
+ * </p>
+ *
+ * <pre>{@code
+ * // Step 1. Create a typed ZPath
+ * TypedZPath<PersonId> typedPath = TypedZPath.from("/root/registry/people/{id}");
+ *
+ * // Step 2. Create a ModelSpec builder (do not build at this point)
+ * ModelSpecBuilder<Person> builder = ModelSpec.builder(JacksonModelSerializer.build(Person.class));
+ *
+ * // Step 3. Create a typed ModelSpec using the typed ZPath and ModelSpec builder
+ * TypedModelSpec<Person, PersonId> typedModelSpec = TypedModelSpec.from(builder, typedPath);
+ *
+ * // later on the TypedModelSpec can be resolved into a useable ModelSpec
+ * ModelSpec<Person> modelSpec = typedModelSpec.resolved(personId);
+ * }</pre>
+ *
+ * @param <M> model type
+ * @param <P1> type of the path parameter
+ */
+@FunctionalInterface
+public interface TypedModelSpec<M, P1>
+{
+    /**
+     * Resolve into a ModelSpec using the given parameter
+     *
+     * @param p1 the parameter
+     * @return ModelSpec
+     */
+    ModelSpec<M> resolved(P1 p1);
+
+    /**
+     * Return a new TypedModelSpec using the given model spec builder and typed path. When
+     * {@link #resolved(Object)} is called the actual model spec is generated with the
+     * resolved path. The builder is retained and has its path set on every such call.
+     *
+     * @param builder model spec builder
+     * @param path typed path
+     * @return new TypedModelSpec
+     */
+    static <M, P1> TypedModelSpec<M, P1> from(ModelSpecBuilder<M> builder, TypedZPath<P1> path)
+    {
+        return p1 -> builder.withPath(path.resolved(p1)).build();
+    }
+
+    /**
+     * Return a new TypedModelSpec using the given model spec builder and path. A TypedZPath
+     * is created immediately from the given full path, and when
+     * {@link #resolved(Object)} is called the actual model spec is generated with the
+     * resolved path
+     *
+     * @param builder model spec builder
+     * @param pathWithIds full path, passed to {@code TypedZPath.from(String)}
+     * @return new TypedModelSpec
+     */
+    static <M, P1> TypedModelSpec<M, P1> from(ModelSpecBuilder<M> builder, String pathWithIds)
+    {
+        // build the typed path once, then share the lambda of the typed-path overload
+        TypedZPath<P1> typedPath = TypedZPath.from(pathWithIds);
+        return from(builder, typedPath);
+    }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/0f5d10da/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModelSpec0.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModelSpec0.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModelSpec0.java
new file mode 100644
index 0000000..dee3506
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModelSpec0.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.modeled.typed;
+
+import org.apache.curator.x.async.modeled.ModelSpec;
+import org.apache.curator.x.async.modeled.ModelSpecBuilder;
+
+/**
+ * Zero-parameter variant of {@link TypedModelSpec}
+ *
+ * @param <M> model type
+ */
+@FunctionalInterface
+public interface TypedModelSpec0<M>
+{
+    /**
+     * Produce the ModelSpec; no path parameters are required
+     *
+     * @return ModelSpec
+     */
+    ModelSpec<M> resolved();
+
+    /**
+     * Create a TypedModelSpec0 from a model spec builder and a typed path. The model
+     * spec itself is only generated, using the resolved path, when
+     * {@link #resolved()} is called
+     *
+     * @param builder model spec builder
+     * @param path typed path
+     * @return new TypedModelSpec0
+     */
+    static <M> TypedModelSpec0<M> from(ModelSpecBuilder<M> builder, TypedZPath0 path)
+    {
+        return () -> builder
+            .withPath(path.resolved())
+            .build();
+    }
+
+    /**
+     * Create a TypedModelSpec0 from a model spec builder and a full path. A TypedZPath0
+     * is created immediately from the full path; the model spec itself is only
+     * generated, using the resolved path, when {@link #resolved()} is called
+     *
+     * @param builder model spec builder
+     * @param pathWithIds full path, passed to {@code TypedZPath0.from(String)}
+     * @return new TypedModelSpec0
+     */
+    static <M> TypedModelSpec0<M> from(ModelSpecBuilder<M> builder, String pathWithIds)
+    {
+        TypedZPath0 typedPath = TypedZPath0.from(pathWithIds);
+        return from(builder, typedPath);
+    }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/0f5d10da/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModelSpec10.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModelSpec10.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModelSpec10.java
new file mode 100644
index 0000000..1b00d66
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModelSpec10.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.modeled.typed;
+
+import org.apache.curator.x.async.modeled.ModelSpec;
+import org.apache.curator.x.async.modeled.ModelSpecBuilder;
+
+/**
+ * Ten-parameter variant of {@link TypedModelSpec}
+ *
+ * @param <M> model type
+ */
+@FunctionalInterface
+public interface TypedModelSpec10<M, P1, P2, P3, P4, P5, P6, P7, P8, P9, P10>
+{
+    /**
+     * Produce the ModelSpec using the ten given path parameters
+     *
+     * @return ModelSpec
+     */
+    ModelSpec<M> resolved(P1 p1, P2 p2, P3 p3, P4 p4, P5 p5, P6 p6, P7 p7, P8 p8, P9 p9, P10 p10);
+
+    /**
+     * Create a TypedModelSpec10 from a model spec builder and a typed path. The model
+     * spec itself is only generated, using the resolved path, when
+     * {@link #resolved(Object, Object, Object, Object, Object, Object, Object, Object, Object, Object)}
+     * is called
+     *
+     * @param builder model spec builder
+     * @param path typed path
+     * @return new TypedModelSpec10
+     */
+    static <M, P1, P2, P3, P4, P5, P6, P7, P8, P9, P10> TypedModelSpec10<M, P1, P2, P3, P4, P5, P6, P7, P8, P9, P10> from(ModelSpecBuilder<M> builder, TypedZPath10<P1, P2, P3, P4, P5, P6, P7, P8, P9, P10> path)
+    {
+        return (v1, v2, v3, v4, v5, v6, v7, v8, v9, v10) -> builder
+            .withPath(path.resolved(v1, v2, v3, v4, v5, v6, v7, v8, v9, v10))
+            .build();
+    }
+
+    /**
+     * Create a TypedModelSpec10 from a model spec builder and a full path. A TypedZPath10
+     * is created immediately from the full path; the model spec itself is only
+     * generated, using the resolved path, when
+     * {@link #resolved(Object, Object, Object, Object, Object, Object, Object, Object, Object, Object)}
+     * is called
+     *
+     * @param builder model spec builder
+     * @param pathWithIds full path, passed to {@code TypedZPath10.from(String)}
+     * @return new TypedModelSpec10
+     */
+    static <M, P1, P2, P3, P4, P5, P6, P7, P8, P9, P10> TypedModelSpec10<M, P1, P2, P3, P4, P5, P6, P7, P8, P9, P10> from(ModelSpecBuilder<M> builder, String pathWithIds)
+    {
+        TypedZPath10<P1, P2, P3, P4, P5, P6, P7, P8, P9, P10> typedPath = TypedZPath10.from(pathWithIds);
+        return from(builder, typedPath);
+    }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/0f5d10da/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModelSpec2.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModelSpec2.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModelSpec2.java
new file mode 100644
index 0000000..a56e139
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModelSpec2.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.modeled.typed;
+
+import org.apache.curator.x.async.modeled.ModelSpec;
+import org.apache.curator.x.async.modeled.ModelSpecBuilder;
+
+/**
+ * Two-parameter variant of {@link TypedModelSpec}
+ *
+ * @param <M> model type
+ */
+@FunctionalInterface
+public interface TypedModelSpec2<M, P1, P2>
+{
+    /**
+     * Produce the ModelSpec using the two given path parameters
+     *
+     * @return ModelSpec
+     */
+    ModelSpec<M> resolved(P1 p1, P2 p2);
+
+    /**
+     * Create a TypedModelSpec2 from a model spec builder and a typed path. The model
+     * spec itself is only generated, using the resolved path, when
+     * {@link #resolved(Object, Object)} is called
+     *
+     * @param builder model spec builder
+     * @param path typed path
+     * @return new TypedModelSpec2
+     */
+    static <M, P1, P2> TypedModelSpec2<M, P1, P2> from(ModelSpecBuilder<M> builder, TypedZPath2<P1, P2> path)
+    {
+        return (first, second) -> builder
+            .withPath(path.resolved(first, second))
+            .build();
+    }
+
+    /**
+     * Create a TypedModelSpec2 from a model spec builder and a full path. A TypedZPath2
+     * is created immediately from the full path; the model spec itself is only
+     * generated, using the resolved path, when {@link #resolved(Object, Object)} is called
+     *
+     * @param builder model spec builder
+     * @param pathWithIds full path, passed to {@code TypedZPath2.from(String)}
+     * @return new TypedModelSpec2
+     */
+    static <M, P1, P2> TypedModelSpec2<M, P1, P2> from(ModelSpecBuilder<M> builder, String pathWithIds)
+    {
+        TypedZPath2<P1, P2> typedPath = TypedZPath2.from(pathWithIds);
+        return from(builder, typedPath);
+    }
+}