You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by dr...@apache.org on 2016/01/28 19:01:29 UTC

[1/2] curator git commit: CURATOR-294: Make ChildData immutable; PathChildrenCache uses a mutable subclass.

Repository: curator
Updated Branches:
  refs/heads/CURATOR-294 [created] 81067f5b3


CURATOR-294: Make ChildData immutable; PathChildrenCache uses a mutable subclass.


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/cc27e343
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/cc27e343
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/cc27e343

Branch: refs/heads/CURATOR-294
Commit: cc27e3434de7dae58f9fb07a013137d0da3dc378
Parents: 649e0ba
Author: Scott Blum <dr...@apache.org>
Authored: Thu Jan 28 12:38:57 2016 -0500
Committer: Scott Blum <dr...@apache.org>
Committed: Thu Jan 28 12:38:57 2016 -0500

----------------------------------------------------------------------
 .../framework/recipes/cache/ChildData.java      | 38 ++++++-------
 .../recipes/cache/PathChildrenCache.java        | 58 +++++++++++++-------
 2 files changed, 58 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/cc27e343/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/ChildData.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/ChildData.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/ChildData.java
index 21e0bc4..806eff6 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/ChildData.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/ChildData.java
@@ -25,15 +25,15 @@ import org.apache.curator.utils.PathUtils;
 
 public class ChildData implements Comparable<ChildData>
 {
-    private final String    path;
-    private final Stat      stat;
-    private final AtomicReference<byte[]>    data;
+    private final String path;
+    private final Stat stat;
+    private final byte[] data;
 
     public ChildData(String path, Stat stat, byte[] data)
     {
         this.path = PathUtils.validatePath(path);
         this.stat = stat;
-        this.data = new AtomicReference<byte[]>(data);
+        this.data = data;
     }
 
     /**
@@ -53,7 +53,7 @@ public class ChildData implements Comparable<ChildData>
             return -1;
         }
 
-        return path.compareTo(rhs.path);
+        return getPath().compareTo(rhs.getPath());
     }
 
     @SuppressWarnings("RedundantIfStatement")
@@ -69,17 +69,19 @@ public class ChildData implements Comparable<ChildData>
             return false;
         }
 
-        ChildData childData = (ChildData)o;
+        ChildData other = (ChildData)o;
 
-        if ( !Arrays.equals(data.get(), childData.data.get()) )
+        String path = getPath();
+        if ( path != null ? !path.equals(other.getPath()) : other.getPath() != null )
         {
             return false;
         }
-        if ( path != null ? !path.equals(childData.path) : childData.path != null )
+        Stat stat = getStat();
+        if ( stat != null ? !stat.equals(other.getStat()) : other.getStat() != null )
         {
             return false;
         }
-        if ( stat != null ? !stat.equals(childData.stat) : childData.stat != null )
+        if ( !Arrays.equals(getData(), other.getData()) )
         {
             return false;
         }
@@ -90,9 +92,12 @@ public class ChildData implements Comparable<ChildData>
     @Override
     public int hashCode()
     {
+        String path = getPath();
         int result = path != null ? path.hashCode() : 0;
+        Stat stat = getStat();
         result = 31 * result + (stat != null ? stat.hashCode() : 0);
-        result = 31 * result + (data != null ? Arrays.hashCode(data.get()) : 0);
+        byte[] data = getData();
+        result = 31 * result + (data != null ? Arrays.hashCode(data) : 0);
         return result;
     }
 
@@ -126,21 +131,16 @@ public class ChildData implements Comparable<ChildData>
      */
     public byte[] getData()
     {
-        return data.get();
-    }
-
-    void clearData()
-    {
-        data.set(null);
+        return data;
     }
 
     @Override
     public String toString()
     {
         return "ChildData{" +
-            "path='" + path + '\'' +
-            ", stat=" + stat +
-            ", data=" + Arrays.toString(data.get()) +
+            "path='" + getPath() + '\'' +
+            ", stat=" + getStat() +
+            ", data=" + Arrays.toString(getData()) +
             '}';
     }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/cc27e343/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
index 12769e1..1599294 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
@@ -74,8 +74,8 @@ public class PathChildrenCache implements Closeable
     private final boolean cacheData;
     private final boolean dataIsCompressed;
     private final ListenerContainer<PathChildrenCacheListener> listeners = new ListenerContainer<PathChildrenCacheListener>();
-    private final ConcurrentMap<String, ChildData> currentData = Maps.newConcurrentMap();
-    private final AtomicReference<Map<String, ChildData>> initialSet = new AtomicReference<Map<String, ChildData>>();
+    private final ConcurrentMap<String, NodeData> currentData = Maps.newConcurrentMap();
+    private final AtomicReference<Map<String, NodeData>> initialSet = new AtomicReference<Map<String, NodeData>>();
     private final Set<Operation> operationsQuantizer = Sets.newSetFromMap(Maps.<Operation, Boolean>newConcurrentMap());
     private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
     private final EnsureContainers ensureContainers;
@@ -87,7 +87,27 @@ public class PathChildrenCache implements Closeable
         CLOSED
     }
 
