You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ch...@apache.org on 2013/03/13 17:18:30 UTC

svn commit: r1456020 - /jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/MongoMK.java

Author: chetanm
Date: Wed Mar 13 16:18:29 2013
New Revision: 1456020

URL: http://svn.apache.org/r1456020
Log:
OAK-619 - Lock-free MongoMK implementation (WIP)

Using Guava cache instead of custom cache implementation

Modified:
    jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/MongoMK.java

Modified: jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/MongoMK.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/MongoMK.java?rev=1456020&r1=1456019&r2=1456020&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/MongoMK.java (original)
+++ jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/MongoMK.java Wed Mar 13 16:18:29 2013
@@ -22,15 +22,18 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.annotation.Nonnull;
 
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
 import org.apache.jackrabbit.mk.api.MicroKernel;
 import org.apache.jackrabbit.mk.api.MicroKernelException;
 import org.apache.jackrabbit.mk.blobs.BlobStore;
@@ -110,14 +113,12 @@ public class MongoMK implements MicroKer
      * Key: path@rev
      * Value: node
      */
-    private final Map<String, Node> nodeCache = 
-            new Cache<String, Node>(CACHE_NODES);
-    
+    private final Cache<String, Node> nodeCache;
+
     /**
      * Child node cache.
      */
-    private Cache<String, Node.Children> nodeChildrenCache =
-            new Cache<String, Node.Children>(CACHE_CHILDREN);
+    private final Cache<String, Node.Children> nodeChildrenCache;
 
     /**
      * The unsaved write count increments.
@@ -125,12 +126,6 @@ public class MongoMK implements MicroKer
     private final Map<String, Long> writeCountIncrements = new HashMap<String, Long>();
     
     /**
-     * The set of known valid revision.
-     * The key is the revision id, the value is 1 (because a cache can't be a set).
-     */
-    private final Map<String, Long> revCache = new Cache<String, Long>(1024);
-    
-    /**
      * The last known head revision. This is the last-known revision.
      */
     private Revision headRevision;
@@ -173,6 +168,16 @@ public class MongoMK implements MicroKer
         this.store = store;
         this.blobStore = blobStore;
         this.clusterId = clusterId;
+
+        //TODO Use size based weigher
+        nodeCache = CacheBuilder.newBuilder()
+                        .maximumSize(CACHE_NODES)
+                        .build();
+
+        nodeChildrenCache =  CacheBuilder.newBuilder()
+                        .maximumSize(CACHE_CHILDREN)
+                        .build();
+
         backgroundThread = new Thread(
             new BackgroundOperation(this, isDisposed),
             "MongoMK background thread");
