You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ca...@apache.org on 2016/03/23 00:36:43 UTC

svn commit: r1736260 - in /zookeeper/branches/branch-3.5: ./ src/java/main/org/apache/zookeeper/server/ src/java/test/org/apache/zookeeper/server/

Author: camille
Date: Tue Mar 22 23:36:42 2016
New Revision: 1736260

URL: http://svn.apache.org/viewvc?rev=1736260&view=rev
Log:
ZOOKEEPER-2141. ACL cache in DataTree never removes entries
(Adam Milne-Smith via camille)

Added:
    zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/ReferenceCountedACLCache.java
    zookeeper/branches/branch-3.5/src/java/test/org/apache/zookeeper/server/ReferenceCountedACLCacheTest.java
Modified:
    zookeeper/branches/branch-3.5/CHANGES.txt
    zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/DataTree.java
    zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java
    zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java
    zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/ZKDatabase.java

Modified: zookeeper/branches/branch-3.5/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.5/CHANGES.txt?rev=1736260&r1=1736259&r2=1736260&view=diff
==============================================================================
--- zookeeper/branches/branch-3.5/CHANGES.txt (original)
+++ zookeeper/branches/branch-3.5/CHANGES.txt Tue Mar 22 23:36:42 2016
@@ -144,6 +144,8 @@ BUGFIXES:
 
   ZOOKEEPER-2364: "ant docs" fails on branch-3.5 due to missing releasenotes.xml.
   (phunt via cnauroth)
+  
+  ZOOKEEPER-2141 ACL cache in DataTree never removes entries (Adam Milne-Smith via camille)
 
 IMPROVEMENTS:
 

Modified: zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/DataTree.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/DataTree.java?rev=1736260&r1=1736259&r2=1736260&view=diff
==============================================================================
--- zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/DataTree.java (original)
+++ zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/DataTree.java Tue Mar 22 23:36:42 2016
@@ -18,7 +18,6 @@
 
 package org.apache.zookeeper.server;
 
-import org.apache.jute.Index;
 import org.apache.jute.InputArchive;
 import org.apache.jute.OutputArchive;
 import org.apache.jute.Record;
@@ -35,7 +34,6 @@ import org.apache.zookeeper.Watcher.Even
 import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.apache.zookeeper.Watcher.WatcherType;
 import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.ZooDefs.OpCode;
 import org.apache.zookeeper.common.PathTrie;
 import org.apache.zookeeper.data.ACL;
