You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2014/08/08 04:09:11 UTC
[4/9] git commit: Fix potential race condition.
Fix potential race condition.
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/26749277
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/26749277
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/26749277
Branch: refs/heads/master
Commit: 267492779ac127d22c67791c799b73d46dfeac7a
Parents: f474333
Author: Scott Blum <sc...@squareup.com>
Authored: Fri Aug 1 15:23:16 2014 -0400
Committer: Scott Blum <sc...@squareup.com>
Committed: Fri Aug 1 15:28:15 2014 -0400
----------------------------------------------------------------------
.../framework/recipes/cache/TreeCache.java | 58 +++++++++++++-------
1 file changed, 39 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/26749277/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
index f73861d..0d5995a 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
@@ -71,14 +71,14 @@ public class TreeCache implements Closeable
private final class TreeNode implements Watcher, BackgroundCallback
{
- private final AtomicReference<NodeState> nodeState = new AtomicReference<NodeState>(NodeState.PENDING);
- private final String path;
- private final TreeNode parent;
- private final AtomicReference<Stat> stat = new AtomicReference<Stat>();
- private final AtomicReference<byte[]> data = new AtomicReference<byte[]>();
- private final AtomicReference<ConcurrentMap<String, TreeNode>> children = new AtomicReference<ConcurrentMap<String, TreeNode>>();
-
- private TreeNode(String path, TreeNode parent)
+ final AtomicReference<NodeState> nodeState = new AtomicReference<NodeState>(NodeState.PENDING);
+ final TreeNode parent;
+ final String path;
+ final AtomicReference<Stat> stat = new AtomicReference<Stat>();
+ final AtomicReference<byte[]> data = new AtomicReference<byte[]>();
+ final AtomicReference<ConcurrentMap<String, TreeNode>> children = new AtomicReference<ConcurrentMap<String, TreeNode>>();
+
+ TreeNode(String path, TreeNode parent)
{
this.path = path;
this.parent = parent;
@@ -86,19 +86,30 @@ public class TreeCache implements Closeable
private void refresh() throws Exception
{
- refreshData();
- refreshChildren();
+ outstandingOps.addAndGet(2);
+ doRefreshData();
+ doRefreshChildren();
}
private void refreshChildren() throws Exception
{
outstandingOps.incrementAndGet();
- client.getChildren().usingWatcher(this).inBackground(this).forPath(path);
+ doRefreshChildren();
}
private void refreshData() throws Exception
{
outstandingOps.incrementAndGet();
+ doRefreshData();
+ }
+
+ private void doRefreshChildren() throws Exception
+ {
+ client.getChildren().usingWatcher(this).inBackground(this).forPath(path);
+ }
+
+ private void doRefreshData() throws Exception
+ {
if ( dataIsCompressed )
{
client.getData().decompressed().usingWatcher(this).inBackground(this).forPath(path);
@@ -109,7 +120,7 @@ public class TreeCache implements Closeable
}
}
- private void wasReconnected() throws Exception
+ void wasReconnected() throws Exception
{
refresh();
ConcurrentMap<String, TreeNode> childMap = children.get();
@@ -122,12 +133,12 @@ public class TreeCache implements Closeable
}
}
- private void wasCreated() throws Exception
+ void wasCreated() throws Exception
{
refresh();
}
- private void wasDeleted() throws Exception
+ void wasDeleted() throws Exception
{
stat.set(null);
data.set(null);
@@ -200,6 +211,7 @@ public class TreeCache implements Closeable
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
{
+ Stat newStat = event.getStat();
switch ( event.getType() )
{
case EXISTS:
@@ -217,7 +229,12 @@ public class TreeCache implements Closeable
case CHILDREN:
if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
{
- stat.set(event.getStat());
+ Stat oldStat = stat.get();
+ if (oldStat != null && oldStat.getMzxid() == newStat.getMzxid()) {
+ // Only update stat if mzxid is different, otherwise we might obscure
+ // GET_DATA event updates.
+ stat.set(newStat);
+ }
if ( event.getChildren().isEmpty() )
{
@@ -263,19 +280,22 @@ public class TreeCache implements Closeable
case GET_DATA:
if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
{
- Stat oldStat = stat.getAndSet(event.getStat());
if ( cacheData )
{
data.set(event.getData());
}
+ Stat oldStat = stat.getAndSet(newStat);
if ( nodeState.compareAndSet(NodeState.PENDING, NodeState.LIVE) )
{
- publishEvent(TreeCacheEvent.Type.NODE_ADDED, new ChildData(event.getPath(), event.getStat(), event.getData()));
+ publishEvent(TreeCacheEvent.Type.NODE_ADDED, new ChildData(event.getPath(), newStat, event.getData()));
}
- else if ( oldStat.getMzxid() != event.getStat().getMzxid() )
+ else
{
- publishEvent(TreeCacheEvent.Type.NODE_UPDATED, new ChildData(event.getPath(), event.getStat(), event.getData()));
+ if ( oldStat == null || oldStat.getMzxid() != newStat.getMzxid() )
+ {
+ publishEvent(TreeCacheEvent.Type.NODE_UPDATED, new ChildData(event.getPath(), newStat, event.getData()));
+ }
}
}
else if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() )