Posted to oak-commits@jackrabbit.apache.org by ch...@apache.org on 2013/03/13 17:18:30 UTC
svn commit: r1456020 -
/jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/MongoMK.java
Author: chetanm
Date: Wed Mar 13 16:18:29 2013
New Revision: 1456020
URL: http://svn.apache.org/r1456020
Log:
OAK-619 - Lock-free MongoMK implementation (WIP)
Using Guava cache instead of custom cache implementation
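
The switch described in the log message is largely mechanical at the call sites: get() becomes getIfPresent(), remove() becomes invalidate(), and construction moves to CacheBuilder. A minimal, self-contained sketch of that mapping, assuming Guava on the classpath (keys and values here are illustrative, not taken from the patch):

    import com.google.common.cache.Cache;
    import com.google.common.cache.CacheBuilder;

    public class GuavaCacheSketch {
        public static void main(String[] args) {
            // Bounded, thread-safe cache; replaces the synchronized
            // LinkedHashMap subclass that this commit removes.
            Cache<String, String> nodeCache = CacheBuilder.newBuilder()
                    .maximumSize(1024) // entry-count bound; the patch leaves a TODO for a size-based weigher
                    .build();

            // Old custom Cache    -> Guava Cache
            //   get(key)          -> getIfPresent(key)  (null on miss, no loading)
            //   put(key, value)   -> put(key, value)
            //   remove(key)       -> invalidate(key)
            nodeCache.put("/foo@r1", "node-data");
            String hit = nodeCache.getIfPresent("/foo@r1");   // "node-data"
            nodeCache.invalidate("/foo@r1");
            String miss = nodeCache.getIfPresent("/foo@r1");  // null
            System.out.println(hit + " / " + miss);
        }
    }

Once maximumSize is exceeded, Guava evicts entries in roughly least-recently-used order, which is the behavior the removed removeEldestEntry() override provided by hand.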
Modified:
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/MongoMK.java
Modified: jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/MongoMK.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/MongoMK.java?rev=1456020&r1=1456019&r2=1456020&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/MongoMK.java (original)
+++ jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/MongoMK.java Wed Mar 13 16:18:29 2013
@@ -22,15 +22,18 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nonnull;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
import org.apache.jackrabbit.mk.api.MicroKernel;
import org.apache.jackrabbit.mk.api.MicroKernelException;
import org.apache.jackrabbit.mk.blobs.BlobStore;
@@ -110,14 +113,12 @@ public class MongoMK implements MicroKer
* Key: path@rev
* Value: node
*/
- private final Map<String, Node> nodeCache =
- new Cache<String, Node>(CACHE_NODES);
-
+ private final Cache<String, Node> nodeCache;
+
/**
* Child node cache.
*/
- private Cache<String, Node.Children> nodeChildrenCache =
- new Cache<String, Node.Children>(CACHE_CHILDREN);
+ private final Cache<String, Node.Children> nodeChildrenCache;
/**
* The unsaved write count increments.
@@ -125,12 +126,6 @@ public class MongoMK implements MicroKer
private final Map<String, Long> writeCountIncrements = new HashMap<String, Long>();
/**
- * The set of known valid revision.
- * The key is the revision id, the value is 1 (because a cache can't be a set).
- */
- private final Map<String, Long> revCache = new Cache<String, Long>(1024);
-
- /**
* The last known head revision.
*/
private Revision headRevision;
@@ -173,6 +168,16 @@ public class MongoMK implements MicroKer
this.store = store;
this.blobStore = blobStore;
this.clusterId = clusterId;
+
+ //TODO Use size based weigher
+ nodeCache = CacheBuilder.newBuilder()
+ .maximumSize(CACHE_NODES)
+ .build();
+
+ nodeChildrenCache = CacheBuilder.newBuilder()
+ .maximumSize(CACHE_CHILDREN)
+ .build();
+
backgroundThread = new Thread(
new BackgroundOperation(this, isDisposed),
"MongoMK background thread");
@@ -222,7 +227,7 @@ public class MongoMK implements MicroKer
Node getNode(String path, Revision rev) {
checkRevisionAge(rev, path);
String key = path + "@" + rev;
- Node node = nodeCache.get(key);
+ Node node = nodeCache.getIfPresent(key);
if (node == null) {
node = readNode(path, rev);
if (node != null) {
@@ -258,32 +263,37 @@ public class MongoMK implements MicroKer
return x.compareRevisionTime(previous) > 0;
}
- public Node.Children readChildren(String path, String nodeId, Revision rev, int limit) {
- Node.Children c;
- c = nodeChildrenCache.get(nodeId);
- if (c != null) {
- return c;
- }
- String from = PathUtils.concat(path, "a");
- from = Utils.getIdFromPath(from);
- from = from.substring(0, from.length() - 1);
- String to = PathUtils.concat(path, "z");
- to = Utils.getIdFromPath(to);
- to = to.substring(0, to.length() - 2) + "0";
- List<Map<String, Object>> list = store.query(DocumentStore.Collection.NODES, from, to, limit);
- c = new Node.Children(path, nodeId, rev);
- for (Map<String, Object> e : list) {
- // filter out deleted children
- if (getLiveRevision(e, rev) == null) {
- continue;
- }
- // TODO put the whole node in the cache
- String id = e.get(UpdateOp.ID).toString();
- String p = Utils.getPathFromId(id);
- c.children.add(p);
+ public Node.Children readChildren(final String path, final String nodeId,
+ final Revision rev, final int limit) {
+ try {
+ return nodeChildrenCache.get(nodeId, new Callable<Children>() {
+ @Override
+ public Children call() throws Exception {
+ String from = PathUtils.concat(path, "a");
+ from = Utils.getIdFromPath(from);
+ from = from.substring(0, from.length() - 1);
+ String to = PathUtils.concat(path, "z");
+ to = Utils.getIdFromPath(to);
+ to = to.substring(0, to.length() - 2) + "0";
+ List<Map<String, Object>> list = store.query(DocumentStore.Collection.NODES, from, to, limit);
+ Children c = new Children(path, nodeId, rev);
+ for (Map<String, Object> e : list) {
+ // filter out deleted children
+ if (getLiveRevision(e, rev) == null) {
+ continue;
+ }
+ // TODO put the whole node in the cache
+ String id = e.get(UpdateOp.ID).toString();
+ String p = Utils.getPathFromId(id);
+ c.children.add(p);
+ }
+ return c;
+ }
+ });
+ } catch (ExecutionException e) {
+ throw new IllegalStateException("Error occurred while fetching node children for node " + nodeId, e);
}
- nodeChildrenCache.put(nodeId, c);
- return c;
+
}
private Node readNode(String path, Revision rev) {
@@ -638,7 +648,7 @@ public class MongoMK implements MicroKer
Node n = getNode(path, rev);
// remove from the cache
- nodeCache.remove(path + "@" + rev);
+ nodeCache.invalidate(path + "@" + rev);
if (n != null) {
Node.Children c = readChildren(path, n.getId(), rev,
@@ -646,12 +656,12 @@ public class MongoMK implements MicroKer
for (String childPath : c.children) {
markAsDeleted(childPath, commit, true);
}
- nodeChildrenCache.remove(n.getId());
+ nodeChildrenCache.invalidate(n.getId());
}
}
// Remove the node from the cache
- nodeCache.remove(path + "@" + rev);
+ nodeCache.invalidate(path + "@" + rev);
}
/**
@@ -797,54 +807,6 @@ public class MongoMK implements MicroKer
}
/**
- * A simple cache.
- *
- * @param <K> the key type
- * @param <V> the value type
- */
- static class Cache<K, V> extends LinkedHashMap<K, V> {
-
- private static final long serialVersionUID = 1L;
- private int size;
- private TreeSet<K> keySet = new TreeSet<K>();
-
- Cache(int size) {
- super(size, (float) 0.75, true);
- this.size = size;
- }
-
- public synchronized V put(K key, V value) {
- keySet.add(key);
- return super.put(key, value);
- }
-
- public synchronized V remove(Object key) {
- keySet.remove(key);
- return super.remove(key);
- }
-
- protected synchronized boolean removeEldestEntry(Map.Entry<K, V> eldest) {
- boolean remove = size() > size;
- if (remove) {
- Object k = eldest.getKey();
- if (k != null) {
- keySet.remove(k);
- }
- }
- return remove;
- }
-
- public String toString() {
- return super.toString().replace(',', '\n');
- }
-
- // public synchronized SortedSet<K> subSet(K fromElement, K toElement) {
- // return keySet.subSet(fromElement, toElement);
- // }
-
- }
-
- /**
* A background thread.
*/
static class BackgroundOperation implements Runnable {
@@ -885,7 +847,7 @@ public class MongoMK implements MicroKer
writeCountIncrements.remove(path);
}
long newWriteCount = oldWriteCount + writeCountInc;
- Children c = nodeChildrenCache.get(path + "@" + (newWriteCount - 1));
+ Children c = nodeChildrenCache.getIfPresent(path + "@" + (newWriteCount - 1));
if (isNew || (!isDelete && c != null)) {
String id = path + "@" + newWriteCount;
Children c2 = new Children(path, id, rev);
@@ -896,7 +858,7 @@ public class MongoMK implements MicroKer
set.removeAll(removed);
set.addAll(added);
c2.children.addAll(set);
- if (nodeChildrenCache.get(id) != null) {
+ if (nodeChildrenCache.getIfPresent(id) != null) {
// TODO should not happen,
// probably a problem with add/delete/add
MicroKernelException e = new MicroKernelException(
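
The rewritten readChildren() above adopts Guava's get(key, Callable) idiom: on a miss, the Callable runs under the cache's per-key lock and its result is cached before get() returns, so concurrent readers of the same key share a single store query instead of each issuing one, as the old getIfPresent-then-put sequence could. A minimal sketch of the pattern, with a hypothetical queryChildren() standing in for the patch's store.query(...) call:

    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;

    import com.google.common.cache.Cache;
    import com.google.common.cache.CacheBuilder;

    public class GetWithLoaderSketch {

        private final Cache<String, List<String>> childrenCache =
                CacheBuilder.newBuilder().maximumSize(1024).build();

        // Hypothetical stand-in for the store.query(...) call in the patch.
        private List<String> queryChildren(String nodeId) {
            return Arrays.asList(nodeId + "/a", nodeId + "/b");
        }

        public List<String> readChildren(final String nodeId) {
            try {
                // The Callable runs only on a miss; its return value is
                // stored in the cache before get() returns.
                return childrenCache.get(nodeId, new Callable<List<String>>() {
                    @Override
                    public List<String> call() {
                        return queryChildren(nodeId);
                    }
                });
            } catch (ExecutionException e) {
                // Guava wraps any exception thrown by the Callable.
                throw new IllegalStateException(
                        "Failed to load children for " + nodeId, e);
            }
        }
    }

Note that the Callable must not return null: Guava rejects null cache values, which is why the loader in the patch always returns a Children instance even when the query matches nothing.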