You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2014/08/08 04:09:11 UTC

[4/9] git commit: Fix potential race condition.

Fix potential race condition.


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/26749277
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/26749277
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/26749277

Branch: refs/heads/master
Commit: 267492779ac127d22c67791c799b73d46dfeac7a
Parents: f474333
Author: Scott Blum <sc...@squareup.com>
Authored: Fri Aug 1 15:23:16 2014 -0400
Committer: Scott Blum <sc...@squareup.com>
Committed: Fri Aug 1 15:28:15 2014 -0400

----------------------------------------------------------------------
 .../framework/recipes/cache/TreeCache.java      | 58 +++++++++++++-------
 1 file changed, 39 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/26749277/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
index f73861d..0d5995a 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
@@ -71,14 +71,14 @@ public class TreeCache implements Closeable
 
     private final class TreeNode implements Watcher, BackgroundCallback
     {
-        private final AtomicReference<NodeState> nodeState = new AtomicReference<NodeState>(NodeState.PENDING);
-        private final String path;
-        private final TreeNode parent;
-        private final AtomicReference<Stat> stat = new AtomicReference<Stat>();
-        private final AtomicReference<byte[]> data = new AtomicReference<byte[]>();
-        private final AtomicReference<ConcurrentMap<String, TreeNode>> children = new AtomicReference<ConcurrentMap<String, TreeNode>>();
-
-        private TreeNode(String path, TreeNode parent)
+        final AtomicReference<NodeState> nodeState = new AtomicReference<NodeState>(NodeState.PENDING);
+        final TreeNode parent;
+        final String path;
+        final AtomicReference<Stat> stat = new AtomicReference<Stat>();
+        final AtomicReference<byte[]> data = new AtomicReference<byte[]>();
+        final AtomicReference<ConcurrentMap<String, TreeNode>> children = new AtomicReference<ConcurrentMap<String, TreeNode>>();
+
+        TreeNode(String path, TreeNode parent)
         {
             this.path = path;
             this.parent = parent;
@@ -86,19 +86,30 @@ public class TreeCache implements Closeable
 
         private void refresh() throws Exception
         {
-            refreshData();
-            refreshChildren();
+            outstandingOps.addAndGet(2);
+            doRefreshData();
+            doRefreshChildren();
         }
 
         private void refreshChildren() throws Exception
         {
             outstandingOps.incrementAndGet();
-            client.getChildren().usingWatcher(this).inBackground(this).forPath(path);
+            doRefreshChildren();
         }
 
         private void refreshData() throws Exception
         {
             outstandingOps.incrementAndGet();
+            doRefreshData();
+        }
+
+        private void doRefreshChildren() throws Exception
+        {
+            client.getChildren().usingWatcher(this).inBackground(this).forPath(path);
+        }
+
+        private void doRefreshData() throws Exception
+        {
             if ( dataIsCompressed )
             {
                 client.getData().decompressed().usingWatcher(this).inBackground(this).forPath(path);
@@ -109,7 +120,7 @@ public class TreeCache implements Closeable
             }
         }
 
-        private void wasReconnected() throws Exception
+        void wasReconnected() throws Exception
         {
             refresh();
             ConcurrentMap<String, TreeNode> childMap = children.get();
@@ -122,12 +133,12 @@ public class TreeCache implements Closeable
             }
         }
 
-        private void wasCreated() throws Exception
+        void wasCreated() throws Exception
         {
             refresh();
         }
 
-        private void wasDeleted() throws Exception
+        void wasDeleted() throws Exception
         {
             stat.set(null);
             data.set(null);
@@ -200,6 +211,7 @@ public class TreeCache implements Closeable
         @Override
         public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
         {
+            Stat newStat = event.getStat();
             switch ( event.getType() )
             {
             case EXISTS:
@@ -217,7 +229,12 @@ public class TreeCache implements Closeable
             case CHILDREN:
                 if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
                 {
-                    stat.set(event.getStat());
+                    Stat oldStat = stat.get();
+                    if (oldStat != null && oldStat.getMzxid() == newStat.getMzxid()) {
+                        // Only update stat if mzxid is the same, otherwise we might obscure
+                        // GET_DATA event updates.
+                        stat.set(newStat);
+                    }
 
                     if ( event.getChildren().isEmpty() )
                     {
@@ -263,19 +280,22 @@ public class TreeCache implements Closeable
             case GET_DATA:
                 if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
                 {
-                    Stat oldStat = stat.getAndSet(event.getStat());
                     if ( cacheData )
                     {
                         data.set(event.getData());
                     }
 
+                    Stat oldStat = stat.getAndSet(newStat);
                     if ( nodeState.compareAndSet(NodeState.PENDING, NodeState.LIVE) )
                     {
-                        publishEvent(TreeCacheEvent.Type.NODE_ADDED, new ChildData(event.getPath(), event.getStat(), event.getData()));
+                        publishEvent(TreeCacheEvent.Type.NODE_ADDED, new ChildData(event.getPath(), newStat, event.getData()));
                     }
-                    else if ( oldStat.getMzxid() != event.getStat().getMzxid() )
+                    else
                     {
-                        publishEvent(TreeCacheEvent.Type.NODE_UPDATED, new ChildData(event.getPath(), event.getStat(), event.getData()));
+                        if ( oldStat == null || oldStat.getMzxid() != newStat.getMzxid() )
+                        {
+                            publishEvent(TreeCacheEvent.Type.NODE_UPDATED, new ChildData(event.getPath(), newStat, event.getData()));
+                        }
                     }
                 }
                 else if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() )