You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2017/05/07 07:55:34 UTC

[1/3] curator git commit: better exception handling for serialization issues

Repository: curator
Updated Branches:
  refs/heads/CURATOR-397 f2370b771 -> 1110ab3bb


better exception handling for serialization issues


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/fe0a88b3
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/fe0a88b3
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/fe0a88b3

Branch: refs/heads/CURATOR-397
Commit: fe0a88b3b7b9fccf9da022bc0a13e440b1f8435c
Parents: f2370b7
Author: randgalt <ra...@apache.org>
Authored: Fri May 5 18:38:46 2017 -0400
Committer: randgalt <ra...@apache.org>
Committed: Fri May 5 18:38:46 2017 -0400

----------------------------------------------------------------------
 .../modeled/cached/ModeledCacheListener.java    | 12 ++++++++
 .../async/modeled/details/ModeledCacheImpl.java | 23 ++++++++++++---
 .../modeled/details/ModeledFrameworkImpl.java   | 30 ++++++++++++++++----
 3 files changed, 56 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/fe0a88b3/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/ModeledCacheListener.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/ModeledCacheListener.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/ModeledCacheListener.java
index 544de78..4f1ac70 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/ModeledCacheListener.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/ModeledCacheListener.java
@@ -20,6 +20,7 @@ package org.apache.curator.x.async.modeled.cached;
 
 import org.apache.curator.x.async.modeled.ZPath;
 import org.apache.zookeeper.data.Stat;
+import org.slf4j.LoggerFactory;
 
 @FunctionalInterface
 public interface ModeledCacheListener<T>
@@ -66,6 +67,17 @@ public interface ModeledCacheListener<T>
     }
 
     /**
+     * Called when there is an exception processing a message from the internal cache. This is most
+     * likely due to a de-serialization problem.
+     *
+     * @param e the exception
+     */
+    default void handleException(Exception e)
+    {
+        LoggerFactory.getLogger(getClass()).error("Could not process cache message", e);
+    }
+
+    /**
      * Returns a version of this listener that only begins calling
      * {@link #accept(org.apache.curator.x.async.modeled.cached.ModeledCacheListener.Type, org.apache.curator.x.async.modeled.ZPath, org.apache.zookeeper.data.Stat, Object)}
      * once {@link #initialized()} has been called. i.e. changes that occur as the cache is initializing are not sent

http://git-wip-us.apache.org/repos/asf/curator/blob/fe0a88b3/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
index 091a727..353c28a 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
@@ -18,13 +18,13 @@
  */
 package org.apache.curator.x.async.modeled.details;
 
-import com.google.common.util.concurrent.MoreExecutors;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.listen.Listenable;
 import org.apache.curator.framework.listen.ListenerContainer;
 import org.apache.curator.framework.recipes.cache.TreeCache;
 import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
 import org.apache.curator.framework.recipes.cache.TreeCacheListener;
+import org.apache.curator.utils.ThreadUtils;
 import org.apache.curator.x.async.api.CreateOption;
 import org.apache.curator.x.async.modeled.ModelSerializer;
 import org.apache.curator.x.async.modeled.ModelSpec;
@@ -37,9 +37,7 @@ import java.util.AbstractMap;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.stream.Collectors;
 
 class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T>
@@ -119,7 +117,24 @@ class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T>
     }
 
     @Override
