Posted to oak-commits@jackrabbit.apache.org by mr...@apache.org on 2013/09/17 16:36:18 UTC

svn commit: r1524070 - in /jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk: MongoMK.java MongoMicroKernelService.java NodeDocument.java UnsavedModifications.java

Author: mreutegg
Date: Tue Sep 17 14:36:17 2013
New Revision: 1524070

URL: http://svn.apache.org/r1524070
Log:
OAK-926: MongoMK: split documents when they are too large
- Introduce children cache on document level
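
For orientation before the diff: the change adds a document-level cache of child-node
names, reused by later child reads below a configurable limit and repopulated from a
store query on a miss. A minimal, self-contained sketch of that cache-aside pattern
using Guava's Cache (the ChildNames class, queryStore() and the surrounding class are
hypothetical stand-ins, not the committed code):

    import java.util.ArrayList;
    import java.util.List;

    import com.google.common.cache.Cache;
    import com.google.common.cache.CacheBuilder;

    // Minimal sketch of the cache-aside pattern used by readChildren();
    // ChildNames, queryStore() and this class are hypothetical stand-ins.
    public class ChildrenCacheSketch {

        static final class ChildNames {
            final List<String> names = new ArrayList<String>();
            boolean complete; // store returned fewer rows than requested
        }

        private final Cache<String, ChildNames> cache =
                CacheBuilder.newBuilder().maximumSize(1000).build();

        List<String> childNames(String path, int limit) {
            ChildNames c = cache.getIfPresent(path);
            // reuse a cached entry only if it is complete or large enough
            if (c == null || (c.names.size() < limit && !c.complete)) {
                c = new ChildNames();
                List<String> fromStore = queryStore(path, limit);
                c.names.addAll(fromStore);
                c.complete = fromStore.size() < limit;
                cache.put(path, c);
            }
            return c.names;
        }

        private List<String> queryStore(String path, int limit) {
            return new ArrayList<String>(); // placeholder for the store query
        }
    }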

Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMK.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMicroKernelService.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/NodeDocument.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/UnsavedModifications.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMK.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMK.java?rev=1524070&r1=1524069&r2=1524070&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMK.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMK.java Tue Sep 17 14:36:17 2013
@@ -59,9 +59,11 @@ import org.apache.jackrabbit.oak.plugins
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Function;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.Weigher;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
 import com.mongodb.DB;
 
@@ -73,6 +75,8 @@ import static com.google.common.base.Pre
  */
 public class MongoMK implements MicroKernel, RevisionContext {
 
+    private static final Logger LOG = LoggerFactory.getLogger(MongoMK.class);
+
     /**
      * The threshold where special handling for many child nodes starts.
      */
@@ -85,7 +89,11 @@ public class MongoMK implements MicroKer
     static final boolean LIRS_CACHE = Boolean.parseBoolean(
             System.getProperty("oak.mongoMK.lirsCache", "false"));
 
-    private static final Logger LOG = LoggerFactory.getLogger(MongoMK.class);
+    /**
+     * Do not cache more than this number of children for a document.
+     */
+    private static final int NUM_CHILDREN_CACHE_LIMIT = Integer.getInteger(
+            "oak.mongoMK.childrenCacheLimit", 1000);
 
     /**
      * When trying to access revisions that are older than this many
@@ -174,6 +182,12 @@ public class MongoMK implements MicroKer
     private final CacheStats diffCacheStats;
 
     /**
+     * Child doc cache.
+     */
+    private final Cache<String, NodeDocument.Children> docChildrenCache;
+    private final CacheStats docChildrenCacheStats;
+
+    /**
      * The unsaved last revisions. This contains the parents of all changed
      * nodes, once those nodes are committed but the parent node itself wasn't
      * committed yet. The parents are not immediately persisted as this would
@@ -269,6 +283,10 @@ public class MongoMK implements MicroKer
         diffCacheStats = new CacheStats(diffCache, "MongoMk-DiffCache",
                 builder.getWeigher(), builder.getDiffCacheSize());
 
+        docChildrenCache = builder.buildCache(builder.getDocChildrenCacheSize());
+        docChildrenCacheStats = new CacheStats(docChildrenCache, "MongoMk-DocChildren",
+                builder.getWeigher(), builder.getDocChildrenCacheSize());
+
         init();
         // initial reading of the revisions of other cluster nodes
         backgroundRead();
@@ -370,6 +388,8 @@ public class MongoMK implements MicroKer
             // TODO invalidating the whole cache is not really needed,
             // instead only those children that are cached could be checked
             store.invalidateCache();
+            // TODO only invalidate affected items
+            docChildrenCache.invalidateAll();
             // add a new revision, so that changes are visible
             Revision r = Revision.newRevision(clusterId);
             // the latest revisions of the current cluster node
@@ -549,18 +569,17 @@ public class MongoMK implements MicroKer
         // TODO use offset, to avoid O(n^2) and running out of memory
         // to do that, use the *name* of the last entry of the previous batch of children
         // as the starting point
-        String from = Utils.getKeyLowerLimit(path);
-        String to = Utils.getKeyUpperLimit(path);
-        List<NodeDocument> list;
+        Iterable<NodeDocument> docs;
         Children c = new Children();
         int rawLimit = limit;
         do {
             c.children.clear();
             c.hasMore = true;
-            list = store.query(Collection.NODES,
-                    from, to, rawLimit);
+            docs = readChildren(path, rawLimit);
             Set<Revision> validRevisions = new HashSet<Revision>();
-            for (NodeDocument doc : list) {
+            int numReturned = 0;
+            for (NodeDocument doc : docs) {
+                numReturned++;
                 // filter out deleted children
                 if (doc.isDeleted(this, rev, validRevisions)) {
                     continue;
@@ -571,7 +590,7 @@ public class MongoMK implements MicroKer
                     c.children.add(p);
                 }
             }
-            if (list.size() < rawLimit) {
+            if (numReturned < rawLimit) {
                 // fewer documents returned than requested
                 // -> no more documents
                 c.hasMore = false;
@@ -582,6 +601,35 @@ public class MongoMK implements MicroKer
         return c;
     }
 
+    @Nonnull
+    Iterable<NodeDocument> readChildren(final String path, int limit) {
+        String from = Utils.getKeyLowerLimit(path);
+        String to = Utils.getKeyUpperLimit(path);
+        if (limit > NUM_CHILDREN_CACHE_LIMIT) {
+            // do not use cache
+            return store.query(Collection.NODES, from, to, limit);
+        }
+        // check cache
+        NodeDocument.Children c = docChildrenCache.getIfPresent(path);
+        if (c == null || (c.childNames.size() < limit && !c.isComplete)) {
+            c = new NodeDocument.Children();
+            List<NodeDocument> docs = store.query(Collection.NODES, from, to, limit);
+            for (NodeDocument doc : docs) {
+                String p = Utils.getPathFromId(doc.getId());
+                c.childNames.add(PathUtils.getName(p));
+            }
+            c.isComplete = docs.size() < limit;
+            docChildrenCache.put(path, c);
+        }
+        return Iterables.transform(c.childNames, new Function<String, NodeDocument>() {
+            @Override
+            public NodeDocument apply(String name) {
+                String p = PathUtils.concat(path, name);
+                return store.find(Collection.NODES, Utils.getIdFromPath(p));
+            }
+        });
+    }
+
     @CheckForNull
     private Node readNode(String path, Revision readRevision) {
         String id = Utils.getIdFromPath(path);
@@ -1059,7 +1107,7 @@ public class MongoMK implements MicroKer
 
             // remove from the cache
             nodeCache.invalidate(path + "@" + rev);
-            
+
             if (n != null) {
                 Node.Children c = getChildren(path, rev, Integer.MAX_VALUE);
                 for (String childPath : c.children) {
@@ -1257,6 +1305,23 @@ public class MongoMK implements MicroKer
             c2.children.addAll(set);
             nodeChildrenCache.put(key, c2);
         }
+        if (!added.isEmpty()) {
+            NodeDocument.Children docChildren = docChildrenCache.getIfPresent(path);
+            if (docChildren != null) {
+                if (docChildren.isComplete) {
+                    TreeSet<String> names = new TreeSet<String>(docChildren.childNames);
+                    for (String childPath : added) {
+                        names.add(PathUtils.getName(childPath));
+                    }
+                    docChildren = new NodeDocument.Children();
+                    docChildren.isComplete = true;
+                    docChildren.childNames.addAll(names);
+                    docChildrenCache.put(path, docChildren);
+                } else {
+                    docChildrenCache.invalidate(path);
+                }
+            }
+        }
     }
 
     public CacheStats getNodeCacheStats() {
@@ -1271,6 +1336,10 @@ public class MongoMK implements MicroKer
         return diffCacheStats;
     }
     
+    public CacheStats getDocChildrenCacheStats() {
+        return docChildrenCacheStats;
+    }
+
     public ClusterNodeInfo getClusterInfo() {
         return clusterNodeInfo;
     }
@@ -1353,6 +1422,7 @@ public class MongoMK implements MicroKer
         private long childrenCacheSize;
         private long diffCacheSize;
         private long documentCacheSize;
+        private long docChildrenCacheSize;
         private boolean useSimpleRevision;
         private long splitDocumentAgeMillis = 5 * 60 * 1000;
 
@@ -1470,7 +1540,8 @@ public class MongoMK implements MicroKer
             this.nodeCacheSize = memoryCacheSize * 20 / 100;
             this.childrenCacheSize = memoryCacheSize * 10 / 100;
             this.diffCacheSize = memoryCacheSize * 2 / 100;
-            this.documentCacheSize = memoryCacheSize - nodeCacheSize - childrenCacheSize - diffCacheSize;
+            this.docChildrenCacheSize = memoryCacheSize * 3 / 100;
+            this.documentCacheSize = memoryCacheSize - nodeCacheSize - childrenCacheSize - diffCacheSize - docChildrenCacheSize;
             return this;
         }
 
@@ -1486,6 +1557,10 @@ public class MongoMK implements MicroKer
             return documentCacheSize;
         }
 
+        public long getDocChildrenCacheSize() {
+            return docChildrenCacheSize;
+        }
+
         public long getDiffCacheSize() {
             return diffCacheSize;
         }

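To make the new cache-size split concrete: with the added 3% doc-children slice, a
hypothetical 256 MB memory cache now divides into 20% node cache, 10% children cache,
2% diff cache, 3% doc-children cache, and the remainder (about 65%) document cache.
A small worked example mirroring the integer arithmetic in the hunk above:

    // Worked example: the size split for a hypothetical 256 MB cache,
    // mirroring the integer arithmetic in the hunk above.
    public class CacheSizeSplit {
        public static void main(String[] args) {
            long memoryCacheSize = 256L * 1024 * 1024;              // 268,435,456
            long nodeCacheSize = memoryCacheSize * 20 / 100;        //  53,687,091
            long childrenCacheSize = memoryCacheSize * 10 / 100;    //  26,843,545
            long diffCacheSize = memoryCacheSize * 2 / 100;         //   5,368,709
            long docChildrenCacheSize = memoryCacheSize * 3 / 100;  //   8,053,063
            long documentCacheSize = memoryCacheSize - nodeCacheSize
                    - childrenCacheSize - diffCacheSize - docChildrenCacheSize;
            System.out.println(documentCacheSize); // 174,483,048, about 65%
        }
    }
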
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMicroKernelService.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMicroKernelService.java?rev=1524070&r1=1524069&r2=1524070&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMicroKernelService.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMicroKernelService.java Tue Sep 17 14:36:17 2013
@@ -128,6 +128,13 @@ public class MongoMicroKernelService {
                     CacheStatsMBean.TYPE,
                     mk.getDiffCacheStats().getName())
         );
+        registrations.add(
+                registerMBean(wb,
+                        CacheStatsMBean.class,
+                        mk.getDocChildrenCacheStats(),
+                        CacheStatsMBean.TYPE,
+                        mk.getDocChildrenCacheStats().getName())
+        );
 
         DocumentStore ds = mk.getDocumentStore();
         if (ds instanceof MongoDocumentStore) {

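The registration above exposes the new cache's statistics as a CacheStatsMBean,
alongside the existing ones. A hypothetical sketch of inspecting those beans over
JMX; the object-name pattern and the HitRate attribute are assumptions based on
typical Oak MBean registration and the CacheStatsMBean getters, not taken from
this commit:

    import java.lang.management.ManagementFactory;

    import javax.management.MBeanServer;
    import javax.management.ObjectName;

    // Hypothetical probe: the domain, key properties and attribute name
    // are assumptions, not taken from this commit.
    public class CacheStatsProbe {
        public static void main(String[] args) throws Exception {
            MBeanServer server = ManagementFactory.getPlatformMBeanServer();
            ObjectName pattern = new ObjectName(
                    "org.apache.jackrabbit.oak:type=CacheStats,*");
            for (ObjectName name : server.queryNames(pattern, null)) {
                System.out.println(name + " hitRate="
                        + server.getAttribute(name, "HitRate"));
            }
        }
    }
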
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/NodeDocument.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/NodeDocument.java?rev=1524070&r1=1524069&r2=1524070&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/NodeDocument.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/NodeDocument.java Tue Sep 17 14:36:17 2013
@@ -36,6 +36,7 @@ import javax.annotation.CheckForNull;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
+import org.apache.jackrabbit.oak.cache.CacheValue;
 import org.apache.jackrabbit.oak.commons.PathUtils;
 import org.apache.jackrabbit.oak.plugins.mongomk.util.Utils;
 import org.slf4j.Logger;
@@ -83,7 +84,7 @@ public class NodeDocument extends Docume
      */
     static final String MODIFIED = "_modified";
 
-    private static final SortedMap<Revision, Range> EMPTY_RANGE_MAP = 
+    private static final SortedMap<Revision, Range> EMPTY_RANGE_MAP =
             Collections.unmodifiableSortedMap(new TreeMap<Revision, Range>());
 
     /**
@@ -844,6 +845,21 @@ public class NodeDocument extends Docume
         checkNotNull(op).setMapEntry(DELETED, checkNotNull(revision).toString(), String.valueOf(deleted));
     }
 
+    static final class Children implements CacheValue {
+
+        final List<String> childNames = new ArrayList<String>();
+        boolean isComplete;
+
+        @Override
+        public int getMemory() {
+            int size = 8;
+            for (String name : childNames) {
+                size += name.length() * 2 + 8;
+            }
+            return size;
+        }
+    }
+
     //----------------------------< internal >----------------------------------
 
     /**

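The getMemory() estimate above charges 8 bytes of base overhead plus, per cached
child name, two bytes per character and 8 bytes of per-entry overhead. A rough
worked example with illustrative numbers:

    // Rough worked example of the estimate: 8 bytes base plus
    // (2 * length + 8) bytes per cached child name; numbers illustrative.
    public class ChildrenMemoryEstimate {
        public static void main(String[] args) {
            int names = 1000;       // a full entry at the default cache limit
            int avgNameLength = 10; // hypothetical average child-name length
            int estimate = 8 + names * (2 * avgNameLength + 8);
            System.out.println(estimate); // 28008 bytes, roughly 27 KB
        }
    }
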
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/UnsavedModifications.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/UnsavedModifications.java?rev=1524070&r1=1524069&r2=1524070&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/UnsavedModifications.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/UnsavedModifications.java Tue Sep 17 14:36:17 2013
@@ -64,7 +64,7 @@ class UnsavedModifications {
      */
     public void applyTo(UnsavedModifications other, Revision mergeCommit) {
         for (Map.Entry<String, Revision> entry : map.entrySet()) {
-            Revision r = other.map.putIfAbsent(entry.getKey(), entry.getValue());
+            Revision r = other.map.putIfAbsent(entry.getKey(), mergeCommit);
             if (r != null) {
                 if (r.compareRevisionTime(mergeCommit) < 0) {
                     other.map.put(entry.getKey(), mergeCommit);
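
The one-line fix above matters because ConcurrentMap.putIfAbsent() stores the given
value only when the key is absent, and in exactly that case the old code recorded the
entry's own revision instead of the merge commit. A minimal sketch of the corrected
flow, with longs standing in for revisions (a hypothetical simplification of
Revision.compareRevisionTime()):

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    // Sketch of the corrected applyTo() flow; longs stand in for revisions.
    public class PutIfAbsentFix {
        public static void main(String[] args) {
            ConcurrentMap<String, Long> other = new ConcurrentHashMap<String, Long>();
            long mergeCommit = 42L;
            for (String path : new String[] {"/a", "/b"}) {
                // record the merge commit when the path is absent (the fixed line)
                Long r = other.putIfAbsent(path, mergeCommit);
                if (r != null && r < mergeCommit) {
                    // ...and also when an older revision is already present
                    other.put(path, mergeCommit);
                }
            }
            System.out.println(other); // both paths now map to 42
        }
    }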