You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2017/05/07 07:55:34 UTC
[1/3] curator git commit: better exception handling for serialization
issues
Repository: curator
Updated Branches:
refs/heads/CURATOR-397 f2370b771 -> 1110ab3bb
better exception handling for serialization issues
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/fe0a88b3
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/fe0a88b3
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/fe0a88b3
Branch: refs/heads/CURATOR-397
Commit: fe0a88b3b7b9fccf9da022bc0a13e440b1f8435c
Parents: f2370b7
Author: randgalt <ra...@apache.org>
Authored: Fri May 5 18:38:46 2017 -0400
Committer: randgalt <ra...@apache.org>
Committed: Fri May 5 18:38:46 2017 -0400
----------------------------------------------------------------------
.../modeled/cached/ModeledCacheListener.java | 12 ++++++++
.../async/modeled/details/ModeledCacheImpl.java | 23 ++++++++++++---
.../modeled/details/ModeledFrameworkImpl.java | 30 ++++++++++++++++----
3 files changed, 56 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/fe0a88b3/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/ModeledCacheListener.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/ModeledCacheListener.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/ModeledCacheListener.java
index 544de78..4f1ac70 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/ModeledCacheListener.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/ModeledCacheListener.java
@@ -20,6 +20,7 @@ package org.apache.curator.x.async.modeled.cached;
import org.apache.curator.x.async.modeled.ZPath;
import org.apache.zookeeper.data.Stat;
+import org.slf4j.LoggerFactory;
@FunctionalInterface
public interface ModeledCacheListener<T>
@@ -66,6 +67,17 @@ public interface ModeledCacheListener<T>
}
/**
+ * Called when there is an exception processing a message from the internal cache. This is most
+ * likely due to a de-serialization problem.
+ *
+ * @param e the exception
+ */
+ default void handleException(Exception e)
+ {
+ LoggerFactory.getLogger(getClass()).error("Could not process cache message", e);
+ }
+
+ /**
* Returns a version of this listener that only begins calling
* {@link #accept(org.apache.curator.x.async.modeled.cached.ModeledCacheListener.Type, org.apache.curator.x.async.modeled.ZPath, org.apache.zookeeper.data.Stat, Object)}
* once {@link #initialized()} has been called. i.e. changes that occur as the cache is initializing are not sent
http://git-wip-us.apache.org/repos/asf/curator/blob/fe0a88b3/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
index 091a727..353c28a 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
@@ -18,13 +18,13 @@
*/
package org.apache.curator.x.async.modeled.details;
-import com.google.common.util.concurrent.MoreExecutors;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.listen.Listenable;
import org.apache.curator.framework.listen.ListenerContainer;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
+import org.apache.curator.utils.ThreadUtils;
import org.apache.curator.x.async.api.CreateOption;
import org.apache.curator.x.async.modeled.ModelSerializer;
import org.apache.curator.x.async.modeled.ModelSpec;
@@ -37,9 +37,7 @@ import java.util.AbstractMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.stream.Collectors;
class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T>
@@ -119,7 +117,24 @@ class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T>
}
@Override
- public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception
+ public void childEvent(CuratorFramework client, TreeCacheEvent event)
+ {
+ try
+ {
+ internalChildEvent(event);
+ }
+ catch ( Exception e )
+ {
+ ThreadUtils.checkInterrupted(e);
+
+ listenerContainer.forEach(l -> {
+ l.handleException(e);
+ return null;
+ });
+ }
+ }
+
+ private void internalChildEvent(TreeCacheEvent event) throws Exception
{
switch ( event.getType() )
{
http://git-wip-us.apache.org/repos/asf/curator/blob/fe0a88b3/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
index b666822..66f10de 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
@@ -131,8 +131,19 @@ public class ModeledFrameworkImpl<T> implements ModeledFramework<T>
@Override
public AsyncStage<String> set(T item, Stat storingStatIn)
{
- byte[] bytes = modelSpec.serializer().serialize(item);
- return dslClient.create().withOptions(modelSpec.createOptions(), modelSpec.createMode(), fixAclList(modelSpec.aclList()), storingStatIn, modelSpec.ttl()).forPath(modelSpec.path().fullPath(), bytes);
+ try
+ {
+ byte[] bytes = modelSpec.serializer().serialize(item);
+ return dslClient.create()
+ .withOptions(modelSpec.createOptions(), modelSpec.createMode(), fixAclList(modelSpec.aclList()), storingStatIn, modelSpec.ttl())
+ .forPath(modelSpec.path().fullPath(), bytes);
+ }
+ catch ( Exception e )
+ {
+ ModelStage<String> exceptionStage = new ModelStage<>();
+ exceptionStage.completeExceptionally(e);
+ return exceptionStage;
+ }
}
private List<ACL> fixAclList(List<ACL> aclList)
@@ -189,9 +200,18 @@ public class ModeledFrameworkImpl<T> implements ModeledFramework<T>
@Override
public AsyncStage<Stat> update(T item, int version)
{
- byte[] bytes = modelSpec.serializer().serialize(item);
- AsyncPathAndBytesable<AsyncStage<Stat>> next = isCompressed() ? dslClient.setData().compressedWithVersion(version) : dslClient.setData();
- return next.forPath(modelSpec.path().fullPath(), bytes);
+ try
+ {
+ byte[] bytes = modelSpec.serializer().serialize(item);
+ AsyncPathAndBytesable<AsyncStage<Stat>> next = isCompressed() ? dslClient.setData().compressedWithVersion(version) : dslClient.setData();
+ return next.forPath(modelSpec.path().fullPath(), bytes);
+ }
+ catch ( Exception e )
+ {
+ ModelStage<Stat> exceptionStage = new ModelStage<>();
+ exceptionStage.completeExceptionally(e);
+ return exceptionStage;
+ }
}
@Override
[2/3] curator git commit: Added variant to readAsZNode
Posted by ra...@apache.org.
Added variant to readAsZNode
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/c1878392
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/c1878392
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/c1878392
Branch: refs/heads/CURATOR-397
Commit: c18783924af4b4c3f11d098165c85bf313f454c2
Parents: fe0a88b
Author: randgalt <ra...@apache.org>
Authored: Sun May 7 09:41:55 2017 +0200
Committer: randgalt <ra...@apache.org>
Committed: Sun May 7 09:41:55 2017 +0200
----------------------------------------------------------------------
.../x/async/modeled/ModeledFramework.java | 8 +++
.../apache/curator/x/async/modeled/ZNode.java | 49 ++++++++++++++
.../x/async/modeled/cached/ModeledCache.java | 1 +
.../curator/x/async/modeled/cached/ZNode.java | 49 --------------
.../details/CachedModeledFrameworkImpl.java | 29 ++++++---
.../async/modeled/details/ModeledCacheImpl.java | 2 +-
.../modeled/details/ModeledFrameworkImpl.java | 68 ++++++++++++--------
.../x/async/modeled/details/ZNodeImpl.java | 2 +-
8 files changed, 122 insertions(+), 86 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/c1878392/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFramework.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFramework.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFramework.java
index f8cf4c7..b475712 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFramework.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFramework.java
@@ -170,6 +170,14 @@ public interface ModeledFramework<T>
AsyncStage<T> read(Stat storingStatIn);
/**
+ * Read the ZNode at this instance's path and deserialize into a model
+ *
+ * @return AsyncStage
+ * @see org.apache.curator.x.async.AsyncStage
+ */
+ AsyncStage<ZNode<T>> readAsZNode();
+
+ /**
* Update the ZNode at this instance's path with a serialized
* form of the given model passing "-1" for the update version
*
http://git-wip-us.apache.org/repos/asf/curator/blob/c1878392/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ZNode.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ZNode.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ZNode.java
new file mode 100644
index 0000000..7ed6ef5
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ZNode.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.modeled;
+
+import org.apache.curator.x.async.modeled.ZPath;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * Abstracts a ZooKeeper node
+ */
+public interface ZNode<T>
+{
+ /**
+ * The path of the node
+ *
+ * @return path
+ */
+ ZPath path();
+
+ /**
+ * The node's last known stat if available
+ *
+ * @return stat
+ */
+ Stat stat();
+
+ /**
+ * The node's current model
+ *
+ * @return model
+ */
+ T model();
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/c1878392/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/ModeledCache.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/ModeledCache.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/ModeledCache.java
index 9289988..6677268 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/ModeledCache.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/ModeledCache.java
@@ -18,6 +18,7 @@
*/
package org.apache.curator.x.async.modeled.cached;
+import org.apache.curator.x.async.modeled.ZNode;
import org.apache.curator.x.async.modeled.ZPath;
import java.util.Map;
import java.util.Optional;
http://git-wip-us.apache.org/repos/asf/curator/blob/c1878392/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/ZNode.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/ZNode.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/ZNode.java
deleted file mode 100644
index 88f3489..0000000
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/ZNode.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.x.async.modeled.cached;
-
-import org.apache.curator.x.async.modeled.ZPath;
-import org.apache.zookeeper.data.Stat;
-
-/**
- * Abstracts a cached node
- */
-public interface ZNode<T>
-{
- /**
- * The path of the node
- *
- * @return path
- */
- ZPath path();
-
- /**
- * The node's last known stat if available
- *
- * @return stat
- */
- Stat stat();
-
- /**
- * The node's current model
- *
- * @return model
- */
- T model();
-}
http://git-wip-us.apache.org/repos/asf/curator/blob/c1878392/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java
index 2209d37..9ef88e8 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java
@@ -30,7 +30,7 @@ import org.apache.curator.x.async.modeled.ZPath;
import org.apache.curator.x.async.modeled.cached.CachedModeledFramework;
import org.apache.curator.x.async.modeled.cached.ModeledCache;
import org.apache.curator.x.async.modeled.cached.ModeledCacheListener;
-import org.apache.curator.x.async.modeled.cached.ZNode;
+import org.apache.curator.x.async.modeled.ZNode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.server.DataTree;
@@ -39,6 +39,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
+import java.util.function.Function;
class CachedModeledFrameworkImpl<T> implements CachedModeledFramework<T>
{
@@ -133,21 +134,25 @@ class CachedModeledFrameworkImpl<T> implements CachedModeledFramework<T>
@Override
public AsyncStage<T> read()
{
- return read(null);
+ return internalRead(ZNode::model);
}
@Override
public AsyncStage<T> read(Stat storingStatIn)
{
- ZPath path = client.modelSpec().path();
- Optional<ZNode<T>> data = cache.currentData(path);
- return data.map(node -> {
+ return internalRead(n -> {
if ( storingStatIn != null )
{
- DataTree.copyStat(node.stat(), storingStatIn);
+ DataTree.copyStat(n.stat(), storingStatIn);
}
- return completed(new ModelStage<>(), node.model());
- }).orElseGet(() -> completedExceptionally(new ModelStage<>(), new KeeperException.NoNodeException(path.fullPath())));
+ return n.model();
+ });
+ }
+
+ @Override
+ public AsyncStage<ZNode<T>> readAsZNode()
+ {
+ return internalRead(Function.identity());
}
@Override
@@ -248,4 +253,12 @@ class CachedModeledFrameworkImpl<T> implements CachedModeledFramework<T>
executor.execute(() -> stage.completeExceptionally(e));
return stage;
}
+
+ private <U> ModelStage<U> internalRead(Function<ZNode<T>, U> resolver)
+ {
+ ZPath path = client.modelSpec().path();
+ Optional<ZNode<T>> data = cache.currentData(path);
+ return data.map(node -> completed(new ModelStage<>(), resolver.apply(node)))
+ .orElseGet(() -> completedExceptionally(new ModelStage<>(), new KeeperException.NoNodeException(path.fullPath())));
+ }
}
http://git-wip-us.apache.org/repos/asf/curator/blob/c1878392/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
index 353c28a..2de57c1 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
@@ -31,7 +31,7 @@ import org.apache.curator.x.async.modeled.ModelSpec;
import org.apache.curator.x.async.modeled.ZPath;
import org.apache.curator.x.async.modeled.cached.ModeledCache;
import org.apache.curator.x.async.modeled.cached.ModeledCacheListener;
-import org.apache.curator.x.async.modeled.cached.ZNode;
+import org.apache.curator.x.async.modeled.ZNode;
import org.apache.zookeeper.data.Stat;
import java.util.AbstractMap;
import java.util.Map;
http://git-wip-us.apache.org/repos/asf/curator/blob/c1878392/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
index 66f10de..3bb1b73 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
@@ -35,15 +35,18 @@ import org.apache.curator.x.async.api.CreateOption;
import org.apache.curator.x.async.api.WatchableAsyncCuratorFramework;
import org.apache.curator.x.async.modeled.ModelSpec;
import org.apache.curator.x.async.modeled.ModeledFramework;
+import org.apache.curator.x.async.modeled.ZNode;
import org.apache.curator.x.async.modeled.ZPath;
import org.apache.curator.x.async.modeled.cached.CachedModeledFramework;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.server.DataTree;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutorService;
+import java.util.function.Function;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
@@ -154,41 +157,25 @@ public class ModeledFrameworkImpl<T> implements ModeledFramework<T>
@Override
public AsyncStage<T> read()
{
- return read(null);
+ return internalRead(ZNode::model);
}
@Override
public AsyncStage<T> read(Stat storingStatIn)
{
- AsyncPathable<AsyncStage<byte[]>> next;
- if ( isCompressed() )
- {
- next = (storingStatIn != null) ? watchableClient.getData().decompressedStoringStatIn(storingStatIn) : watchableClient.getData().decompressed();
- }
- else
- {
- next = (storingStatIn != null) ? watchableClient.getData().storingStatIn(storingStatIn) : watchableClient.getData();
- }
- AsyncStage<byte[]> asyncStage = next.forPath(modelSpec.path().fullPath());
- ModelStage<T> modelStage = new ModelStage<>(asyncStage.event());
- asyncStage.whenComplete((value, e) -> {
- if ( e != null )
+ return internalRead(n -> {
+ if ( storingStatIn != null )
{
- modelStage.completeExceptionally(e);
- }
- else
- {
- try
- {
- modelStage.complete(modelSpec.serializer().deserialize(value));
- }
- catch ( Exception deserializeException )
- {
- modelStage.completeExceptionally(deserializeException);
- }
+ DataTree.copyStat(n.stat(), storingStatIn);
}
+ return n.model();
});
- return modelStage;
+ }
+
+ @Override
+ public AsyncStage<ZNode<T>> readAsZNode()
+ {
+ return internalRead(Function.identity());
}
@Override
@@ -349,4 +336,31 @@ public class ModeledFrameworkImpl<T> implements ModeledFramework<T>
{
return modelSpec.createOptions().contains(CreateOption.compress);
}
+
+ private <U> ModelStage<U> internalRead(Function<ZNode<T>, U> resolver)
+ {
+ Stat stat = new Stat();
+ AsyncPathable<AsyncStage<byte[]>> next = isCompressed() ? watchableClient.getData().decompressedStoringStatIn(stat) : watchableClient.getData().storingStatIn(stat);
+ AsyncStage<byte[]> asyncStage = next.forPath(modelSpec.path().fullPath());
+ ModelStage<U> modelStage = new ModelStage<>(asyncStage.event());
+ asyncStage.whenComplete((value, e) -> {
+ if ( e != null )
+ {
+ modelStage.completeExceptionally(e);
+ }
+ else
+ {
+ try
+ {
+ ZNode<T> node = new ZNodeImpl<>(modelSpec.path(), stat, modelSpec.serializer().deserialize(value));
+ modelStage.complete(resolver.apply(node));
+ }
+ catch ( Exception deserializeException )
+ {
+ modelStage.completeExceptionally(deserializeException);
+ }
+ }
+ });
+ return modelStage;
+ }
}
http://git-wip-us.apache.org/repos/asf/curator/blob/c1878392/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ZNodeImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ZNodeImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ZNodeImpl.java
index 20dcaac..85bedf4 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ZNodeImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ZNodeImpl.java
@@ -18,7 +18,7 @@
*/
package org.apache.curator.x.async.modeled.details;
-import org.apache.curator.x.async.modeled.cached.ZNode;
+import org.apache.curator.x.async.modeled.ZNode;
import org.apache.curator.x.async.modeled.ZPath;
import org.apache.zookeeper.data.Stat;
import java.util.Objects;
[3/3] curator git commit: Support setting the version when doing a
create or set operation
Posted by ra...@apache.org.
Support setting the version when doing a create or set operation
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/1110ab3b
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/1110ab3b
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/1110ab3b
Branch: refs/heads/CURATOR-397
Commit: 1110ab3bbc55748c053adcd909cc0d82be84309d
Parents: c187839
Author: randgalt <ra...@apache.org>
Authored: Sun May 7 09:55:28 2017 +0200
Committer: randgalt <ra...@apache.org>
Committed: Sun May 7 09:55:28 2017 +0200
----------------------------------------------------------------------
.../framework/imps/CreateBuilderImpl.java | 10 +++++--
.../curator/x/async/api/AsyncCreateBuilder.java | 28 ++++++++++++++++++++
.../x/async/details/AsyncCreateBuilderImpl.java | 21 +++++++++++++++
.../x/async/modeled/ModeledFramework.java | 23 ++++++++++++++++
.../details/CachedModeledFrameworkImpl.java | 12 +++++++++
.../modeled/details/ModeledFrameworkImpl.java | 16 +++++++++--
6 files changed, 106 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/1110ab3b/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
index fdd1e15..60f49c5 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
@@ -52,6 +52,7 @@ public class CreateBuilderImpl implements CreateBuilder, CreateBuilder2, Backgro
private boolean doProtected;
private boolean compress;
private boolean setDataIfExists;
+ private int setDataIfExistsVersion = -1;
private String protectedId;
private ACLing acling;
private Stat storingStat;
@@ -95,6 +96,11 @@ public class CreateBuilderImpl implements CreateBuilder, CreateBuilder2, Backgro
this.ttl = ttl;
}
+ public void setSetDataIfExistsVersion(int version)
+ {
+ this.setDataIfExistsVersion = version;
+ }
+
@Override
public CreateBuilder2 orSetData()
{
@@ -751,7 +757,7 @@ public class CreateBuilderImpl implements CreateBuilder, CreateBuilder2, Backgro
{
try
{
- client.getZooKeeper().setData(path, mainOperationAndData.getData().getData(), -1, statCallback, backgrounding.getContext());
+ client.getZooKeeper().setData(path, mainOperationAndData.getData().getData(), setDataIfExistsVersion, statCallback, backgrounding.getContext());
}
catch ( KeeperException e )
{
@@ -1078,7 +1084,7 @@ public class CreateBuilderImpl implements CreateBuilder, CreateBuilder2, Backgro
{
if ( setDataIfExists )
{
- client.getZooKeeper().setData(path, data, -1);
+ client.getZooKeeper().setData(path, data, setDataIfExistsVersion);
createdPath = path;
}
else
http://git-wip-us.apache.org/repos/asf/curator/blob/1110ab3b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncCreateBuilder.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncCreateBuilder.java b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncCreateBuilder.java
index e5f2d8c..7ed934e 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncCreateBuilder.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncCreateBuilder.java
@@ -67,6 +67,15 @@ public interface AsyncCreateBuilder extends AsyncPathAndBytesable<AsyncStage<Str
AsyncPathAndBytesable<AsyncStage<String>> withTtl(long ttl);
/**
+ * Specify the setData expected matching version when using option
+ * {@link org.apache.curator.x.async.api.CreateOption#setDataIfExists}. By default -1 is used.
+ *
+ * @param version setData expected matching version
+ * @return this for chaining
+ */
+ AsyncPathAndBytesable<AsyncStage<String>> withSetDataVersion(int version);
+
+ /**
* Options to change how the ZNode is created
*
* @param options options
@@ -141,4 +150,23 @@ public interface AsyncCreateBuilder extends AsyncPathAndBytesable<AsyncStage<Str
* @return this
*/
AsyncPathAndBytesable<AsyncStage<String>> withOptions(Set<CreateOption> options, CreateMode createMode, List<ACL> aclList, Stat stat, long ttl);
+
+ /**
+ * set options, mode, ACLs, stat, ttl, and the setData version
+ *
+ * @param options options
+ * @param createMode mode to use
+ * @param aclList the ACL list to use
+ * @param stat the stat to have filled in
+ * @param ttl the ttl or 0
+ * @param setDataVersion the setData matching version or -1
+ * @see #withOptions(java.util.Set)
+ * @see #withMode(org.apache.zookeeper.CreateMode)
+ * @see #withACL(java.util.List)
+ * @see #storingStatIn(org.apache.zookeeper.data.Stat)
+ * @see #withTtl(long)
+ * @see #withSetDataVersion(int)
+ * @return this
+ */
+ AsyncPathAndBytesable<AsyncStage<String>> withOptions(Set<CreateOption> options, CreateMode createMode, List<ACL> aclList, Stat stat, long ttl, int setDataVersion);
}
http://git-wip-us.apache.org/repos/asf/curator/blob/1110ab3b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCreateBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCreateBuilderImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCreateBuilderImpl.java
index e8b1d30..c27639e 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCreateBuilderImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCreateBuilderImpl.java
@@ -44,6 +44,7 @@ class AsyncCreateBuilderImpl implements AsyncCreateBuilder
private Set<CreateOption> options = Collections.emptySet();
private Stat stat = null;
private long ttl = -1;
+ private int setDataVersion = -1;
AsyncCreateBuilderImpl(CuratorFrameworkImpl client, Filters filters)
{
@@ -80,6 +81,13 @@ class AsyncCreateBuilderImpl implements AsyncCreateBuilder
}
@Override
+ public AsyncPathAndBytesable<AsyncStage<String>> withSetDataVersion(int version)
+ {
+ this.setDataVersion = version;
+ return this;
+ }
+
+ @Override
public AsyncPathAndBytesable<AsyncStage<String>> withOptions(Set<CreateOption> options)
{
this.options = Objects.requireNonNull(options, "options cannot be null");
@@ -133,6 +141,18 @@ class AsyncCreateBuilderImpl implements AsyncCreateBuilder
}
@Override
+ public AsyncPathAndBytesable<AsyncStage<String>> withOptions(Set<CreateOption> options, CreateMode createMode, List<ACL> aclList, Stat stat, long ttl, int setDataVersion)
+ {
+ this.options = Objects.requireNonNull(options, "options cannot be null");
+ this.aclList = aclList;
+ this.createMode = Objects.requireNonNull(createMode, "createMode cannot be null");
+ this.stat = stat;
+ this.ttl = ttl;
+ this.setDataVersion = setDataVersion;
+ return this;
+ }
+
+ @Override
public AsyncStage<String> forPath(String path)
{
return internalForPath(path, null, false);
@@ -159,6 +179,7 @@ class AsyncCreateBuilderImpl implements AsyncCreateBuilder
stat,
ttl
);
+ builder.setSetDataIfExistsVersion(setDataVersion);
return safeCall(common.internalCallback, () -> useData ? builder.forPath(path, data) : builder.forPath(path));
}
}
http://git-wip-us.apache.org/repos/asf/curator/blob/1110ab3b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFramework.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFramework.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFramework.java
index b475712..0a74938 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFramework.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFramework.java
@@ -143,6 +143,17 @@ public interface ModeledFramework<T>
/**
* Create (or update depending on build options) a ZNode at this instance's path with a serialized
+ * version of the given model
+ *
+ * @param model model to write
+ * @param version if data is being set instead of creating the node, the data version to use
+ * @return AsyncStage
+ * @see org.apache.curator.x.async.AsyncStage
+ */
+ AsyncStage<String> set(T model, int version);
+
+ /**
+ * Create (or update depending on build options) a ZNode at this instance's path with a serialized
* form of the given model
*
* @param model model to write
@@ -153,6 +164,18 @@ public interface ModeledFramework<T>
AsyncStage<String> set(T model, Stat storingStatIn);
/**
+ * Create (or update depending on build options) a ZNode at this instance's path with a serialized
+ * form of the given model
+ *
+ * @param model model to write
+ * @param version if data is being set instead of creating the node, the data version to use
+ * @param storingStatIn the stat for the new ZNode is stored here
+ * @return AsyncStage
+ * @see org.apache.curator.x.async.AsyncStage
+ */
+ AsyncStage<String> set(T model, Stat storingStatIn, int version);
+
+ /**
* Read the ZNode at this instance's path and deserialize into a model
*
* @return AsyncStage
http://git-wip-us.apache.org/repos/asf/curator/blob/1110ab3b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java
index 9ef88e8..8359ae0 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java
@@ -132,6 +132,18 @@ class CachedModeledFrameworkImpl<T> implements CachedModeledFramework<T>
}
@Override
+ public AsyncStage<String> set(T model, Stat storingStatIn, int version)
+ {
+ return client.set(model, storingStatIn, version);
+ }
+
+ @Override
+ public AsyncStage<String> set(T model, int version)
+ {
+ return client.set(model, version);
+ }
+
+ @Override
public AsyncStage<T> read()
{
return internalRead(ZNode::model);
http://git-wip-us.apache.org/repos/asf/curator/blob/1110ab3b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
index 3bb1b73..ba67a71 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
@@ -128,17 +128,29 @@ public class ModeledFrameworkImpl<T> implements ModeledFramework<T>
@Override
public AsyncStage<String> set(T item)
{
- return set(item, null);
+ return set(item, null, -1);
}
@Override
public AsyncStage<String> set(T item, Stat storingStatIn)
{
+ return set(item, storingStatIn, -1);
+ }
+
+ @Override
+ public AsyncStage<String> set(T item, int version)
+ {
+ return set(item, null, version); // forward the caller's expected data version (was hard-coded to -1, silently ignoring it); no stat is stored
+ }
+
+ @Override
+ public AsyncStage<String> set(T item, Stat storingStatIn, int version)
+ {
try
{
byte[] bytes = modelSpec.serializer().serialize(item);
return dslClient.create()
- .withOptions(modelSpec.createOptions(), modelSpec.createMode(), fixAclList(modelSpec.aclList()), storingStatIn, modelSpec.ttl())
+ .withOptions(modelSpec.createOptions(), modelSpec.createMode(), fixAclList(modelSpec.aclList()), storingStatIn, modelSpec.ttl(), version)
.forPath(modelSpec.path().fullPath(), bytes);
}
catch ( Exception e )