-    public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception
+    public void childEvent(CuratorFramework client, TreeCacheEvent event)
+    {
+        try
+        {
+            internalChildEvent(event);
+        }
+        catch ( Exception e )
+        {
+            ThreadUtils.checkInterrupted(e);
+
+            listenerContainer.forEach(l -> {
+                l.handleException(e);
+                return null;
+            });
+        }
+    }
+
+    private void internalChildEvent(TreeCacheEvent event) throws Exception
     {
         switch ( event.getType() )
         {

http://git-wip-us.apache.org/repos/asf/curator/blob/fe0a88b3/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
index b666822..66f10de 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
@@ -131,8 +131,19 @@ public class ModeledFrameworkImpl<T> implements ModeledFramework<T>
     @Override
     public AsyncStage<String> set(T item, Stat storingStatIn)
     {
-        byte[] bytes = modelSpec.serializer().serialize(item);
-        return dslClient.create().withOptions(modelSpec.createOptions(), modelSpec.createMode(), fixAclList(modelSpec.aclList()), storingStatIn, modelSpec.ttl()).forPath(modelSpec.path().fullPath(), bytes);
+        try
+        {
+            byte[] bytes = modelSpec.serializer().serialize(item);
+            return dslClient.create()
+                .withOptions(modelSpec.createOptions(), modelSpec.createMode(), fixAclList(modelSpec.aclList()), storingStatIn, modelSpec.ttl())
+                .forPath(modelSpec.path().fullPath(), bytes);
+        }
+        catch ( Exception e )
+        {
+            ModelStage<String> exceptionStage = new ModelStage<>();
+            exceptionStage.completeExceptionally(e);
+            return exceptionStage;
+        }
     }
 
     private List<ACL> fixAclList(List<ACL> aclList)
@@ -189,9 +200,18 @@ public class ModeledFrameworkImpl<T> implements ModeledFramework<T>
     @Override
     public AsyncStage<Stat> update(T item, int version)
     {
-        byte[] bytes = modelSpec.serializer().serialize(item);
-        AsyncPathAndBytesable<AsyncStage<Stat>> next = isCompressed() ? dslClient.setData().compressedWithVersion(version) : dslClient.setData();
-        return next.forPath(modelSpec.path().fullPath(), bytes);
+        try
+        {
+            byte[] bytes = modelSpec.serializer().serialize(item);
+            AsyncPathAndBytesable<AsyncStage<Stat>> next = isCompressed() ? dslClient.setData().compressedWithVersion(version) : dslClient.setData();
+            return next.forPath(modelSpec.path().fullPath(), bytes);
+        }
+        catch ( Exception e )
+        {
+            ModelStage<Stat> exceptionStage = new ModelStage<>();
+            exceptionStage.completeExceptionally(e);
+            return exceptionStage;
+        }
     }
 
     @Override


[2/3] curator git commit: Added variant to readAsZNode

Posted by ra...@apache.org.
Added variant to readAsZNode


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/c1878392
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/c1878392
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/c1878392

Branch: refs/heads/CURATOR-397
Commit: c18783924af4b4c3f11d098165c85bf313f454c2
Parents: fe0a88b
Author: randgalt <ra...@apache.org>
Authored: Sun May 7 09:41:55 2017 +0200
Committer: randgalt <ra...@apache.org>
Committed: Sun May 7 09:41:55 2017 +0200

----------------------------------------------------------------------
 .../x/async/modeled/ModeledFramework.java       |  8 +++
 .../apache/curator/x/async/modeled/ZNode.java   | 49 ++++++++++++++
 .../x/async/modeled/cached/ModeledCache.java    |  1 +
 .../curator/x/async/modeled/cached/ZNode.java   | 49 --------------
 .../details/CachedModeledFrameworkImpl.java     | 29 ++++++---
 .../async/modeled/details/ModeledCacheImpl.java |  2 +-
 .../modeled/details/ModeledFrameworkImpl.java   | 68 ++++++++++++--------
 .../x/async/modeled/details/ZNodeImpl.java      |  2 +-
 8 files changed, 122 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/c1878392/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFramework.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFramework.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFramework.java
index f8cf4c7..b475712 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFramework.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFramework.java
@@ -170,6 +170,14 @@ public interface ModeledFramework<T>
     AsyncStage<T> read(Stat storingStatIn);
 
     /**
+     * Read the ZNode at this instance's path and deserialize into a model
+     *
+     * @return AsyncStage
+     * @see org.apache.curator.x.async.AsyncStage
+     */
+    AsyncStage<ZNode<T>> readAsZNode();
+
+    /**
      * Update the ZNode at this instance's path with a serialized
      * form of the given model passing "-1" for the update version
      *

http://git-wip-us.apache.org/repos/asf/curator/blob/c1878392/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ZNode.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ZNode.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ZNode.java
new file mode 100644
index 0000000..7ed6ef5
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ZNode.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.modeled;
+
+import org.apache.curator.x.async.modeled.ZPath;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * Abstracts a ZooKeeper node
+ */
+public interface ZNode<T>
+{
+    /**
+     * The path of the node
+     *
+     * @return path
+     */
+    ZPath path();
+
+    /**
+     * The node's last known stat if available
+     *
+     * @return stat
+     */
+    Stat stat();
+
+    /**
+     * The node's current model
+     *
+     * @return model
+     */
+    T model();
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/c1878392/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/ModeledCache.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/ModeledCache.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/ModeledCache.java
index 9289988..6677268 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/ModeledCache.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/ModeledCache.java
@@ -18,6 +18,7 @@
  */
 package org.apache.curator.x.async.modeled.cached;
 
+import org.apache.curator.x.async.modeled.ZNode;
 import org.apache.curator.x.async.modeled.ZPath;
 import java.util.Map;
 import java.util.Optional;

http://git-wip-us.apache.org/repos/asf/curator/blob/c1878392/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/ZNode.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/ZNode.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/ZNode.java
deleted file mode 100644
index 88f3489..0000000
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/ZNode.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.x.async.modeled.cached;
-
-import org.apache.curator.x.async.modeled.ZPath;
-import org.apache.zookeeper.data.Stat;
-
-/**
- * Abstracts a cached node
- */
-public interface ZNode<T>
-{
-    /**
-     * The path of the node
-     *
-     * @return path
-     */
-    ZPath path();
-
-    /**
-     * The node's last known stat if available
-     *
-     * @return stat
-     */
-    Stat stat();
-
-    /**
-     * The node's current model
-     *
-     * @return model
-     */
-    T model();
-}

http://git-wip-us.apache.org/repos/asf/curator/blob/c1878392/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java
index 2209d37..9ef88e8 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java
@@ -30,7 +30,7 @@ import org.apache.curator.x.async.modeled.ZPath;
 import org.apache.curator.x.async.modeled.cached.CachedModeledFramework;
 import org.apache.curator.x.async.modeled.cached.ModeledCache;
 import org.apache.curator.x.async.modeled.cached.ModeledCacheListener;
-import org.apache.curator.x.async.modeled.cached.ZNode;
+import org.apache.curator.x.async.modeled.ZNode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.server.DataTree;
@@ -39,6 +39,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
+import java.util.function.Function;
 
 class CachedModeledFrameworkImpl<T> implements CachedModeledFramework<T>
 {
@@ -133,21 +134,25 @@ class CachedModeledFrameworkImpl<T> implements CachedModeledFramework<T>
     @Override
     public AsyncStage<T> read()
     {
-        return read(null);
+        return internalRead(ZNode::model);
     }
 
     @Override
     public AsyncStage<T> read(Stat storingStatIn)
     {
-        ZPath path = client.modelSpec().path();
-        Optional<ZNode<T>> data = cache.currentData(path);
-        return data.map(node -> {
+        return internalRead(n -> {
             if ( storingStatIn != null )
             {
-                DataTree.copyStat(node.stat(), storingStatIn);
+                DataTree.copyStat(n.stat(), storingStatIn);
             }
-            return completed(new ModelStage<>(), node.model());
-        }).orElseGet(() -> completedExceptionally(new ModelStage<>(), new KeeperException.NoNodeException(path.fullPath())));
+            return n.model();
+        });
+    }
+
+    @Override
+    public AsyncStage<ZNode<T>> readAsZNode()
+    {
+        return internalRead(Function.identity());
     }
 
     @Override
@@ -248,4 +253,12 @@ class CachedModeledFrameworkImpl<T> implements CachedModeledFramework<T>
         executor.execute(() -> stage.completeExceptionally(e));
         return stage;
     }
+
+    private <U> ModelStage<U> internalRead(Function<ZNode<T>, U> resolver)
+    {
+        ZPath path = client.modelSpec().path();
+        Optional<ZNode<T>> data = cache.currentData(path);
+        return data.map(node -> completed(new ModelStage<>(), resolver.apply(node)))
+            .orElseGet(() -> completedExceptionally(new ModelStage<>(), new KeeperException.NoNodeException(path.fullPath())));
+    }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/c1878392/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
index 353c28a..2de57c1 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
@@ -31,7 +31,7 @@ import org.apache.curator.x.async.modeled.ModelSpec;
 import org.apache.curator.x.async.modeled.ZPath;
 import org.apache.curator.x.async.modeled.cached.ModeledCache;
 import org.apache.curator.x.async.modeled.cached.ModeledCacheListener;
-import org.apache.curator.x.async.modeled.cached.ZNode;
+import org.apache.curator.x.async.modeled.ZNode;
 import org.apache.zookeeper.data.Stat;
 import java.util.AbstractMap;
 import java.util.Map;

http://git-wip-us.apache.org/repos/asf/curator/blob/c1878392/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
index 66f10de..3bb1b73 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
@@ -35,15 +35,18 @@ import org.apache.curator.x.async.api.CreateOption;
 import org.apache.curator.x.async.api.WatchableAsyncCuratorFramework;
 import org.apache.curator.x.async.modeled.ModelSpec;
 import org.apache.curator.x.async.modeled.ModeledFramework;
+import org.apache.curator.x.async.modeled.ZNode;
 import org.apache.curator.x.async.modeled.ZPath;
 import org.apache.curator.x.async.modeled.cached.CachedModeledFramework;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.server.DataTree;
 import java.util.List;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
+import java.util.function.Function;
 import java.util.function.UnaryOperator;
 import java.util.stream.Collectors;
 
@@ -154,41 +157,25 @@ public class ModeledFrameworkImpl<T> implements ModeledFramework<T>
     @Override
     public AsyncStage<T> read()
     {
-        return read(null);
+        return internalRead(ZNode::model);
     }
 
     @Override
     public AsyncStage<T> read(Stat storingStatIn)
     {
-        AsyncPathable<AsyncStage<byte[]>> next;
-        if ( isCompressed() )
-        {
-            next = (storingStatIn != null) ? watchableClient.getData().decompressedStoringStatIn(storingStatIn) : watchableClient.getData().decompressed();
-        }
-        else
-        {
-            next = (storingStatIn != null) ? watchableClient.getData().storingStatIn(storingStatIn) : watchableClient.getData();
-        }
-        AsyncStage<byte[]> asyncStage = next.forPath(modelSpec.path().fullPath());
-        ModelStage<T> modelStage = new ModelStage<>(asyncStage.event());
-        asyncStage.whenComplete((value, e) -> {
-            if ( e != null )
+        return internalRead(n -> {
+            if ( storingStatIn != null )
             {
-                modelStage.completeExceptionally(e);
-            }
-            else
-            {
-                try
-                {
-                    modelStage.complete(modelSpec.serializer().deserialize(value));
-                }
-                catch ( Exception deserializeException )
-                {
-                    modelStage.completeExceptionally(deserializeException);
-                }
+                DataTree.copyStat(n.stat(), storingStatIn);
             }
+            return n.model();
         });
-        return modelStage;
+    }
+
+    @Override
+    public AsyncStage<ZNode<T>> readAsZNode()
+    {
+        return internalRead(Function.identity());
     }
 
     @Override
@@ -349,4 +336,31 @@ public class ModeledFrameworkImpl<T> implements ModeledFramework<T>
     {
         return modelSpec.createOptions().contains(CreateOption.compress);
     }
+
+    private <U> ModelStage<U> internalRead(Function<ZNode<T>, U> resolver)
+    {
+        Stat stat = new Stat();
+        AsyncPathable<AsyncStage<byte[]>> next = isCompressed() ? watchableClient.getData().decompressedStoringStatIn(stat) : watchableClient.getData().storingStatIn(stat);
+        AsyncStage<byte[]> asyncStage = next.forPath(modelSpec.path().fullPath());
+        ModelStage<U> modelStage = new ModelStage<>(asyncStage.event());
+        asyncStage.whenComplete((value, e) -> {
+            if ( e != null )
+            {
+                modelStage.completeExceptionally(e);
+            }
+            else
+            {
+                try
+                {
+                    ZNode<T> node = new ZNodeImpl<>(modelSpec.path(), stat, modelSpec.serializer().deserialize(value));
+                    modelStage.complete(resolver.apply(node));
+                }
+                catch ( Exception deserializeException )
+                {
+                    modelStage.completeExceptionally(deserializeException);
+                }
+            }
+        });
+        return modelStage;
+    }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/c1878392/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ZNodeImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ZNodeImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ZNodeImpl.java
index 20dcaac..85bedf4 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ZNodeImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ZNodeImpl.java
@@ -18,7 +18,7 @@
  */
 package org.apache.curator.x.async.modeled.details;
 
-import org.apache.curator.x.async.modeled.cached.ZNode;
+import org.apache.curator.x.async.modeled.ZNode;
 import org.apache.curator.x.async.modeled.ZPath;
 import org.apache.zookeeper.data.Stat;
 import java.util.Objects;


[3/3] curator git commit: Support setting the version when doing a create or set operation

Posted by ra...@apache.org.
Support setting the version when doing a create or set operation


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/1110ab3b
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/1110ab3b
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/1110ab3b

Branch: refs/heads/CURATOR-397
Commit: 1110ab3bbc55748c053adcd909cc0d82be84309d
Parents: c187839
Author: randgalt <ra...@apache.org>
Authored: Sun May 7 09:55:28 2017 +0200
Committer: randgalt <ra...@apache.org>
Committed: Sun May 7 09:55:28 2017 +0200

----------------------------------------------------------------------
 .../framework/imps/CreateBuilderImpl.java       | 10 +++++--
 .../curator/x/async/api/AsyncCreateBuilder.java | 28 ++++++++++++++++++++
 .../x/async/details/AsyncCreateBuilderImpl.java | 21 +++++++++++++++
 .../x/async/modeled/ModeledFramework.java       | 23 ++++++++++++++++
 .../details/CachedModeledFrameworkImpl.java     | 12 +++++++++
 .../modeled/details/ModeledFrameworkImpl.java   | 16 +++++++++--
 6 files changed, 106 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/1110ab3b/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
index fdd1e15..60f49c5 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
@@ -52,6 +52,7 @@ public class CreateBuilderImpl implements CreateBuilder, CreateBuilder2, Backgro
     private boolean doProtected;
     private boolean compress;
     private boolean setDataIfExists;
+    private int setDataIfExistsVersion = -1;
     private String protectedId;
     private ACLing acling;
     private Stat storingStat;
@@ -95,6 +96,11 @@ public class CreateBuilderImpl implements CreateBuilder, CreateBuilder2, Backgro
         this.ttl = ttl;
     }
 
+    public void setSetDataIfExistsVersion(int version)
+    {
+        this.setDataIfExistsVersion = version;
+    }
+
     @Override
     public CreateBuilder2 orSetData()
     {
@@ -751,7 +757,7 @@ public class CreateBuilderImpl implements CreateBuilder, CreateBuilder2, Backgro
             {
                 try
                 {
-                    client.getZooKeeper().setData(path, mainOperationAndData.getData().getData(), -1, statCallback, backgrounding.getContext());
+                    client.getZooKeeper().setData(path, mainOperationAndData.getData().getData(), setDataIfExistsVersion, statCallback, backgrounding.getContext());
                 }
                 catch ( KeeperException e )
                 {
@@ -1078,7 +1084,7 @@ public class CreateBuilderImpl implements CreateBuilder, CreateBuilder2, Backgro
                             {
                                 if ( setDataIfExists )
                                 {
-                                    client.getZooKeeper().setData(path, data, -1);
+                                    client.getZooKeeper().setData(path, data, setDataIfExistsVersion);
                                     createdPath = path;
                                 }
                                 else

http://git-wip-us.apache.org/repos/asf/curator/blob/1110ab3b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncCreateBuilder.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncCreateBuilder.java b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncCreateBuilder.java
index e5f2d8c..7ed934e 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncCreateBuilder.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncCreateBuilder.java
@@ -67,6 +67,15 @@ public interface AsyncCreateBuilder extends AsyncPathAndBytesable<AsyncStage<Str
     AsyncPathAndBytesable<AsyncStage<String>> withTtl(long ttl);
 
     /**
+     * Specify the setData expected matching version when using option
+     * {@link org.apache.curator.x.async.api.CreateOption#setDataIfExists}. By default -1 is used.
+     *
+     * @param version setData expected matching version
+     * @return this for chaining
+     */
+    AsyncPathAndBytesable<AsyncStage<String>> withSetDataVersion(int version);
+
+    /**
      * Options to change how the ZNode is created
      *
      * @param options options
@@ -141,4 +150,23 @@ public interface AsyncCreateBuilder extends AsyncPathAndBytesable<AsyncStage<Str
      * @return this
      */
     AsyncPathAndBytesable<AsyncStage<String>> withOptions(Set<CreateOption> options, CreateMode createMode, List<ACL> aclList, Stat stat, long ttl);
+
+    /**
+     * set options, mode, ACLs, stat, ttl, and setData version
+     *
+     * @param options options
+     * @param createMode mode to use
+     * @param aclList the ACL list to use
+     * @param stat the stat to have filled in
+     * @param ttl the ttl or 0
+     * @param setDataVersion the setData matching version or -1
+     * @see #withOptions(java.util.Set)
+     * @see #withMode(org.apache.zookeeper.CreateMode)
+     * @see #withACL(java.util.List)
+     * @see #storingStatIn(org.apache.zookeeper.data.Stat)
+     * @see #withTtl(long)
+     * @see #withSetDataVersion(int)
+     * @return this
+     */
+    AsyncPathAndBytesable<AsyncStage<String>> withOptions(Set<CreateOption> options, CreateMode createMode, List<ACL> aclList, Stat stat, long ttl, int setDataVersion);
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/1110ab3b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCreateBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCreateBuilderImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCreateBuilderImpl.java
index e8b1d30..c27639e 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCreateBuilderImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCreateBuilderImpl.java
@@ -44,6 +44,7 @@ class AsyncCreateBuilderImpl implements AsyncCreateBuilder
     private Set<CreateOption> options = Collections.emptySet();
     private Stat stat = null;
     private long ttl = -1;
+    private int setDataVersion = -1;
 
     AsyncCreateBuilderImpl(CuratorFrameworkImpl client, Filters filters)
     {
@@ -80,6 +81,13 @@ class AsyncCreateBuilderImpl implements AsyncCreateBuilder
     }
 
     @Override
+    public AsyncPathAndBytesable<AsyncStage<String>> withSetDataVersion(int version)
+    {
+        this.setDataVersion = version;
+        return this;
+    }
+
+    @Override
     public AsyncPathAndBytesable<AsyncStage<String>> withOptions(Set<CreateOption> options)
     {
         this.options = Objects.requireNonNull(options, "options cannot be null");
@@ -133,6 +141,18 @@ class AsyncCreateBuilderImpl implements AsyncCreateBuilder
     }
 
     @Override
+    public AsyncPathAndBytesable<AsyncStage<String>> withOptions(Set<CreateOption> options, CreateMode createMode, List<ACL> aclList, Stat stat, long ttl, int setDataVersion)
+    {
+        this.options = Objects.requireNonNull(options, "options cannot be null");
+        this.aclList = aclList;
+        this.createMode = Objects.requireNonNull(createMode, "createMode cannot be null");
+        this.stat = stat;
+        this.ttl = ttl;
+        this.setDataVersion = setDataVersion;
+        return this;
+    }
+
+    @Override
     public AsyncStage<String> forPath(String path)
     {
         return internalForPath(path, null, false);
@@ -159,6 +179,7 @@ class AsyncCreateBuilderImpl implements AsyncCreateBuilder
             stat,
             ttl
         );
+        builder.setSetDataIfExistsVersion(setDataVersion);
         return safeCall(common.internalCallback, () -> useData ? builder.forPath(path, data) : builder.forPath(path));
     }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/1110ab3b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFramework.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFramework.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFramework.java
index b475712..0a74938 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFramework.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFramework.java
@@ -143,6 +143,17 @@ public interface ModeledFramework<T>
 
     /**
      * Create (or update depending on build options) a ZNode at this instance's path with a serialized
+     * form of the given model
+     *
+     * @param model model to write
+     * @param version if data is being set instead of creating the node, the data version to use
+     * @return AsyncStage
+     * @see org.apache.curator.x.async.AsyncStage
+     */
+    AsyncStage<String> set(T model, int version);
+
+    /**
+     * Create (or update depending on build options) a ZNode at this instance's path with a serialized
      * form of the given model
      *
      * @param model model to write
@@ -153,6 +164,18 @@ public interface ModeledFramework<T>
     AsyncStage<String> set(T model, Stat storingStatIn);
 
     /**
+     * Create (or update depending on build options) a ZNode at this instance's path with a serialized
+     * form of the given model
+     *
+     * @param model model to write
+     * @param storingStatIn the stat for the new ZNode is stored here
+     * @param version if data is being set instead of creating the node, the data version to use
+     * @return AsyncStage
+     * @see org.apache.curator.x.async.AsyncStage
+     */
+    AsyncStage<String> set(T model, Stat storingStatIn, int version);
+
+    /**
      * Read the ZNode at this instance's path and deserialize into a model
      *
      * @return AsyncStage

http://git-wip-us.apache.org/repos/asf/curator/blob/1110ab3b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java
index 9ef88e8..8359ae0 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java
@@ -132,6 +132,18 @@ class CachedModeledFrameworkImpl<T> implements CachedModeledFramework<T>
     }
 
     @Override
+    public AsyncStage<String> set(T model, Stat storingStatIn, int version)
+    {
+        return client.set(model, storingStatIn, version);
+    }
+
+    @Override
+    public AsyncStage<String> set(T model, int version)
+    {
+        return client.set(model, version);
+    }
+
+    @Override
     public AsyncStage<T> read()
     {
         return internalRead(ZNode::model);

http://git-wip-us.apache.org/repos/asf/curator/blob/1110ab3b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
index 3bb1b73..ba67a71 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
@@ -128,17 +128,29 @@ public class ModeledFrameworkImpl<T> implements ModeledFramework<T>
     @Override
     public AsyncStage<String> set(T item)
     {
-        return set(item, null);
+        return set(item, null, -1);
     }
 
     @Override
     public AsyncStage<String> set(T item, Stat storingStatIn)
     {
+        return set(item, storingStatIn, -1);
+    }
+
+    @Override
+    public AsyncStage<String> set(T item, int version)
+    {
+        return set(item, null, version); // no Stat requested; pass the expected data version through to the full overload
+    }
+
+    @Override
+    public AsyncStage<String> set(T item, Stat storingStatIn, int version)
+    {
         try
         {
             byte[] bytes = modelSpec.serializer().serialize(item);
             return dslClient.create()
-                .withOptions(modelSpec.createOptions(), modelSpec.createMode(), fixAclList(modelSpec.aclList()), storingStatIn, modelSpec.ttl())
+                .withOptions(modelSpec.createOptions(), modelSpec.createMode(), fixAclList(modelSpec.aclList()), storingStatIn, modelSpec.ttl(), version)
                 .forPath(modelSpec.path().fullPath(), bytes);
         }
         catch ( Exception e )