@@ -222,7 +227,7 @@ public class MongoMK implements MicroKer
     Node getNode(String path, Revision rev) {
         checkRevisionAge(rev, path);
         String key = path + "@" + rev;
-        Node node = nodeCache.get(key);
+        Node node = nodeCache.getIfPresent(key);
         if (node == null) {
             node = readNode(path, rev);
             if (node != null) {
@@ -258,32 +263,37 @@ public class MongoMK implements MicroKer
         return x.compareRevisionTime(previous) > 0;
     }
     
-    public Node.Children readChildren(String path, String nodeId, Revision rev, int limit) {
-        Node.Children c;
-        c = nodeChildrenCache.get(nodeId);
-        if (c != null) {
-            return c;
-        }
-        String from = PathUtils.concat(path, "a");
-        from = Utils.getIdFromPath(from);
-        from = from.substring(0, from.length() - 1);
-        String to = PathUtils.concat(path, "z");
-        to = Utils.getIdFromPath(to);
-        to = to.substring(0, to.length() - 2) + "0";
-        List<Map<String, Object>> list = store.query(DocumentStore.Collection.NODES, from, to, limit);
-        c = new Node.Children(path, nodeId, rev);
-        for (Map<String, Object> e : list) {
-            // filter out deleted children
-            if (getLiveRevision(e, rev) == null) {
-                continue;
-            }
-            // TODO put the whole node in the cache
-            String id = e.get(UpdateOp.ID).toString();
-            String p = Utils.getPathFromId(id);
-            c.children.add(p);
+    public Node.Children readChildren(final String path, final String nodeId,
+                                      final Revision rev, final int limit) {
+        try {
+            return nodeChildrenCache.get(nodeId, new Callable<Children>() {
+                @Override
+                public Children call() throws Exception {
+                    String from = PathUtils.concat(path, "a");
+                    from = Utils.getIdFromPath(from);
+                    from = from.substring(0, from.length() - 1);
+                    String to = PathUtils.concat(path, "z");
+                    to = Utils.getIdFromPath(to);
+                    to = to.substring(0, to.length() - 2) + "0";
+                    List<Map<String, Object>> list = store.query(DocumentStore.Collection.NODES, from, to, limit);
+                    Children c = new Children(path, nodeId, rev);
+                    for (Map<String, Object> e : list) {
+                        // filter out deleted children
+                        if (getLiveRevision(e, rev) == null) {
+                            continue;
+                        }
+                        // TODO put the whole node in the cache
+                        String id = e.get(UpdateOp.ID).toString();
+                        String p = Utils.getPathFromId(id);
+                        c.children.add(p);
+                    }
+                    return c;
+                }
+            });
+        } catch (ExecutionException e) {
+            throw new IllegalStateException("Error occurred while fetching node children for node "+nodeId,e);
         }
-        nodeChildrenCache.put(nodeId, c);
-        return c;
+
     }
 
     private Node readNode(String path, Revision rev) {
@@ -638,7 +648,7 @@ public class MongoMK implements MicroKer
             Node n = getNode(path, rev);
 
             // remove from the cache
-            nodeCache.remove(path + "@" + rev);
+            nodeCache.invalidate(path + "@" + rev);
             
             if (n != null) {
                 Node.Children c = readChildren(path, n.getId(), rev,
@@ -646,12 +656,12 @@ public class MongoMK implements MicroKer
                 for (String childPath : c.children) {
                     markAsDeleted(childPath, commit, true);
                 }
-                nodeChildrenCache.remove(n.getId());
+                nodeChildrenCache.invalidate(n.getId());
             }
         }
 
         // Remove the node from the cache
-        nodeCache.remove(path + "@" + rev);
+        nodeCache.invalidate(path + "@" + rev);
     }
     
     /**
@@ -797,54 +807,6 @@ public class MongoMK implements MicroKer
     }
     
     /**
-     * A simple cache.
-     *
-     * @param <K> the key type
-     * @param <V> the value type
-     */
-    static class Cache<K, V> extends LinkedHashMap<K, V> {
-
-        private static final long serialVersionUID = 1L;
-        private int size;
-        private TreeSet<K> keySet = new TreeSet<K>();
-
-        Cache(int size) {
-            super(size, (float) 0.75, true);
-            this.size = size;
-        }
-        
-        public synchronized V put(K key, V value) {
-            keySet.add(key);
-            return super.put(key, value);
-        }
-        
-        public synchronized V remove(Object key) {
-            keySet.remove(key);
-            return super.remove(key);
-        }
-
-        protected synchronized boolean removeEldestEntry(Map.Entry<K, V> eldest) {
-            boolean remove = size() > size;
-            if (remove) {
-                Object k = eldest.getKey();
-                if (k != null) {
-                    keySet.remove(k);
-                }
-            }
-            return remove;
-        }
-        
-        public String toString() {
-            return super.toString().replace(',', '\n');
-        }
-        
-        // public synchronized SortedSet<K> subSet(K fromElement, K toElement) {
-        //   return keySet.subSet(fromElement, toElement);
-        // }
-
-    }
-
-    /**
      * A background thread.
      */
     static class BackgroundOperation implements Runnable {
@@ -885,7 +847,7 @@ public class MongoMK implements MicroKer
             writeCountIncrements.remove(path);
         }
         long newWriteCount = oldWriteCount + writeCountInc;
-        Children c = nodeChildrenCache.get(path + "@" + (newWriteCount - 1));
+        Children c = nodeChildrenCache.getIfPresent(path + "@" + (newWriteCount - 1));
         if (isNew || (!isDelete && c != null)) {
             String id = path + "@" + newWriteCount;
             Children c2 = new Children(path, id, rev);
@@ -896,7 +858,7 @@ public class MongoMK implements MicroKer
             set.removeAll(removed);
             set.addAll(added);
             c2.children.addAll(set);
-            if (nodeChildrenCache.get(id) != null) {
+            if (nodeChildrenCache.getIfPresent(id) != null) {
                 // TODO should not happend, 
                 // probably a problem with add/delete/add
                 MicroKernelException e = new MicroKernelException(