-    private static final ChildData NULL_CHILD_DATA = new ChildData("/", null, null);
+    static class NodeData extends ChildData {
+        private final AtomicReference<byte[]> dataRef;
+
+        public NodeData(String path, Stat stat, byte[] data) {
+            super(path, stat, null);
+            dataRef = new AtomicReference<byte[]>(data);
+        }
+
+        @Override
+        public byte[] getData()
+        {
+            return dataRef.get();
+        }
+
+        void clearData()
+        {
+            dataRef.set(null);
+        }
+    }
+
+    private static final NodeData NULL_CHILD_DATA = new NodeData("/", null, null);
 
     private static final boolean USE_EXISTS = Boolean.getBoolean("curator-path-children-cache-use-exists");
 
@@ -306,7 +326,7 @@ public class PathChildrenCache implements Closeable
 
             case POST_INITIALIZED_EVENT:
             {
-                initialSet.set(Maps.<String, ChildData>newConcurrentMap());
+                initialSet.set(Maps.<String, NodeData>newConcurrentMap());
                 offerOperation(new RefreshOperation(this, RefreshMode.POST_INITIALIZED));
                 break;
             }
@@ -443,7 +463,7 @@ public class PathChildrenCache implements Closeable
      */
     public boolean clearDataBytes(String fullPath, int ifVersion)
     {
-        ChildData data = currentData.get(fullPath);
+        NodeData data = currentData.get(fullPath);
         if ( data != null )
         {
             if ( (ifVersion < 0) || (ifVersion == data.getStat().getVersion()) )
@@ -576,13 +596,13 @@ public class PathChildrenCache implements Closeable
     @VisibleForTesting
     protected void remove(String fullPath)
     {
-        ChildData data = currentData.remove(fullPath);
+        NodeData data = currentData.remove(fullPath);
         if ( data != null )
         {
             offerOperation(new EventOperation(this, new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CHILD_REMOVED, data)));
         }
 
-        Map<String, ChildData> localInitialSet = initialSet.get();
+        Map<String, NodeData> localInitialSet = initialSet.get();
         if ( localInitialSet != null )
         {
             localInitialSet.remove(fullPath);
@@ -598,7 +618,7 @@ public class PathChildrenCache implements Closeable
             {
                 Stat stat = new Stat();
                 byte[] bytes = dataIsCompressed ? client.getData().decompressed().storingStatIn(stat).forPath(fullPath) : client.getData().storingStatIn(stat).forPath(fullPath);
-                currentData.put(fullPath, new ChildData(fullPath, stat, bytes));
+                currentData.put(fullPath, new NodeData(fullPath, stat, bytes));
             }
             catch ( KeeperException.NoNodeException ignore )
             {
@@ -611,7 +631,7 @@ public class PathChildrenCache implements Closeable
             Stat stat = client.checkExists().forPath(fullPath);
             if ( stat != null )
             {
-                currentData.put(fullPath, new ChildData(fullPath, stat, null));
+                currentData.put(fullPath, new NodeData(fullPath, stat, null));
             }
             else
             {
@@ -684,8 +704,8 @@ public class PathChildrenCache implements Closeable
     {
         if ( resultCode == KeeperException.Code.OK.intValue() ) // otherwise - node must have dropped or something - we should be getting another event
         {
-            ChildData data = new ChildData(fullPath, stat, bytes);
-            ChildData previousData = currentData.put(fullPath, data);
+            NodeData data = new NodeData(fullPath, stat, bytes);
+            NodeData previousData = currentData.put(fullPath, data);
             if ( previousData == null ) // i.e. new
             {
                 offerOperation(new EventOperation(this, new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CHILD_ADDED, data)));
@@ -698,9 +718,9 @@ public class PathChildrenCache implements Closeable
         }
     }
 
-    private void updateInitialSet(String name, ChildData data)
+    private void updateInitialSet(String name, NodeData data)
     {
-        Map<String, ChildData> localInitialSet = initialSet.get();
+        Map<String, NodeData> localInitialSet = initialSet.get();
         if ( localInitialSet != null )
         {
             localInitialSet.put(name, data);
@@ -708,7 +728,7 @@ public class PathChildrenCache implements Closeable
         }
     }
 
-    private void maybeOfferInitializedEvent(Map<String, ChildData> localInitialSet)
+    private void maybeOfferInitializedEvent(Map<String, NodeData> localInitialSet)
     {
         if ( !hasUninitialized(localInitialSet) )
         {
@@ -716,7 +736,7 @@ public class PathChildrenCache implements Closeable
 
             if ( initialSet.getAndSet(null) != null )   // avoid edge case - don't send more than 1 INITIALIZED event
             {
-                final List<ChildData> children = ImmutableList.copyOf(localInitialSet.values());
+                final List<ChildData> children = ImmutableList.<ChildData>copyOf(localInitialSet.values());
                 PathChildrenCacheEvent event = new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.INITIALIZED, null)
                 {
                     @Override
@@ -730,20 +750,20 @@ public class PathChildrenCache implements Closeable
         }
     }
 
-    private boolean hasUninitialized(Map<String, ChildData> localInitialSet)
+    private boolean hasUninitialized(Map<String, NodeData> localInitialSet)
     {
         if ( localInitialSet == null )
         {
             return false;
         }
 
-        Map<String, ChildData> uninitializedChildren = Maps.filterValues
+        Map<String, NodeData> uninitializedChildren = Maps.filterValues
             (
                 localInitialSet,
-                new Predicate<ChildData>()
+                new Predicate<NodeData>()
                 {
                     @Override
-                    public boolean apply(ChildData input)
+                    public boolean apply(NodeData input)
                     {
                         return (input == NULL_CHILD_DATA);  // check against ref intentional
                     }


[2/2] curator git commit: CURATOR-294: Optimize TreeCache, fix possible concurrency issue

Posted by dr...@apache.org.
CURATOR-294: Optimize TreeCache, fix possible concurrency issue


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/81067f5b
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/81067f5b
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/81067f5b

Branch: refs/heads/CURATOR-294
Commit: 81067f5b35937e4415b1d11e8d06b99a21f67926
Parents: cc27e34
Author: Scott Blum <dr...@apache.org>
Authored: Thu Jan 28 12:58:14 2016 -0500
Committer: Scott Blum <dr...@apache.org>
Committed: Thu Jan 28 12:58:14 2016 -0500

----------------------------------------------------------------------
 .../framework/recipes/cache/TreeCache.java      | 37 +++++++++++---------
 1 file changed, 20 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/81067f5b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
index 4d00266..3f7d8d4 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
@@ -220,8 +220,7 @@ public class TreeCache implements Closeable
         final AtomicReference<NodeState> nodeState = new AtomicReference<NodeState>(NodeState.PENDING);
         final TreeNode parent;
         final String path;
-        final AtomicReference<Stat> stat = new AtomicReference<Stat>();
-        final AtomicReference<byte[]> data = new AtomicReference<byte[]>();
+        final AtomicReference<ChildData> childData = new AtomicReference<ChildData>();
         final AtomicReference<ConcurrentMap<String, TreeNode>> children = new AtomicReference<ConcurrentMap<String, TreeNode>>();
         final int depth;
 
@@ -296,8 +295,7 @@ public class TreeCache implements Closeable
 
         void wasDeleted() throws Exception
         {
-            Stat oldStat = stat.getAndSet(null);
-            byte[] oldData = data.getAndSet(null);
+            ChildData oldChildData = childData.getAndSet(null);
             client.clearWatcherReferences(this);
             ConcurrentMap<String, TreeNode> childMap = children.getAndSet(null);
             if ( childMap != null )
@@ -318,7 +316,7 @@ public class TreeCache implements Closeable
             NodeState oldState = nodeState.getAndSet(NodeState.DEAD);
             if ( oldState == NodeState.LIVE )
             {
-                publishEvent(TreeCacheEvent.Type.NODE_REMOVED, new ChildData(path, oldStat, oldData));
+                publishEvent(TreeCacheEvent.Type.NODE_REMOVED, oldChildData);
             }
 
             if ( parent == null )
@@ -383,12 +381,12 @@ public class TreeCache implements Closeable
             case CHILDREN:
                 if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
                 {
-                    Stat oldStat = stat.get();
-                    if ( oldStat != null && oldStat.getMzxid() == newStat.getMzxid() )
+                    ChildData oldChildData = childData.get();
+                    if ( oldChildData != null && oldChildData.getStat().getMzxid() == newStat.getMzxid() )
                     {
-                        // Only update stat if mzxid is different, otherwise we might obscure
+                        // Only update stat if mzxid is same, otherwise we might obscure
                         // GET_DATA event updates.
-                        stat.set(newStat);
+                        childData.compareAndSet(oldChildData, new ChildData(oldChildData.getPath(), newStat, oldChildData.getData()));
                     }
 
                     if ( event.getChildren().isEmpty() )
@@ -435,22 +433,27 @@ public class TreeCache implements Closeable
             case GET_DATA:
                 if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
                 {
+                    ChildData toPublish = new ChildData(event.getPath(), newStat, event.getData());
+                    ChildData oldChildData;
                     if ( cacheData )
                     {
-                        data.set(event.getData());
+                        oldChildData = childData.getAndSet(toPublish);
+                    }
+                    else
+                    {
+                        oldChildData = childData.getAndSet(new ChildData(event.getPath(), newStat, null));
                     }
 
-                    Stat oldStat = stat.getAndSet(newStat);
                     NodeState oldState = nodeState.getAndSet(NodeState.LIVE);
                     if ( oldState != NodeState.LIVE )
                     {
-                        publishEvent(TreeCacheEvent.Type.NODE_ADDED, new ChildData(event.getPath(), newStat, event.getData()));
+                        publishEvent(TreeCacheEvent.Type.NODE_ADDED, toPublish);
                     }
                     else
                     {
-                        if ( oldStat == null || oldStat.getMzxid() != newStat.getMzxid() )
+                        if ( oldChildData == null || oldChildData.getStat().getMzxid() != newStat.getMzxid() )
                         {
-                            publishEvent(TreeCacheEvent.Type.NODE_UPDATED, new ChildData(event.getPath(), newStat, event.getData()));
+                            publishEvent(TreeCacheEvent.Type.NODE_UPDATED, toPublish);
                         }
                     }
                 }
@@ -681,9 +684,9 @@ public class TreeCache implements Closeable
             for ( Map.Entry<String, TreeNode> entry : map.entrySet() )
             {
                 TreeNode childNode = entry.getValue();
-                ChildData childData = new ChildData(childNode.path, childNode.stat.get(), childNode.data.get());
+                ChildData childData = childNode.childData.get();
                 // Double-check liveness after retreiving data.
-                if ( childNode.nodeState.get() == NodeState.LIVE )
+                if ( childData != null && childNode.nodeState.get() == NodeState.LIVE )
                 {
                     builder.put(entry.getKey(), childData);
                 }
@@ -710,7 +713,7 @@ public class TreeCache implements Closeable
         {
             return null;
         }
-        ChildData result = new ChildData(node.path, node.stat.get(), node.data.get());
+        ChildData result = node.childData.get();
         // Double-check liveness after retreiving data.
         return node.nodeState.get() == NodeState.LIVE ? result : null;
     }