You are viewing a plain-text version of this content; the link to its canonical version is not preserved in plain text.
Posted to oak-commits@jackrabbit.apache.org by mr...@apache.org on 2013/09/17 16:36:18 UTC
svn commit: r1524070 - in
/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk:
MongoMK.java MongoMicroKernelService.java NodeDocument.java
UnsavedModifications.java
Author: mreutegg
Date: Tue Sep 17 14:36:17 2013
New Revision: 1524070
URL: http://svn.apache.org/r1524070
Log:
OAK-926: MongoMK: split documents when they are too large
- Introduce children cache on document level
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMK.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMicroKernelService.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/NodeDocument.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/UnsavedModifications.java
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMK.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMK.java?rev=1524070&r1=1524069&r2=1524070&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMK.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMK.java Tue Sep 17 14:36:17 2013
@@ -59,9 +59,11 @@ import org.apache.jackrabbit.oak.plugins
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Function;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.Weigher;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.mongodb.DB;
@@ -73,6 +75,8 @@ import static com.google.common.base.Pre
*/
public class MongoMK implements MicroKernel, RevisionContext {
+ private static final Logger LOG = LoggerFactory.getLogger(MongoMK.class);
+
/**
* The threshold where special handling for many child node starts.
*/
@@ -85,7 +89,11 @@ public class MongoMK implements MicroKer
static final boolean LIRS_CACHE = Boolean.parseBoolean(
System.getProperty("oak.mongoMK.lirsCache", "false"));
- private static final Logger LOG = LoggerFactory.getLogger(MongoMK.class);
+ /**
+ * Do not cache more than this number of children for a document: reads
+ * that request more than this many children bypass the document children
+ * cache and query the store directly. Configurable via the system property
+ * {@code oak.mongoMK.childrenCacheLimit} (default 1000).
+ */
+ private static final int NUM_CHILDREN_CACHE_LIMIT = Integer.getInteger(
+ "oak.mongoMK.childrenCacheLimit", 1000);
/**
* When trying to access revisions that are older than this many
@@ -174,6 +182,12 @@ public class MongoMK implements MicroKer
private final CacheStats diffCacheStats;
/**
+ * Cache of child node names, keyed by the path of the parent document
+ * (see {@link NodeDocument.Children}). Used to avoid repeated range
+ * queries when reading up to NUM_CHILDREN_CACHE_LIMIT children.
+ */
+ private final Cache<String, NodeDocument.Children> docChildrenCache;
+ // statistics for docChildrenCache (exposed via getDocChildrenCacheStats)
+ private final CacheStats docChildrenCacheStats;
+
+ /**
* The unsaved last revisions. This contains the parents of all changed
* nodes, once those nodes are committed but the parent node itself wasn't
* committed yet. The parents are not immediately persisted as this would
@@ -269,6 +283,10 @@ public class MongoMK implements MicroKer
diffCacheStats = new CacheStats(diffCache, "MongoMk-DiffCache",
builder.getWeigher(), builder.getDiffCacheSize());
+ docChildrenCache = builder.buildCache(builder.getDocChildrenCacheSize());
+ docChildrenCacheStats = new CacheStats(docChildrenCache, "MongoMk-DocChildren",
+ builder.getWeigher(), builder.getDocChildrenCacheSize());
+
init();
// initial reading of the revisions of other cluster nodes
backgroundRead();
@@ -370,6 +388,8 @@ public class MongoMK implements MicroKer
// TODO invalidating the whole cache is not really needed,
// instead only those children that are cached could be checked
store.invalidateCache();
+ // TODO only invalidate affected items
+ docChildrenCache.invalidateAll();
// add a new revision, so that changes are visible
Revision r = Revision.newRevision(clusterId);
// the latest revisions of the current cluster node
@@ -549,18 +569,17 @@ public class MongoMK implements MicroKer
// TODO use offset, to avoid O(n^2) and running out of memory
// to do that, use the *name* of the last entry of the previous batch of children
// as the starting point
- String from = Utils.getKeyLowerLimit(path);
- String to = Utils.getKeyUpperLimit(path);
- List<NodeDocument> list;
+ Iterable<NodeDocument> docs;
Children c = new Children();
int rawLimit = limit;
do {
c.children.clear();
c.hasMore = true;
- list = store.query(Collection.NODES,
- from, to, rawLimit);
+ docs = readChildren(path, rawLimit);
Set<Revision> validRevisions = new HashSet<Revision>();
- for (NodeDocument doc : list) {
+ int numReturned = 0;
+ for (NodeDocument doc : docs) {
+ numReturned++;
// filter out deleted children
if (doc.isDeleted(this, rev, validRevisions)) {
continue;
@@ -571,7 +590,7 @@ public class MongoMK implements MicroKer
c.children.add(p);
}
}
- if (list.size() < rawLimit) {
+ if (numReturned < rawLimit) {
// fewer documents returned than requested
// -> no more documents
c.hasMore = false;
@@ -582,6 +601,35 @@ public class MongoMK implements MicroKer
return c;
}
+    /**
+     * Returns the child documents of the node at {@code path}, reading at
+     * most {@code limit} documents.
+     * <p>
+     * Requests with a {@code limit} above NUM_CHILDREN_CACHE_LIMIT go
+     * straight to the store. Otherwise the child names are served from
+     * {@code docChildrenCache} when the cached entry is complete or holds at
+     * least {@code limit} names; on a miss the store is queried and the
+     * resulting names are cached. In the cached case the returned iterable
+     * is lazy: each child document is looked up with {@code store.find}
+     * while iterating.
+     *
+     * @param path  the path of the parent node.
+     * @param limit the maximum number of child documents to return.
+     * @return at most {@code limit} child documents.
+     * @throws NullPointerException (while iterating) if a cached child name
+     *         no longer resolves to a document; the stale cache entry is
+     *         invalidated before the exception is thrown.
+     */
+    @Nonnull
+    Iterable<NodeDocument> readChildren(final String path, int limit) {
+        String from = Utils.getKeyLowerLimit(path);
+        String to = Utils.getKeyUpperLimit(path);
+        if (limit > NUM_CHILDREN_CACHE_LIMIT) {
+            // do not use cache
+            return store.query(Collection.NODES, from, to, limit);
+        }
+        // check cache
+        NodeDocument.Children c = docChildrenCache.getIfPresent(path);
+        if (c == null || (c.childNames.size() < limit && !c.isComplete)) {
+            // cache miss, or the cached prefix is too short for this request
+            c = new NodeDocument.Children();
+            List<NodeDocument> docs = store.query(Collection.NODES, from, to, limit);
+            for (NodeDocument doc : docs) {
+                String p = Utils.getPathFromId(doc.getId());
+                c.childNames.add(PathUtils.getName(p));
+            }
+            // fewer documents than requested means all children were read
+            c.isComplete = docs.size() < limit;
+            docChildrenCache.put(path, c);
+        }
+        Iterable<NodeDocument> it = Iterables.transform(c.childNames, new Function<String, NodeDocument>() {
+            @Override
+            public NodeDocument apply(String name) {
+                String p = PathUtils.concat(path, name);
+                NodeDocument doc = store.find(Collection.NODES, Utils.getIdFromPath(p));
+                if (doc == null) {
+                    // the cached name is stale: drop the entry so the next
+                    // read queries the store again, and fail with context
+                    docChildrenCache.invalidate(path);
+                    throw new NullPointerException("Document " + p + " not found");
+                }
+                return doc;
+            }
+        });
+        // a cached entry may hold more names than requested; never fetch more
+        // documents than the uncached query above would have returned
+        if (limit >= 0 && c.childNames.size() > limit) {
+            it = Iterables.limit(it, limit);
+        }
+        return it;
+    }
+
@CheckForNull
private Node readNode(String path, Revision readRevision) {
String id = Utils.getIdFromPath(path);
@@ -1059,7 +1107,7 @@ public class MongoMK implements MicroKer
// remove from the cache
nodeCache.invalidate(path + "@" + rev);
-
+
if (n != null) {
Node.Children c = getChildren(path, rev, Integer.MAX_VALUE);
for (String childPath : c.children) {
@@ -1257,6 +1305,23 @@ public class MongoMK implements MicroKer
c2.children.addAll(set);
nodeChildrenCache.put(key, c2);
}
+ if (!added.isEmpty()) {
+ NodeDocument.Children docChildren = docChildrenCache.getIfPresent(path);
+ if (docChildren != null) {
+ if (docChildren.isComplete) {
+ TreeSet<String> names = new TreeSet<String>(docChildren.childNames);
+ for (String childPath : added) {
+ names.add(PathUtils.getName(childPath));
+ }
+ docChildren = new NodeDocument.Children();
+ docChildren.isComplete = true;
+ docChildren.childNames.addAll(names);
+ docChildrenCache.put(path, docChildren);
+ } else {
+ docChildrenCache.invalidate(path);
+ }
+ }
+ }
}
public CacheStats getNodeCacheStats() {
@@ -1271,6 +1336,10 @@ public class MongoMK implements MicroKer
return diffCacheStats;
}
+    /**
+     * Returns the statistics object created for the document children cache.
+     *
+     * @return the document children cache statistics.
+     */
+    public CacheStats getDocChildrenCacheStats() {
+        return this.docChildrenCacheStats;
+    }
+
public ClusterNodeInfo getClusterInfo() {
return clusterNodeInfo;
}
@@ -1353,6 +1422,7 @@ public class MongoMK implements MicroKer
private long childrenCacheSize;
private long diffCacheSize;
private long documentCacheSize;
+ private long docChildrenCacheSize;
private boolean useSimpleRevision;
private long splitDocumentAgeMillis = 5 * 60 * 1000;
@@ -1470,7 +1540,8 @@ public class MongoMK implements MicroKer
this.nodeCacheSize = memoryCacheSize * 20 / 100;
this.childrenCacheSize = memoryCacheSize * 10 / 100;
this.diffCacheSize = memoryCacheSize * 2 / 100;
- this.documentCacheSize = memoryCacheSize - nodeCacheSize - childrenCacheSize - diffCacheSize;
+ this.docChildrenCacheSize = memoryCacheSize * 3 / 100;
+ this.documentCacheSize = memoryCacheSize - nodeCacheSize - childrenCacheSize - diffCacheSize - docChildrenCacheSize;
return this;
}
@@ -1486,6 +1557,10 @@ public class MongoMK implements MicroKer
return documentCacheSize;
}
+        /**
+         * Returns the size configured for the document children cache; it is
+         * passed to {@code buildCache} when the cache is created.
+         *
+         * @return the document children cache size.
+         */
+        public long getDocChildrenCacheSize() {
+            return this.docChildrenCacheSize;
+        }
+
public long getDiffCacheSize() {
return diffCacheSize;
}
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMicroKernelService.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMicroKernelService.java?rev=1524070&r1=1524069&r2=1524070&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMicroKernelService.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMicroKernelService.java Tue Sep 17 14:36:17 2013
@@ -128,6 +128,13 @@ public class MongoMicroKernelService {
CacheStatsMBean.TYPE,
mk.getDiffCacheStats().getName())
);
+ registrations.add(
+ registerMBean(wb,
+ CacheStatsMBean.class,
+ mk.getDocChildrenCacheStats(),
+ CacheStatsMBean.TYPE,
+ mk.getDocChildrenCacheStats().getName())
+ );
DocumentStore ds = mk.getDocumentStore();
if (ds instanceof MongoDocumentStore) {
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/NodeDocument.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/NodeDocument.java?rev=1524070&r1=1524069&r2=1524070&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/NodeDocument.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/NodeDocument.java Tue Sep 17 14:36:17 2013
@@ -36,6 +36,7 @@ import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
+import org.apache.jackrabbit.oak.cache.CacheValue;
import org.apache.jackrabbit.oak.commons.PathUtils;
import org.apache.jackrabbit.oak.plugins.mongomk.util.Utils;
import org.slf4j.Logger;
@@ -83,7 +84,7 @@ public class NodeDocument extends Docume
*/
static final String MODIFIED = "_modified";
- private static final SortedMap<Revision, Range> EMPTY_RANGE_MAP =
+ private static final SortedMap<Revision, Range> EMPTY_RANGE_MAP =
Collections.unmodifiableSortedMap(new TreeMap<Revision, Range>());
/**
@@ -844,6 +845,21 @@ public class NodeDocument extends Docume
checkNotNull(op).setMapEntry(DELETED, checkNotNull(revision).toString(), String.valueOf(deleted));
}
+    /**
+     * Cached child node names of a document, stored in the document
+     * children cache.
+     */
+    static final class Children implements CacheValue {
+
+        /** Names (not paths) of the child nodes, in the order they were read. */
+        final List<String> childNames = new ArrayList<String>();
+
+        /** {@code true} if {@link #childNames} holds all children of the document. */
+        boolean isComplete;
+
+        /**
+         * Rough memory estimate: 8 bytes of base overhead plus, for each
+         * name, 2 bytes per character and 8 bytes of reference overhead.
+         */
+        @Override
+        public int getMemory() {
+            int estimate = 8;
+            for (int i = 0; i < childNames.size(); i++) {
+                estimate += 8 + 2 * childNames.get(i).length();
+            }
+            return estimate;
+        }
+    }
+
//----------------------------< internal >----------------------------------
/**
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/UnsavedModifications.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/UnsavedModifications.java?rev=1524070&r1=1524069&r2=1524070&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/UnsavedModifications.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/UnsavedModifications.java Tue Sep 17 14:36:17 2013
@@ -64,7 +64,7 @@ class UnsavedModifications {
*/
public void applyTo(UnsavedModifications other, Revision mergeCommit) {
for (Map.Entry<String, Revision> entry : map.entrySet()) {
- Revision r = other.map.putIfAbsent(entry.getKey(), entry.getValue());
+ Revision r = other.map.putIfAbsent(entry.getKey(), mergeCommit);
if (r != null) {
if (r.compareRevisionTime(mergeCommit) < 0) {
other.map.put(entry.getKey(), mergeCommit);