@@ -139,23 +137,7 @@ public class DataTree {
     private final Set<String> containers =
             Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
 
-    /**
-     * this is map from longs to acl's. It saves acl's being stored for each
-     * datanode.
-     */
-    private final Map<Long, List<ACL>> longKeyMap =
-        new HashMap<Long, List<ACL>>();
-
-    /**
-     * this a map from acls to long.
-     */
-    private final Map<List<ACL>, Long> aclKeyMap =
-        new HashMap<List<ACL>, Long>();
-
-    /**
-     * these are the number of acls that we have in the datatree
-     */
-    private long aclIndex = 0;
+    private final ReferenceCountedACLCache aclCache = new ReferenceCountedACLCache();
 
     @SuppressWarnings("unchecked")
     public Set<String> getEphemerals(long sessionId) {
@@ -174,54 +156,6 @@ public class DataTree {
         return new HashSet<String>(containers);
     }
 
-    int getAclSize() {
-        return longKeyMap.size();
-    }
-
-    private long incrementIndex() {
-        return ++aclIndex;
-    }
-
-    /**
-     * converts the list of acls to a list of longs.
-     *
-     * @param acls
-     * @return a list of longs that map to the acls
-     */
-    public synchronized Long convertAcls(List<ACL> acls) {
-        if (acls == null)
-            return -1L;
-        // get the value from the map
-        Long ret = aclKeyMap.get(acls);
-        // could not find the map
-        if (ret != null)
-            return ret;
-        long val = incrementIndex();
-        longKeyMap.put(val, acls);
-        aclKeyMap.put(acls, val);
-        return val;
-    }
-
-    /**
-     * converts a list of longs to a list of acls.
-     *
-     * @param longVal
-     *            the list of longs
-     * @return a list of ACLs that map to longs
-     */
-    public synchronized List<ACL> convertLong(Long longVal) {
-        if (longVal == null)
-            return null;
-        if (longVal == -1L)
-            return Ids.OPEN_ACL_UNSAFE;
-        List<ACL> acls = longKeyMap.get(longVal);
-        if (acls == null) {
-            LOG.error("ERROR: ACL not available for long " + longVal);
-            throw new RuntimeException("Failed to fetch acls for " + longVal);
-        }
-        return acls;
-    }
-
     public Collection<Long> getSessions() {
         return ephemerals.keySet();
     }
@@ -517,7 +451,7 @@ public class DataTree {
             }
             parent.stat.setCversion(parentCVersion);
             parent.stat.setPzxid(zxid);
-            Long longval = convertAcls(acl);
+            Long longval = aclCache.convertAcls(acl);
             DataNode child = new DataNode(data, longval, stat);
             parent.addChild(childName);
             nodes.put(path, child);
@@ -581,6 +515,9 @@ public class DataTree {
             throw new KeeperException.NoNodeException();
         }
         nodes.remove(path);
+        synchronized (node) {
+            aclCache.removeUsage(node.acl);
+        }
         DataNode parent = nodes.get(parentName);
         if (parent == null) {
             throw new KeeperException.NoNodeException();
@@ -740,8 +677,9 @@ public class DataTree {
             throw new KeeperException.NoNodeException();
         }
         synchronized (n) {
+            aclCache.removeUsage(n.acl);
             n.stat.setAversion(version);
-            n.acl = convertAcls(acl);
+            n.acl = aclCache.convertAcls(acl);
             n.copyStat(stat);
             return stat;
         }
@@ -755,10 +693,20 @@ public class DataTree {
         }
         synchronized (n) {
             n.copyStat(stat);
-            return new ArrayList<ACL>(convertLong(n.acl));
+            return new ArrayList<ACL>(aclCache.convertLong(n.acl));
         }
     }
 
+    public List<ACL> getACL(DataNode node) {
+        synchronized (node) {
+            return aclCache.convertLong(node.acl);
+        }
+    }
+
+    public int aclCacheSize() {
+        return aclCache.size();
+    }
+
     static public class ProcessTxnResult {
         public long clientId;
 
@@ -1202,45 +1150,8 @@ public class DataTree {
         }
     }
 
-    private void deserializeList(Map<Long, List<ACL>> longKeyMap,
-            InputArchive ia) throws IOException {
-        int i = ia.readInt("map");
-        while (i > 0) {
-            Long val = ia.readLong("long");
-            if (aclIndex < val) {
-                aclIndex = val;
-            }
-            List<ACL> aclList = new ArrayList<ACL>();
-            Index j = ia.startVector("acls");
-            while (!j.done()) {
-                ACL acl = new ACL();
-                acl.deserialize(ia, "acl");
-                aclList.add(acl);
-                j.incr();
-            }
-            longKeyMap.put(val, aclList);
-            aclKeyMap.put(aclList, val);
-            i--;
-        }
-    }
-
-    private synchronized void serializeList(Map<Long, List<ACL>> longKeyMap,
-            OutputArchive oa) throws IOException {
-        oa.writeInt(longKeyMap.size(), "map");
-        Set<Map.Entry<Long, List<ACL>>> set = longKeyMap.entrySet();
-        for (Map.Entry<Long, List<ACL>> val : set) {
-            oa.writeLong(val.getKey(), "long");
-            List<ACL> aclList = val.getValue();
-            oa.startVector(aclList, "acls");
-            for (ACL acl : aclList) {
-                acl.serialize(oa, "acl");
-            }
-            oa.endVector(aclList, "acls");
-        }
-    }
-
     public void serialize(OutputArchive oa, String tag) throws IOException {
-        serializeList(longKeyMap, oa);
+        aclCache.serialize(oa);
         serializeNode(oa, new StringBuilder(""));
         // / marks end of stream
         // we need to check if clear had been called in between the snapshot.
@@ -1250,7 +1161,7 @@ public class DataTree {
     }
 
     public void deserialize(InputArchive ia, String tag) throws IOException {
-        deserializeList(longKeyMap, ia);
+        aclCache.deserialize(ia);
         nodes.clear();
         pTrie.clear();
         String path = ia.readString("path");
@@ -1258,6 +1169,9 @@ public class DataTree {
             DataNode node = new DataNode();
             ia.readRecord(node, "node");
             nodes.put(path, node);
+            synchronized (node) {
+                aclCache.addUsage(node.acl);
+            }
             int lastSlash = path.lastIndexOf('/');
             if (lastSlash == -1) {
                 root = node;
@@ -1289,6 +1203,8 @@ public class DataTree {
         // update the quotas - create path trie
         // and also update the stat nodes
         setupQuota();
+
+        aclCache.purgeUnused();
     }
 
     /**

Modified: zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java?rev=1736260&r1=1736259&r2=1736260&view=diff
==============================================================================
--- zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java (original)
+++ zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java Tue Mar 22 23:36:42 2016
@@ -261,9 +261,9 @@ public class FinalRequestProcessor imple
                 rsp = new SetDataResponse(rc.stat);
                 err = Code.get(rc.err);
                 break;
-            }           
+            }
             case OpCode.reconfig: {
-                lastOp = "RECO";               
+                lastOp = "RECO";
                 rsp = new GetDataResponse(((QuorumZooKeeperServer)zks).self.getQuorumVerifier().toString().getBytes(), rc.stat);
                 err = Code.get(rc.err);
                 break;
@@ -317,11 +317,7 @@ public class FinalRequestProcessor imple
                 if (n == null) {
                     throw new KeeperException.NoNodeException();
                 }
-                Long aclL;
-                synchronized(n) {
-                    aclL = n.acl;
-                }
-                PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().convertLong(aclL),
+                PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().aclForNode(n),
                         ZooDefs.Perms.READ,
                         request.authInfo);
                 Stat stat = new Stat();
@@ -363,12 +359,7 @@ public class FinalRequestProcessor imple
                 if (n == null) {
                     throw new KeeperException.NoNodeException();
                 }
-                Long aclG;
-                synchronized(n) {
-                    aclG = n.acl;
-
-                }
-                PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().convertLong(aclG),
+                PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().aclForNode(n),
                         ZooDefs.Perms.READ,
                         request.authInfo);
                 List<String> children = zks.getZKDatabase().getChildren(
@@ -387,11 +378,7 @@ public class FinalRequestProcessor imple
                 if (n == null) {
                     throw new KeeperException.NoNodeException();
                 }
-                Long aclG;
-                synchronized(n) {
-                    aclG = n.acl;
-                }
-                PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().convertLong(aclG),
+                PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().aclForNode(n),
                         ZooDefs.Perms.READ,
                         request.authInfo);
                 List<String> children = zks.getZKDatabase().getChildren(

Modified: zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java?rev=1736260&r1=1736259&r2=1736260&view=diff
==============================================================================
--- zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java (original)
+++ zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java Tue Mar 22 23:36:42 2016
@@ -161,15 +161,13 @@ public class PrepRequestProcessor extend
             if (lastChange == null) {
                 DataNode n = zks.getZKDatabase().getNode(path);
                 if (n != null) {
-                    Long acl;
                     Set<String> children;
                     synchronized(n) {
-                        acl = n.acl;
                         children = n.getChildren();
                     }
                     lastChange = new ChangeRecord(-1, path, n.stat,
                         children != null ? children.size() : 0,
-                            zks.getZKDatabase().convertLong(acl));
+                            zks.getZKDatabase().aclForNode(n));
                 }
             }
         }

Added: zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/ReferenceCountedACLCache.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/ReferenceCountedACLCache.java?rev=1736260&view=auto
==============================================================================
--- zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/ReferenceCountedACLCache.java (added)
+++ zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/ReferenceCountedACLCache.java Tue Mar 22 23:36:42 2016
@@ -0,0 +1,259 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server;
+
+import org.apache.jute.Index;
+import org.apache.jute.InputArchive;
+import org.apache.jute.OutputArchive;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class ReferenceCountedACLCache {
+    private static final Logger LOG = LoggerFactory.getLogger(ReferenceCountedACLCache.class);
+
+    /**
+     * this is map from longs to acl's. It saves acl's being stored for each
+     * datanode.
+     */
+    private final Map<Long, List<ACL>> longKeyMap =
+            new HashMap<Long, List<ACL>>();
+
+    /**
+     * this a map from acls to long.
+     */
+    private final Map<List<ACL>, Long> aclKeyMap =
+            new HashMap<List<ACL>, Long>();
+
+    private final Map<Long, AtomicLongWithEquals> referenceCounter =
+            new HashMap<Long, AtomicLongWithEquals>();
+    private final long OPEN_UNSAFE_ACL_ID = -1L;
+
+    /**
+     * these are the number of acls that we have in the datatree
+     */
+    private long aclIndex = 0;
+
+    /**
+     * converts the list of acls to a list of longs.
+     * Increments the reference counter for this ACL.
+     * @param acls
+     * @return a list of longs that map to the acls
+     */
+    public synchronized Long convertAcls(List<ACL> acls) {
+        if (acls == null)
+            return OPEN_UNSAFE_ACL_ID;
+
+        // get the value from the map
+        Long ret = aclKeyMap.get(acls);
+        if (ret == null) {
+            ret = incrementIndex();
+            longKeyMap.put(ret, acls);
+            aclKeyMap.put(acls, ret);
+        }
+
+        addUsage(ret);
+
+        return ret;
+    }
+
+    /**
+     * converts a list of longs to a list of acls.
+     *
+     * @param longVal
+     *            the list of longs
+     * @return a list of ACLs that map to longs
+     */
+    public synchronized List<ACL> convertLong(Long longVal) {
+        if (longVal == null)
+            return null;
+        if (longVal == OPEN_UNSAFE_ACL_ID)
+            return ZooDefs.Ids.OPEN_ACL_UNSAFE;
+        List<ACL> acls = longKeyMap.get(longVal);
+        if (acls == null) {
+            LOG.error("ERROR: ACL not available for long " + longVal);
+            throw new RuntimeException("Failed to fetch acls for " + longVal);
+        }
+        return acls;
+    }
+
+    private long incrementIndex() {
+        return ++aclIndex;
+    }
+
+    public synchronized void deserialize(InputArchive ia) throws IOException {
+        clear();
+        int i = ia.readInt("map");
+        while (i > 0) {
+            Long val = ia.readLong("long");
+            if (aclIndex < val) {
+                aclIndex = val;
+            }
+            List<ACL> aclList = new ArrayList<ACL>();
+            Index j = ia.startVector("acls");
+            while (!j.done()) {
+                ACL acl = new ACL();
+                acl.deserialize(ia, "acl");
+                aclList.add(acl);
+                j.incr();
+            }
+            longKeyMap.put(val, aclList);
+            aclKeyMap.put(aclList, val);
+            referenceCounter.put(val, new AtomicLongWithEquals(0));
+            i--;
+        }
+    }
+
+    public synchronized void serialize(OutputArchive oa) throws IOException {
+        oa.writeInt(longKeyMap.size(), "map");
+        Set<Map.Entry<Long, List<ACL>>> set = longKeyMap.entrySet();
+        for (Map.Entry<Long, List<ACL>> val : set) {
+            oa.writeLong(val.getKey(), "long");
+            List<ACL> aclList = val.getValue();
+            oa.startVector(aclList, "acls");
+            for (ACL acl : aclList) {
+                acl.serialize(oa, "acl");
+            }
+            oa.endVector(aclList, "acls");
+        }
+    }
+
+    public int size() {
+        return aclKeyMap.size();
+    }
+
+    private void clear() {
+        aclKeyMap.clear();
+        longKeyMap.clear();
+        referenceCounter.clear();
+    }
+
+    public synchronized void addUsage(Long acl) {
+        if (acl == OPEN_UNSAFE_ACL_ID) {
+            return;
+        }
+
+        if (!longKeyMap.containsKey(acl)) {
+            LOG.info("Ignoring acl " + acl + " as it does not exist in the cache");
+            return;
+        }
+
+        AtomicLong count = referenceCounter.get(acl);
+        if (count == null) {
+            referenceCounter.put(acl, new AtomicLongWithEquals(1));
+        } else {
+            count.incrementAndGet();
+        }
+    }
+
+    public synchronized void removeUsage(Long acl) {
+        if (acl == OPEN_UNSAFE_ACL_ID) {
+            return;
+        }
+
+        if (!longKeyMap.containsKey(acl)) {
+            LOG.info("Ignoring acl " + acl + " as it does not exist in the cache");
+            return;
+        }
+
+        long newCount = referenceCounter.get(acl).decrementAndGet();
+        if (newCount <= 0) {
+            referenceCounter.remove(acl);
+            aclKeyMap.remove(longKeyMap.get(acl));
+            longKeyMap.remove(acl);
+        }
+    }
+
+    public synchronized void purgeUnused() {
+        Iterator<Map.Entry<Long, AtomicLongWithEquals>> refCountIter = referenceCounter.entrySet().iterator();
+        while (refCountIter.hasNext()) {
+            Map.Entry<Long, AtomicLongWithEquals> entry = refCountIter.next();
+            if (entry.getValue().get() <= 0) {
+                Long acl = entry.getKey();
+                aclKeyMap.remove(longKeyMap.get(acl));
+                longKeyMap.remove(acl);
+                refCountIter.remove();
+            }
+        }
+    }
+
+    @Override
+    public synchronized boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        ReferenceCountedACLCache that = (ReferenceCountedACLCache) o;
+        synchronized (that) {
+            if (aclIndex != that.aclIndex) return false;
+        }
+        if (aclKeyMap != null ? !aclKeyMap.equals(that.aclKeyMap) : that.aclKeyMap != null) return false;
+        if (longKeyMap != null ? !longKeyMap.equals(that.longKeyMap) : that.longKeyMap != null) return false;
+        if (referenceCounter != null ? !referenceCounter.equals(that.referenceCounter) : that.referenceCounter != null)
+            return false;
+        return true;
+    }
+
+    @Override
+    public synchronized int hashCode() {
+        int result = longKeyMap != null ? longKeyMap.hashCode() : 0;
+        result = 31 * result + (aclKeyMap != null ? aclKeyMap.hashCode() : 0);
+        result = 31 * result + (referenceCounter != null ? referenceCounter.hashCode() : 0);
+        result = 31 * result + (int) (aclIndex ^ (aclIndex >>> 32));
+        return result;
+    }
+
+    /*
+    For reasons we don't all agree with, AtomicLong does not have an equals.
+     */
+    private static class AtomicLongWithEquals extends AtomicLong {
+
+        private static final long serialVersionUID = 3355155896813725462L;
+
+        public AtomicLongWithEquals(long i) {
+            super(i);
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+
+            return equals((AtomicLongWithEquals) o);
+        }
+
+        public boolean equals(AtomicLongWithEquals that) {
+            return get() == that.get();
+        }
+
+        @Override
+        public int hashCode() {
+            return 31 * Long.valueOf(get()).hashCode();
+        }
+    }
+}

Modified: zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/ZKDatabase.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/ZKDatabase.java?rev=1736260&r1=1736259&r2=1736260&view=diff
==============================================================================
--- zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/ZKDatabase.java (original)
+++ zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/ZKDatabase.java Tue Mar 22 23:36:42 2016
@@ -335,7 +335,10 @@ public class ZKDatabase {
         }
         return new TxnLogProposalIterator(itr);
     }
-    
+
+    public List<ACL> aclForNode(DataNode n) {
+        return dataTree.getACL(n);
+    }
     /**
      * remove a cnxn from the datatree
      * @param cnxn the cnxn to remove from the datatree
@@ -422,15 +425,6 @@ public class ZKDatabase {
     }
 
     /**
-     * convert from long to the acl entry
-     * @param aclL the long for which to get the acl
-     * @return the acl corresponding to this long entry
-     */
-    public List<ACL> convertLong(Long aclL) {
-        return dataTree.convertLong(aclL);
-    }
-
-    /**
      * get data and stat for a path
      * @param path the path being queried
      * @param stat the stat for this path
@@ -494,7 +488,7 @@ public class ZKDatabase {
      * @return the acl size of the datatree
      */
     public int getAclSize() {
-        return dataTree.getAclSize();
+        return dataTree.aclCacheSize();
     }
 
     /**

Added: zookeeper/branches/branch-3.5/src/java/test/org/apache/zookeeper/server/ReferenceCountedACLCacheTest.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.5/src/java/test/org/apache/zookeeper/server/ReferenceCountedACLCacheTest.java?rev=1736260&view=auto
==============================================================================
--- zookeeper/branches/branch-3.5/src/java/test/org/apache/zookeeper/server/ReferenceCountedACLCacheTest.java (added)
+++ zookeeper/branches/branch-3.5/src/java/test/org/apache/zookeeper/server/ReferenceCountedACLCacheTest.java Tue Mar 22 23:36:42 2016
@@ -0,0 +1,253 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server;
+
+import org.apache.jute.BinaryInputArchive;
+import org.apache.jute.BinaryOutputArchive;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Id;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.*;
+
+public class ReferenceCountedACLCacheTest {
+    @Test
+    public void testSameACLGivesSameID() {
+        List<ACL> testACL = createACL("myid");
+
+        ReferenceCountedACLCache cache = new ReferenceCountedACLCache();
+        Long aclId = cache.convertAcls(testACL);
+
+        List<ACL> testACL2 = createACL("myid");
+
+        assertEquals(aclId, cache.convertAcls(testACL2));
+    }
+
+    @Test
+    public void testWhetherOrderingMatters() {
+        List<ACL> testACL = new ArrayList<ACL>();
+        testACL.add(new ACL(ZooDefs.Perms.READ, new Id("scheme", "ro")));
+        testACL.add(new ACL(ZooDefs.Perms.WRITE, new Id("scheme", "rw")));
+
+        ReferenceCountedACLCache cache = new ReferenceCountedACLCache();
+        Long aclId = cache.convertAcls(testACL);
+
+        List<ACL> testACL2 = new ArrayList<ACL>();
+        testACL2.add(new ACL(ZooDefs.Perms.WRITE, new Id("scheme", "rw")));
+        testACL2.add(new ACL(ZooDefs.Perms.READ, new Id("scheme", "ro")));
+
+        assertFalse(aclId.equals(cache.convertAcls(testACL2)));
+    }
+
+    @Test
+    public void testBidirectionality() {
+        List<ACL> testACL = createACL("myid");
+
+        ReferenceCountedACLCache cache = new ReferenceCountedACLCache();
+        Long aclId = cache.convertAcls(testACL);
+
+        assertEquals(testACL, cache.convertLong(aclId));
+    }
+
+    @Test
+    public void testCacheSize() {
+        List<ACL> testACL = createACL("myid");
+
+        ReferenceCountedACLCache cache = new ReferenceCountedACLCache();
+        Long aclId = cache.convertAcls(testACL);
+        assertEquals(1, cache.size());
+
+        List<ACL> testACL2 = createACL("myid");
+
+        assertEquals(aclId, cache.convertAcls(testACL2));
+        assertEquals(1, cache.size());
+
+        List<ACL> testACL3 = createACL("differentId");
+
+        Long aclId3 = cache.convertAcls(testACL3);
+        assertFalse(aclId3.equals(aclId));
+        assertEquals(2, cache.size());
+    }
+
+    @Test
+    public void testAddThenRemove() {
+        List<ACL> testACL = createACL("myid");
+
+        ReferenceCountedACLCache cache = new ReferenceCountedACLCache();
+        Long aclId = cache.convertAcls(testACL);
+        assertEquals(1, cache.size());
+
+        cache.removeUsage(aclId);
+        assertEquals(0, cache.size());
+    }
+
+    @Test
+    public void testMultipleAddsAndRemove() {
+        List<ACL> testACL = createACL("myid");
+
+        ReferenceCountedACLCache cache = new ReferenceCountedACLCache();
+        Long aclId = cache.convertAcls(testACL);
+        assertEquals(1, cache.size());
+
+        cache.convertAcls(testACL);
+        assertEquals(1, cache.size());
+
+        List<ACL> testACL2 = createACL("anotherId");
+        cache.convertAcls(testACL2);
+
+        cache.removeUsage(aclId);
+        assertEquals(2, cache.size());
+        cache.removeUsage(aclId);
+        assertEquals(1, cache.size());
+
+        Long newId = cache.convertAcls(testACL);
+        assertFalse(aclId.equals(newId));
+    }
+
+    @Test
+    public void testAddUsage() {
+        List<ACL> testACL = createACL("myid");
+
+        ReferenceCountedACLCache cache = new ReferenceCountedACLCache();
+        Long aclId = cache.convertAcls(testACL);
+        assertEquals(1, cache.size());
+
+        cache.addUsage(aclId);
+        assertEquals(1, cache.size());
+
+        cache.removeUsage(aclId);
+        assertEquals(1, cache.size());
+        cache.removeUsage(aclId);
+        assertEquals(0, cache.size());
+    }
+
+    @Test
+    public void testAddNonExistentUsage() {
+        ReferenceCountedACLCache cache = new ReferenceCountedACLCache();
+        cache.addUsage(1L);
+
+        assertEquals(0, cache.size());
+        /*
+        On startup, it's possible that we'll try calling addUsage of an ID not in the cache.  This is safe to ignore
+        as it'll be added later when we traverse the tranlog.  See discussion here:
+        http://mail-archives.apache.org/mod_mbox/zookeeper-user/201507.mbox/%3CCAB5oV2_ujhvBA1sEkCG2WRakPjCy%2BNR10620WK2G1GGgmEO44g%40mail.gmail.com%3E
+
+        This test makes sure that we don't add the ID to the cache in this case as that would result in dupes later
+        and consequently incorrect counts and entries that will never be cleaned out.
+         */
+    }
+
+    @Test
+    public void testSerializeDeserialize() throws IOException {
+        ReferenceCountedACLCache cache = new ReferenceCountedACLCache();
+
+        List<ACL> acl1 = createACL("one");
+        List<ACL> acl2 = createACL("two");
+        List<ACL> acl3 = createACL("three");
+        List<ACL> acl4 = createACL("four");
+        List<ACL> acl5 = createACL("five");
+
+        Long aclId1 = convertACLsNTimes(cache, acl1, 1);
+        Long aclId2 = convertACLsNTimes(cache, acl2, 2);
+        Long aclId3 = convertACLsNTimes(cache, acl3, 3);
+        Long aclId4 = convertACLsNTimes(cache, acl4, 4);
+        Long aclId5 = convertACLsNTimes(cache, acl5, 5);
+
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        BinaryOutputArchive archive = BinaryOutputArchive.getArchive(baos);
+        cache.serialize(archive);
+
+        BinaryInputArchive inArchive = BinaryInputArchive.getArchive(new ByteArrayInputStream(baos.toByteArray()));
+        ReferenceCountedACLCache deserializedCache = new ReferenceCountedACLCache();
+        deserializedCache.deserialize(inArchive);
+        callAddUsageNTimes(deserializedCache, aclId1, 1);
+        callAddUsageNTimes(deserializedCache, aclId2, 2);
+        callAddUsageNTimes(deserializedCache, aclId3, 3);
+        callAddUsageNTimes(deserializedCache, aclId4, 4);
+        callAddUsageNTimes(deserializedCache, aclId5, 5);
+
+        assertEquals(cache, deserializedCache);
+    }
+
+    @Test
+    public void testPurgeUnused() throws IOException {
+        ReferenceCountedACLCache cache = new ReferenceCountedACLCache();
+
+        List<ACL> acl1 = createACL("one");
+        List<ACL> acl2 = createACL("two");
+        List<ACL> acl3 = createACL("three");
+        List<ACL> acl4 = createACL("four");
+        List<ACL> acl5 = createACL("five");
+
+        Long aclId1 = convertACLsNTimes(cache, acl1, 1);
+        Long aclId2 = convertACLsNTimes(cache, acl2, 2);
+        Long aclId3 = convertACLsNTimes(cache, acl3, 3);
+        Long aclId4 = convertACLsNTimes(cache, acl4, 4);
+        Long aclId5 = convertACLsNTimes(cache, acl5, 5);
+
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        BinaryOutputArchive archive = BinaryOutputArchive.getArchive(baos);
+        cache.serialize(archive);
+
+        BinaryInputArchive inArchive = BinaryInputArchive.getArchive(new ByteArrayInputStream(baos.toByteArray()));
+        ReferenceCountedACLCache deserializedCache = new ReferenceCountedACLCache();
+        deserializedCache.deserialize(inArchive);
+        callAddUsageNTimes(deserializedCache, aclId1, 1);
+        callAddUsageNTimes(deserializedCache, aclId2, 2);
+        deserializedCache.purgeUnused();
+
+        assertEquals(2, deserializedCache.size());
+        assertEquals(aclId1, deserializedCache.convertAcls(acl1));
+        assertEquals(aclId2, deserializedCache.convertAcls(acl2));
+        assertFalse(acl3.equals(deserializedCache.convertAcls(acl3)));
+        assertFalse(acl4.equals(deserializedCache.convertAcls(acl4)));
+        assertFalse(acl5.equals(deserializedCache.convertAcls(acl5)));
+    }
+
+    private void callAddUsageNTimes(ReferenceCountedACLCache deserializedCache, Long aclId, int num) {
+        for (int i = 0; i < num; i++) {
+            deserializedCache.addUsage(aclId);
+        }
+    }
+
+    private Long convertACLsNTimes(ReferenceCountedACLCache cache, List<ACL> acl, int num) {
+        if (num <= 0) {
+            return -1L;
+        }
+
+        for (int i = 0; i < num -1; i++) {
+            cache.convertAcls(acl);
+        }
+
+        return cache.convertAcls(acl);
+    }
+
+    private List<ACL> createACL(String id) {
+        List<ACL> acl1 = new ArrayList<ACL>();
+        acl1.add(new ACL(ZooDefs.Perms.ADMIN, new Id("scheme", id)));
+        return acl1;
+    }
+}
\ No newline at end of file