Posted to oak-commits@jackrabbit.apache.org by mr...@apache.org on 2013/10/16 15:23:58 UTC

svn commit: r1532757 [1/2] - in /jackrabbit/oak/trunk/oak-core/src: main/java/org/apache/jackrabbit/oak/plugins/mongomk/ test/java/org/apache/jackrabbit/oak/plugins/mongomk/

Author: mreutegg
Date: Wed Oct 16 13:23:58 2013
New Revision: 1532757

URL: http://svn.apache.org/r1532757
Log:
OAK-1102: Isolate MicroKernel specific code in MongoMK
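In outline: the commit introduces MongoNodeStore, moves revision handling, caching, and the background operations out of MongoMK, and leaves MongoMK as a thin MicroKernel facade that delegates to the store. The following is a minimal compilable sketch of the resulting shape, with the Oak types stubbed out; only the delegation pattern mirrors the diff below, all bodies here are illustrative and not the committed code:

    // Toy stand-in for org.apache.jackrabbit.oak.plugins.mongomk.Revision.
    final class Revision {
        private final long timestamp = System.currentTimeMillis();
        @Override public String toString() { return "r" + Long.toHexString(timestamp); }
    }

    // Sketch of the new store-level component: it now owns the head revision
    // and the background read/write cycle that MongoMK used to run itself.
    class MongoNodeStore {
        private volatile Revision headRevision = new Revision();

        Revision getHeadRevision()       { return headRevision; }
        void setHeadRevision(Revision r) { headRevision = r; }
        void backgroundRead()            { /* pick up revisions from other cluster nodes */ }
        void backgroundWrite()           { /* persist pending last-revision updates */ }
        void dispose()                   { /* stop background work, release resources */ }
    }

    // MongoMK keeps the MicroKernel API surface but forwards state handling.
    class MongoMK {
        protected final MongoNodeStore nodeStore;

        MongoMK(MongoNodeStore nodeStore) { this.nodeStore = nodeStore; }

        public String getHeadRevision() { return nodeStore.getHeadRevision().toString(); }
        public void dispose()           { nodeStore.dispose(); }
    }
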

Added:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoNodeStore.java   (with props)
Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Commit.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMK.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/DocumentSplitTest.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/SimpleTest.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Commit.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Commit.java?rev=1532757&r1=1532756&r2=1532757&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Commit.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Commit.java Wed Oct 16 13:23:58 2013
@@ -43,7 +43,7 @@ public class Commit {
 
     private static final Logger LOG = LoggerFactory.getLogger(Commit.class);
 
-    private final MongoMK mk;
+    private final MongoNodeStore nodeStore;
     private final Revision baseRevision;
     private final Revision revision;
     private HashMap<String, UpdateOp> operations = new HashMap<String, UpdateOp>();
@@ -58,10 +58,10 @@ public class Commit {
     private HashSet<String> addedNodes = new HashSet<String>();
     private HashSet<String> removedNodes = new HashSet<String>();
     
-    Commit(MongoMK mk, Revision baseRevision, Revision revision) {
+    Commit(MongoNodeStore nodeStore, Revision baseRevision, Revision revision) {
         this.baseRevision = baseRevision;
         this.revision = revision;
-        this.mk = mk;
+        this.nodeStore = nodeStore;
     }
 
     UpdateOp getUpdateOperationForNode(String path) {
@@ -169,7 +169,7 @@ public class Commit {
         // other readers. branch commits use the base revision to indicate
         // the visibility of the commit
         String commitValue = baseBranchRevision != null ? baseBranchRevision.toString() : "c";
-        DocumentStore store = mk.getDocumentStore();
+        DocumentStore store = nodeStore.getDocumentStore();
         String commitRootPath = null;
         if (baseBranchRevision != null) {
             // branch commits always use root node as commit root
@@ -271,7 +271,7 @@ public class Commit {
     }
     
     private void rollback(ArrayList<UpdateOp> newDocuments, ArrayList<UpdateOp> changed) {
-        DocumentStore store = mk.getDocumentStore();
+        DocumentStore store = nodeStore.getDocumentStore();
         for (UpdateOp op : changed) {
             UpdateOp reverse = op.getReverseOperation();
             store.createOrUpdate(Collection.NODES, reverse);
@@ -294,7 +294,7 @@ public class Commit {
             final AtomicReference<List<Revision>> collisions = new AtomicReference<List<Revision>>();
             Revision newestRev = null;
             if (doc != null) {
-                newestRev = doc.getNewestRevision(mk, revision,
+                newestRev = doc.getNewestRevision(nodeStore, revision,
                         new CollisionHandler() {
                             @Override
                             void concurrentModification(Revision other) {
@@ -316,7 +316,7 @@ public class Commit {
                     conflictMessage = "The node " + 
                             op.getId() + " was already added in revision\n" +
                             newestRev;
-                } else if (mk.isRevisionNewer(newestRev, baseRevision)
+                } else if (nodeStore.isRevisionNewer(newestRev, baseRevision)
                         && (op.isDelete() || isConflicting(doc, op))) {
                     conflictMessage = "The node " + 
                             op.getId() + " was changed in revision\n" + newestRev +
@@ -327,7 +327,7 @@ public class Commit {
             if (conflictMessage != null) {
                 conflictMessage += ", before\n" + revision + 
                         "; document:\n" + (doc == null ? "" : doc.format()) +
-                        ",\nrevision order:\n" + mk.getRevisionComparator();
+                        ",\nrevision order:\n" + nodeStore.getRevisionComparator();
                 throw new MicroKernelException(conflictMessage);
             }
             // if we get here the modification was successful
@@ -336,13 +336,13 @@ public class Commit {
             if (collisions.get() != null && isConflicting(doc, op)) {
                 for (Revision r : collisions.get()) {
                     // mark collisions on commit root
-                    new Collision(doc, r, op, revision, mk).mark(store);
+                    new Collision(doc, r, op, revision, nodeStore).mark(store);
                 }
             }
         }
 
         if (doc != null && doc.getMemory() > SPLIT_CANDIDATE_THRESHOLD) {
-            mk.addSplitCandidate(doc.getId());
+            nodeStore.addSplitCandidate(doc.getId());
         }
     }
 
@@ -365,7 +365,7 @@ public class Commit {
             // or document did not exist before
             return false;
         }
-        return doc.isConflicting(op, baseRevision, mk);
+        return doc.isConflicting(op, baseRevision, nodeStore);
     }
 
     /**
@@ -404,9 +404,7 @@ public class Commit {
             boolean isNew = op != null && op.isNew();
             boolean isWritten = op != null;
             boolean isDelete = op != null && op.isDelete();
-            mk.applyChanges(revision, path, 
-                    isNew, isDelete, isWritten, isBranchCommit,
-                    added, removed);
+            nodeStore.applyChanges(revision, path, isNew, isDelete, isWritten, isBranchCommit, added, removed);
         }
     }
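
The Commit change above is mechanical: the constructor takes a MongoNodeStore instead of a MongoMK, and every former mk. call now goes through nodeStore. One less mechanical detail in the MongoMK.java diff below is the locking: commit() and merge() drop their method-level synchronized and instead synchronize on the shared node store, presumably so that MicroKernel writes and node-store internals serialize on one monitor. A small compilable illustration of that pattern (names invented; MongoNodeStore itself is not shown in this part [1/2] of the commit mail):

    // Hypothetical stand-ins for the real classes, for illustration only.
    class Store { }

    class Facade {
        private final Store nodeStore = new Store();

        // Before: public synchronized String commit(...)
        // After: the lock moves to the node store instance shared by all callers.
        public String commit(String json) {
            synchronized (nodeStore) {
                // parse json, build a Commit, apply it, advance the head revision
                return "r0";
            }
        }
    }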
 

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMK.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMK.java?rev=1532757&r1=1532756&r2=1532757&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMK.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMK.java Wed Oct 16 13:23:58 2013
@@ -17,24 +17,12 @@
 package org.apache.jackrabbit.oak.plugins.mongomk;
 
 import java.io.InputStream;
-import java.lang.ref.WeakReference;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Set;
-import java.util.TreeSet;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 
-import javax.annotation.CheckForNull;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
@@ -52,29 +40,20 @@ import org.apache.jackrabbit.oak.cache.C
 import org.apache.jackrabbit.oak.cache.EmpiricalWeigher;
 import org.apache.jackrabbit.oak.commons.PathUtils;
 import org.apache.jackrabbit.oak.plugins.mongomk.Node.Children;
-import org.apache.jackrabbit.oak.plugins.mongomk.Revision.RevisionComparator;
 import org.apache.jackrabbit.oak.plugins.mongomk.blob.MongoBlobStore;
-import org.apache.jackrabbit.oak.plugins.mongomk.util.LoggingDocumentStoreWrapper;
-import org.apache.jackrabbit.oak.plugins.mongomk.util.TimingDocumentStoreWrapper;
 import org.apache.jackrabbit.oak.plugins.mongomk.util.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Function;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.Weigher;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
 import com.mongodb.DB;
 
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-
 /**
  * A MicroKernel implementation that stores the data in a MongoDB.
  */
-public class MongoMK implements MicroKernel, RevisionContext {
+public class MongoMK implements MicroKernel {
 
     /**
      * The threshold where special handling for many child node starts.
@@ -85,41 +64,20 @@ public class MongoMK implements MicroKer
     /**
      * Enable the LIRS cache.
      */
-    static final boolean LIRS_CACHE = Boolean.parseBoolean(
-            System.getProperty("oak.mongoMK.lirsCache", "false"));
+    static final boolean LIRS_CACHE = Boolean.parseBoolean(System.getProperty("oak.mongoMK.lirsCache", "false"));
 
     private static final Logger LOG = LoggerFactory.getLogger(MongoMK.class);
 
     /**
-     * Do not cache more than this number of children for a document.
-     */
-    private static final int NUM_CHILDREN_CACHE_LIMIT = Integer.getInteger(
-            "oak.mongoMK.childrenCacheLimit", 16 * 1024);
-
-    /**
-     * When trying to access revisions that are older than this many
-     * milliseconds, a warning is logged. The default is one minute.
-     */
-    private static final int WARN_REVISION_AGE = 
-            Integer.getInteger("oak.mongoMK.revisionAge", 60 * 1000);
-
-    /**
-     * Enable background operations
-     */
-    private static final boolean ENABLE_BACKGROUND_OPS = Boolean.parseBoolean(
-            System.getProperty("oak.mongoMK.backgroundOps", "true"));
-
-    /**
      * Enable fast diff operations.
      */
     private static final boolean FAST_DIFF = Boolean.parseBoolean(
             System.getProperty("oak.mongoMK.fastDiff", "true"));
         
     /**
-     * How long to remember the relative order of old revision of all cluster
-     * nodes, in milliseconds. The default is one hour.
+     * The MongoDB store.
      */
-    private static final int REMEMBER_REVISION_ORDER_MILLIS = 60 * 60 * 1000;
+    protected final MongoNodeStore nodeStore;
 
     /**
      * The MongoDB store (might be used by multiple MongoMKs).
@@ -127,581 +85,57 @@ public class MongoMK implements MicroKer
     protected final DocumentStore store;
 
     /**
-     * The delay for asynchronous operations (delayed commit propagation and
-     * cache update).
-     */
-    protected int asyncDelay = 1000;
-
-    /**
-     * Whether this instance is disposed.
-     */
-    private final AtomicBoolean isDisposed = new AtomicBoolean();
-
-    /**
      * The MongoDB blob store.
      */
     private final BlobStore blobStore;
-    
-    /**
-     * The cluster instance info.
-     */
-    private final ClusterNodeInfo clusterNodeInfo;
-
-    /**
-     * The unique cluster id, similar to the unique machine id in MongoDB.
-     */
-    private final int clusterId;
-    
-    /**
-     * The splitting point in milliseconds. If a document is split, revisions
-     * older than this number of milliseconds are moved to a different document.
-     * The default is 0, meaning documents are never split. Revisions that are
-     * newer than this are kept in the newest document.
-     */
-    private final long splitDocumentAgeMillis;
-
-    /**
-     * The node cache.
-     *
-     * Key: path@rev, value: node
-     */
-    private final Cache<String, Node> nodeCache;
-    private final CacheStats nodeCacheStats;
 
     /**
-     * Child node cache.
-     * 
-     * Key: path@rev, value: children
-     */
-    private final Cache<String, Node.Children> nodeChildrenCache;
-    private final CacheStats nodeChildrenCacheStats;
-    
-    /**
      * Diff cache.
      */
     private final Cache<String, Diff> diffCache;
     private final CacheStats diffCacheStats;
 
-    /**
-     * Child doc cache.
-     */
-    private final Cache<String, NodeDocument.Children> docChildrenCache;
-    private final CacheStats docChildrenCacheStats;
-
-    /**
-     * The unsaved last revisions. This contains the parents of all changed
-     * nodes, once those nodes are committed but the parent node itself wasn't
-     * committed yet. The parents are not immediately persisted as this would
-     * cause each commit to change all parents (including the root node), which
-     * would limit write scalability.
-     * 
-     * Key: path, value: revision.
-     */
-    private final UnsavedModifications unsavedLastRevisions = new UnsavedModifications();
-
-    /**
-     * Set of IDs for documents that may need to be split.
-     */
-    private final Map<String, String> splitCandidates = Maps.newConcurrentMap();
-    
-    /**
-     * The last known revision for each cluster instance.
-     * 
-     * Key: the machine id, value: revision.
-     */
-    private final Map<Integer, Revision> lastKnownRevision =
-            new ConcurrentHashMap<Integer, Revision>();
-    
-    /**
-     * The last known head revision. This is the last-known revision.
-     */
-    private volatile Revision headRevision;
-    
-    private Thread backgroundThread;
-
-    /**
-     * Enable using simple revisions (just a counter). This feature is useful
-     * for testing.
-     */
-    private AtomicInteger simpleRevisionCounter;
-    
-    /**
-     * The comparator for revisions.
-     */
-    private final RevisionComparator revisionComparator;
-
-    /**
-     * Unmerged branches of this MongoMK instance.
-     */
-    // TODO at some point, open (unmerged) branches
-    // need to be garbage collected (in-memory and on disk)
-    private final UnmergedBranches branches;
-
-    private boolean stopBackground;
-
     MongoMK(Builder builder) {
 
-        if (builder.isUseSimpleRevision()) {
-            this.simpleRevisionCounter = new AtomicInteger(0);
-        }
-
-        DocumentStore s = builder.getDocumentStore();
-        if (builder.getTiming()) {
-            s = new TimingDocumentStoreWrapper(s);
-        }
-        if (builder.getLogging()) {
-            s = new LoggingDocumentStoreWrapper(s);
-        }
-        this.store = s;
+        this.nodeStore = builder.getNodeStore();
+        this.store = nodeStore.getDocumentStore();
         this.blobStore = builder.getBlobStore();
-        int cid = builder.getClusterId();
-        splitDocumentAgeMillis = builder.getSplitDocumentAgeMillis();
-        cid = Integer.getInteger("oak.mongoMK.clusterId", cid);
-        if (cid == 0) {
-            clusterNodeInfo = ClusterNodeInfo.getInstance(store);
-            // TODO we should ensure revisions generated from now on
-            // are never "older" than revisions already in the repository for
-            // this cluster id
-            cid = clusterNodeInfo.getId();
-        } else {
-            clusterNodeInfo = null;
-        }
-        this.clusterId = cid;
-
-        this.revisionComparator = new RevisionComparator(clusterId);
-        this.asyncDelay = builder.getAsyncDelay();
-        this.branches = new UnmergedBranches(getRevisionComparator());
-
-        //TODO Make stats collection configurable as it add slight overhead
-        //TODO Expose the stats as JMX beans
-
-        nodeCache = builder.buildCache(builder.getNodeCacheSize());
-        nodeCacheStats = new CacheStats(nodeCache, "MongoMk-Node",
-                builder.getWeigher(), builder.getNodeCacheSize());
-
-        nodeChildrenCache = builder.buildCache(builder.getChildrenCacheSize());
-        nodeChildrenCacheStats = new CacheStats(nodeChildrenCache, "MongoMk-NodeChildren",
-                builder.getWeigher(), builder.getChildrenCacheSize());
 
         diffCache = builder.buildCache(builder.getDiffCacheSize());
         diffCacheStats = new CacheStats(diffCache, "MongoMk-DiffCache",
                 builder.getWeigher(), builder.getDiffCacheSize());
 
-        docChildrenCache = builder.buildCache(builder.getDocChildrenCacheSize());
-        docChildrenCacheStats = new CacheStats(docChildrenCache, "MongoMk-DocChildren",
-                builder.getWeigher(), builder.getDocChildrenCacheSize());
-
-        init();
-        // initial reading of the revisions of other cluster nodes
-        backgroundRead();
-        getRevisionComparator().add(headRevision, Revision.newRevision(0));
-        headRevision = newRevision();
-        LOG.info("Initialized MongoMK with clusterNodeId: {}", clusterId);
+        LOG.info("Initialized MongoMK with clusterNodeId: {}", nodeStore.getClusterId());
     }
 
-    void init() {
-        headRevision = newRevision();
-        Node n = readNode("/", headRevision);
-        if (n == null) {
-            // root node is missing: repository is not initialized
-            Commit commit = new Commit(this, null, headRevision);
-            n = new Node("/", headRevision);
-            commit.addNode(n);
-            commit.applyToDocumentStore();
-        } else {
-            // initialize branchCommits
-            branches.init(store, this);
-        }
-        backgroundThread = new Thread(
-                new BackgroundOperation(this, isDisposed),
-                "MongoMK background thread");
-        backgroundThread.setDaemon(true);
-        backgroundThread.start();
-    }
-    
 
-    /**
-     * Create a new revision.
-     * 
-     * @return the revision
-     */
-    Revision newRevision() {
-        if (simpleRevisionCounter != null) {
-            return new Revision(simpleRevisionCounter.getAndIncrement(), 0, clusterId);
-        }
-        return Revision.newRevision(clusterId);
-    }
-    
-    void runBackgroundOperations() {
-        if (isDisposed.get()) {
-            return;
-        }
-        backgroundRenewClusterIdLease();
-        if (simpleRevisionCounter != null) {
-            // only when using timestamp
-            return;
-        }
-        if (!ENABLE_BACKGROUND_OPS || stopBackground) {
-            return;
-        }
-        synchronized (this) {
-            try {
-                backgroundSplit();
-                backgroundWrite();
-                backgroundRead();
-            } catch (RuntimeException e) {
-                if (isDisposed.get()) {
-                    return;
-                }
-                LOG.warn("Background operation failed: " + e.toString(), e);
-            }
-        }
+    public void dispose() {
+        nodeStore.dispose();
     }
 
-    private void backgroundRenewClusterIdLease() {
-        if (clusterNodeInfo == null) {
-            return;
-        }
-        clusterNodeInfo.renewLease(asyncDelay);
-    }
-    
     void backgroundRead() {
-        String id = Utils.getIdFromPath("/");
-        NodeDocument doc = store.find(Collection.NODES, id, asyncDelay);
-        if (doc == null) {
-            return;
-        }
-        Map<Integer, Revision> lastRevMap = doc.getLastRev();
-        
-        RevisionComparator revisionComparator = getRevisionComparator();
-        boolean hasNewRevisions = false;
-        // the (old) head occurred first
-        Revision headSeen = Revision.newRevision(0);
-        // then we saw this new revision (from another cluster node) 
-        Revision otherSeen = Revision.newRevision(0);
-        for (Entry<Integer, Revision> e : lastRevMap.entrySet()) {
-            int machineId = e.getKey();
-            if (machineId == clusterId) {
-                continue;
-            }
-            Revision r = e.getValue();
-            Revision last = lastKnownRevision.get(machineId);
-            if (last == null || r.compareRevisionTime(last) > 0) {
-                lastKnownRevision.put(machineId, r);
-                hasNewRevisions = true;
-                revisionComparator.add(r, otherSeen);
-            }
-        }
-        if (hasNewRevisions) {
-            // TODO invalidating the whole cache is not really needed,
-            // instead only those children that are cached could be checked
-            store.invalidateCache();
-            // TODO only invalidate affected items
-            docChildrenCache.invalidateAll();
-            // add a new revision, so that changes are visible
-            Revision r = Revision.newRevision(clusterId);
-            // the latest revisions of the current cluster node
-            // happened before the latest revisions of other cluster nodes
-            revisionComparator.add(r, headSeen);
-            // the head revision is after other revisions
-            headRevision = Revision.newRevision(clusterId);
-        }
-        revisionComparator.purge(Revision.getCurrentTimestamp() - REMEMBER_REVISION_ORDER_MILLIS);
-    }
-
-    private void backgroundSplit() {
-        for (Iterator<String> it = splitCandidates.keySet().iterator(); it.hasNext();) {
-            String id = it.next();
-            NodeDocument doc = store.find(Collection.NODES, id);
-            if (doc == null) {
-                continue;
-            }
-            for (UpdateOp op : doc.split(this)) {
-                NodeDocument before = store.createOrUpdate(Collection.NODES, op);
-                if (before != null) {
-                    NodeDocument after = store.find(Collection.NODES, op.getId());
-                    if (after != null) {
-                        LOG.info("Split operation on {}. Size before: {}, after: {}",
-                                new Object[]{id, before.getMemory(), after.getMemory()});
-                    }
-                }
-            }
-            it.remove();
-        }
+        nodeStore.backgroundRead();
     }
 
     void backgroundWrite() {
-        if (unsavedLastRevisions.getPaths().size() == 0) {
-            return;
-        }
-        ArrayList<String> paths = new ArrayList<String>(unsavedLastRevisions.getPaths());
-        // sort by depth (high depth first), then path
-        Collections.sort(paths, new Comparator<String>() {
-
-            @Override
-            public int compare(String o1, String o2) {
-                int d1 = Utils.pathDepth(o1);
-                int d2 = Utils.pathDepth(o1);
-                if (d1 != d2) {
-                    return Integer.signum(d1 - d2);
-                }
-                return o1.compareTo(o2);
-            }
-
-        });
-        
-        long now = Revision.getCurrentTimestamp();
-        UpdateOp updateOp = null;
-        Revision lastRev = null;
-        List<String> ids = new ArrayList<String>();
-        for (int i = 0; i < paths.size(); i++) {
-            String p = paths.get(i);
-            Revision r = unsavedLastRevisions.get(p);
-            if (r == null) {
-                continue;
-            }
-            // FIXME: with below code fragment the root (and other nodes
-            // 'close' to the root) will not be updated in MongoDB when there
-            // are frequent changes.
-            if (Revision.getTimestampDifference(now, r.getTimestamp()) < asyncDelay) {
-                continue;
-            }
-            int size = ids.size();
-            if (updateOp == null) {
-                // create UpdateOp
-                Commit commit = new Commit(this, null, r);
-                commit.touchNode(p);
-                updateOp = commit.getUpdateOperationForNode(p);
-                lastRev = r;
-                ids.add(Utils.getIdFromPath(p));
-            } else if (r.equals(lastRev)) {
-                // use multi update when possible
-                ids.add(Utils.getIdFromPath(p));
-            }
-            // update if this is the last path or
-            // revision is not equal to last revision
-            if (i + 1 >= paths.size() || size == ids.size()) {
-                store.update(Collection.NODES, ids, updateOp);
-                for (String id : ids) {
-                    unsavedLastRevisions.remove(Utils.getPathFromId(id));
-                }
-                ids.clear();
-                updateOp = null;
-                lastRev = null;
-            }
-        }
-    }
-
-    public void dispose() {
-        // force background write (with asyncDelay > 0, the root wouldn't be written)
-        // TODO make this more obvious / explicit
-        // TODO tests should also work if this is not done
-        asyncDelay = 0;
-        runBackgroundOperations();
-        if (!isDisposed.getAndSet(true)) {
-            synchronized (isDisposed) {
-                isDisposed.notifyAll();
-            }
-            try {
-                backgroundThread.join();
-            } catch (InterruptedException e) {
-                // ignore
-            }
-            if (clusterNodeInfo != null) {
-                clusterNodeInfo.dispose();
-            }
-            store.dispose();
-            LOG.info("Disposed MongoMK with clusterNodeId: {}", clusterId);
-        }
+        nodeStore.backgroundWrite();
     }
 
-    /**
-     * Get the node for the given path and revision. The returned object might
-     * not be modified directly.
-     *
-     * @param path the path of the node.
-     * @param rev the read revision.
-     * @return the node or <code>null</code> if the node does not exist at the
-     *          given revision.
-     */
-    @CheckForNull
-    Node getNode(final @Nonnull String path, final @Nonnull Revision rev) {
-        checkRevisionAge(checkNotNull(rev), checkNotNull(path));
-        try {
-            String key = path + "@" + rev;
-            Node node = nodeCache.get(key, new Callable<Node>() {
-                @Override
-                public Node call() throws Exception {
-                    Node n = readNode(path, rev);
-                    if (n == null) {
-                        n = Node.MISSING;
-                    }
-                    return n;
-                }
-            });
-            return node == Node.MISSING ? null : node;
-        } catch (ExecutionException e) {
-            throw new MicroKernelException(e);
-        }
+    MongoNodeStore getNodeStore() {
+        return nodeStore;
     }
 
-    /**
-     * Enqueue the document with the given id as a split candidate.
-     *
-     * @param id the id of the document to check if it needs to be split.
-     */
-    void addSplitCandidate(String id) {
-        splitCandidates.put(id, id);
-    }
-    
-    private void checkRevisionAge(Revision r, String path) {
-        // TODO only log if there are new revisions available for the given node
-        if (LOG.isDebugEnabled()) {
-            if (headRevision.getTimestamp() - r.getTimestamp() > WARN_REVISION_AGE) {
-                LOG.debug("Requesting an old revision for path " + path + ", " + 
-                    ((headRevision.getTimestamp() - r.getTimestamp()) / 1000) + " seconds old");
-            }
-        }
-    }
-    
-    /**
-     * Checks that revision x is newer than another revision.
-     * 
-     * @param x the revision to check
-     * @param previous the presumed earlier revision
-     * @return true if x is newer
-     */
-    boolean isRevisionNewer(@Nonnull Revision x, @Nonnull Revision previous) {
-        return getRevisionComparator().compare(x, previous) > 0;
-    }
-
-    public long getSplitDocumentAgeMillis() {
-        return this.splitDocumentAgeMillis;
-    }
-
-    public Children getChildren(final String path, final Revision rev, final int limit)  throws MicroKernelException {
-        checkRevisionAge(rev, path);
-        String key = path + "@" + rev;
-        Children children;
-        try {
-            children = nodeChildrenCache.get(key, new Callable<Children>() {
-                @Override
-                public Children call() throws Exception {
-                    return readChildren(path, rev, limit);
-                }
-            });
-        } catch (ExecutionException e) {
-            throw new MicroKernelException("Error occurred while fetching children nodes for path "+path, e);
-        }
-
-        //In case the limit > cached children size and there are more child nodes
-        //available then refresh the cache
-        if (children.hasMore) {
-            if (limit > children.children.size()) {
-                children = readChildren(path, rev, limit);
-                if (children != null) {
-                    nodeChildrenCache.put(key, children);
-                }
-            }
-        }
-        return children;        
-    }
-    
-    Node.Children readChildren(String path, Revision rev, int limit) {
-        // TODO use offset, to avoid O(n^2) and running out of memory
-        // to do that, use the *name* of the last entry of the previous batch of children
-        // as the starting point
-        Iterable<NodeDocument> docs;
-        Children c = new Children();
-        int rawLimit = limit;
-        Set<Revision> validRevisions = new HashSet<Revision>();
-        do {
-            c.children.clear();
-            c.hasMore = true;
-            docs = readChildren(path, rawLimit);
-            int numReturned = 0;
-            for (NodeDocument doc : docs) {
-                numReturned++;
-                // filter out deleted children
-                if (doc.isDeleted(this, rev, validRevisions)) {
-                    continue;
-                }
-                String p = Utils.getPathFromId(doc.getId());
-                if (c.children.size() < limit) {
-                    // add to children until limit is reached
-                    c.children.add(p);
-                }
-            }
-            if (numReturned < rawLimit) {
-                // fewer documents returned than requested
-                // -> no more documents
-                c.hasMore = false;
-            }
-            // double rawLimit for next round
-            rawLimit = (int) Math.min(((long) rawLimit) * 2, Integer.MAX_VALUE);
-        } while (c.children.size() < limit && c.hasMore);
-        return c;
+    ClusterNodeInfo getClusterInfo() {
+        return nodeStore.getClusterInfo();
     }
 
-    @Nonnull
-    Iterable<NodeDocument> readChildren(final String path, int limit) {
-        String from = Utils.getKeyLowerLimit(path);
-        String to = Utils.getKeyUpperLimit(path);
-        if (limit > NUM_CHILDREN_CACHE_LIMIT) {
-            // do not use cache
-            return store.query(Collection.NODES, from, to, limit);
-        }
-        // check cache
-        NodeDocument.Children c = docChildrenCache.getIfPresent(path);
-        if (c == null) {
-            c = new NodeDocument.Children();
-            List<NodeDocument> docs = store.query(Collection.NODES, from, to, limit);
-            for (NodeDocument doc : docs) {
-                String p = Utils.getPathFromId(doc.getId());
-                c.childNames.add(PathUtils.getName(p));
-            }
-            c.isComplete = docs.size() < limit;
-            docChildrenCache.put(path, c);
-        } else if (c.childNames.size() < limit && !c.isComplete) {
-            // fetch more and update cache
-            String lastName = c.childNames.get(c.childNames.size() - 1);
-            String lastPath = PathUtils.concat(path, lastName);
-            from = Utils.getIdFromPath(lastPath);
-            int remainingLimit = limit - c.childNames.size();
-            List<NodeDocument> docs = store.query(Collection.NODES,
-                    from, to, remainingLimit);
-            NodeDocument.Children clone = c.clone();
-            for (NodeDocument doc : docs) {
-                String p = Utils.getPathFromId(doc.getId());
-                clone.childNames.add(PathUtils.getName(p));
-            }
-            clone.isComplete = docs.size() < remainingLimit;
-            docChildrenCache.put(path, clone);
-            c = clone;
-        }
-        return Iterables.transform(c.childNames, new Function<String, NodeDocument>() {
-            @Override
-            public NodeDocument apply(String name) {
-                String p = PathUtils.concat(path, name);
-                return store.find(Collection.NODES, Utils.getIdFromPath(p));
-            }
-        });
+    int getPendingWriteCount() {
+        return nodeStore.getPendingWriteCount();
     }
 
-    @CheckForNull
-    private Node readNode(String path, Revision readRevision) {
-        String id = Utils.getIdFromPath(path);
-        Revision lastRevision = getPendingModifications().get(path);
-        NodeDocument doc = store.find(Collection.NODES, id);
-        if (doc == null) {
-            return null;
-        }
-        return doc.getNodeAtRevision(this, readRevision, lastRevision);
-    }
-    
     @Override
     public String getHeadRevision() throws MicroKernelException {
-        return headRevision.toString();
+        return nodeStore.getHeadRevision().toString();
     }
 
     @Override @Nonnull
@@ -767,8 +201,8 @@ public class MongoMK implements MicroKer
         }
         Revision fromRev = Revision.fromString(fromRevisionId);
         Revision toRev = Revision.fromString(toRevisionId);
-        Node from = getNode(path, fromRev);
-        Node to = getNode(path, toRev);
+        Node from = nodeStore.getNode(path, fromRev);
+        Node to = nodeStore.getNode(path, toRev);
 
         if (from == null || to == null) {
             // TODO implement correct behavior if the node does't/didn't exist
@@ -798,8 +232,8 @@ public class MongoMK implements MicroKer
         // use a MongoDB index instead
         int max = MANY_CHILDREN_THRESHOLD;
         Children fromChildren, toChildren;
-        fromChildren = getChildren(path, fromRev, max);
-        toChildren = getChildren(path, toRev, max);
+        fromChildren = nodeStore.getChildren(path, fromRev, max);
+        toChildren = nodeStore.getChildren(path, toRev, max);
         if (!fromChildren.hasMore && !toChildren.hasMore) {
             diffFewChildren(w, fromChildren, fromRev, toChildren, toRev);
         } else {
@@ -807,8 +241,8 @@ public class MongoMK implements MicroKer
                 diffManyChildren(w, path, fromRev, toRev);
             } else {
                 max = Integer.MAX_VALUE;
-                fromChildren = getChildren(path, fromRev, max);
-                toChildren = getChildren(path, toRev, max);
+                fromChildren = nodeStore.getChildren(path, fromRev, max);
+                toChildren = nodeStore.getChildren(path, toRev, max);
                 diffFewChildren(w, fromChildren, fromRev, toChildren, toRev);
             }
         }
@@ -825,8 +259,8 @@ public class MongoMK implements MicroKer
         for (NodeDocument doc : list) {
             String id = doc.getId();
             String p = Utils.getPathFromId(id);
-            Node fromNode = getNode(p, fromRev);
-            Node toNode = getNode(p, toRev);
+            Node fromNode = nodeStore.getNode(p, fromRev);
+            Node toNode = nodeStore.getNode(p, toRev);
             if (fromNode != null) {
                 // exists in fromRev
                 if (toNode != null) {
@@ -858,8 +292,8 @@ public class MongoMK implements MicroKer
             if (!childrenSet.contains(n)) {
                 w.tag('-').value(n).newline();
             } else {
-                Node n1 = getNode(n, fromRev);
-                Node n2 = getNode(n, toRev);
+                Node n1 = nodeStore.getNode(n, fromRev);
+                Node n2 = nodeStore.getNode(n, toRev);
                 // this is not fully correct:
                 // a change is detected if the node changed recently,
                 // even if the revisions are well in the past
@@ -883,9 +317,9 @@ public class MongoMK implements MicroKer
         if (!PathUtils.isAbsolute(path)) {
             throw new MicroKernelException("Path is not absolute: " + path);
         }
-        revisionId = revisionId != null ? revisionId : headRevision.toString();
+        revisionId = revisionId != null ? revisionId : nodeStore.getHeadRevision().toString();
         Revision rev = Revision.fromString(revisionId);
-        Node n = getNode(path, rev);
+        Node n = nodeStore.getNode(path, rev);
         return n != null;
     }
 
@@ -903,9 +337,9 @@ public class MongoMK implements MicroKer
         if (depth != 0) {
             throw new MicroKernelException("Only depth 0 is supported, depth is " + depth);
         }
-        revisionId = revisionId != null ? revisionId : headRevision.toString();
+        revisionId = revisionId != null ? revisionId : nodeStore.getHeadRevision().toString();
         Revision rev = Revision.fromString(revisionId);
-        Node n = getNode(path, rev);
+        Node n = nodeStore.getNode(path, rev);
         if (n == null) {
             return null;
         }
@@ -923,7 +357,7 @@ public class MongoMK implements MicroKer
             long m = ((long) maxChildNodes) + offset;
             max = (int) Math.min(m, Integer.MAX_VALUE);
         }
-        Children c = getChildren(path, rev, max);
+        Children c = nodeStore.getChildren(path, rev, max);
         for (long i = offset; i < c.children.size(); i++) {
             if (maxChildNodes-- <= 0) {
                 break;
@@ -942,228 +376,116 @@ public class MongoMK implements MicroKer
     }
 
     @Override
-    public synchronized String commit(String rootPath, String json, String baseRevId,
+    public String commit(String rootPath, String json, String baseRevId,
             String message) throws MicroKernelException {
-        Revision baseRev;
-        if (baseRevId == null) {
-            baseRev = headRevision;
-            baseRevId = baseRev.toString();
-        } else {
-            baseRev = Revision.fromString(baseRevId);
-        }
-        JsopReader t = new JsopTokenizer(json);
-        Revision rev = newRevision();
-        Commit commit = new Commit(this, baseRev, rev);
-        while (true) {
-            int r = t.read();
-            if (r == JsopReader.END) {
-                break;
+        synchronized (nodeStore) {
+            Revision baseRev;
+            if (baseRevId == null) {
+                baseRev = nodeStore.getHeadRevision();
+                baseRevId = baseRev.toString();
+            } else {
+                baseRev = Revision.fromString(baseRevId);
             }
-            String path = PathUtils.concat(rootPath, t.readString());
-            switch (r) {
-            case '+':
-                t.read(':');
-                t.read('{');
-                parseAddNode(commit, t, path);
-                break;
-            case '-':
-                commit.removeNode(path);
-                markAsDeleted(path, commit, true);
-                commit.removeNodeDiff(path);
-                break;
-            case '^':
-                t.read(':');
-                String value;
-                if (t.matches(JsopReader.NULL)) {
-                    value = null;
-                } else {
-                    value = t.readRawValue().trim();
-                }
-                String p = PathUtils.getParentPath(path);
-                String propertyName = PathUtils.getName(path);
-                commit.updateProperty(p, propertyName, value);
-                commit.updatePropertyDiff(p, propertyName, value);
-                break;
-            case '>': {
-                // TODO support moving nodes that were modified within this commit
-                t.read(':');
-                String sourcePath = path;
-                String targetPath = t.readString();
-                if (!PathUtils.isAbsolute(targetPath)) {
-                    targetPath = PathUtils.concat(rootPath, targetPath);
-                }
-                if (!nodeExists(sourcePath, baseRevId)) {
-                    throw new MicroKernelException("Node not found: " + sourcePath + " in revision " + baseRevId);
-                } else if (nodeExists(targetPath, baseRevId)) {
-                    throw new MicroKernelException("Node already exists: " + targetPath + " in revision " + baseRevId);
+            JsopReader t = new JsopTokenizer(json);
+            Revision rev = nodeStore.newRevision();
+            Commit commit = new Commit(nodeStore, baseRev, rev);
+            while (true) {
+                int r = t.read();
+                if (r == JsopReader.END) {
+                    break;
+                }
+                String path = PathUtils.concat(rootPath, t.readString());
+                switch (r) {
+                    case '+':
+                        t.read(':');
+                        t.read('{');
+                        parseAddNode(commit, t, path);
+                        break;
+                    case '-':
+                        commit.removeNode(path);
+                        nodeStore.markAsDeleted(path, commit, true);
+                        commit.removeNodeDiff(path);
+                        break;
+                    case '^':
+                        t.read(':');
+                        String value;
+                        if (t.matches(JsopReader.NULL)) {
+                            value = null;
+                        } else {
+                            value = t.readRawValue().trim();
+                        }
+                        String p = PathUtils.getParentPath(path);
+                        String propertyName = PathUtils.getName(path);
+                        commit.updateProperty(p, propertyName, value);
+                        commit.updatePropertyDiff(p, propertyName, value);
+                        break;
+                    case '>': {
+                        // TODO support moving nodes that were modified within this commit
+                        t.read(':');
+                        String sourcePath = path;
+                        String targetPath = t.readString();
+                        if (!PathUtils.isAbsolute(targetPath)) {
+                            targetPath = PathUtils.concat(rootPath, targetPath);
+                        }
+                        if (!nodeExists(sourcePath, baseRevId)) {
+                            throw new MicroKernelException("Node not found: " + sourcePath + " in revision " + baseRevId);
+                        } else if (nodeExists(targetPath, baseRevId)) {
+                            throw new MicroKernelException("Node already exists: " + targetPath + " in revision " + baseRevId);
+                        }
+                        commit.moveNode(sourcePath, targetPath);
+                        nodeStore.moveNode(sourcePath, targetPath, commit);
+                        break;
+                    }
+                    case '*': {
+                        // TODO support copying nodes that were modified within this commit
+                        t.read(':');
+                        String sourcePath = path;
+                        String targetPath = t.readString();
+                        if (!PathUtils.isAbsolute(targetPath)) {
+                            targetPath = PathUtils.concat(rootPath, targetPath);
+                        }
+                        if (!nodeExists(sourcePath, baseRevId)) {
+                            throw new MicroKernelException("Node not found: " + sourcePath + " in revision " + baseRevId);
+                        } else if (nodeExists(targetPath, baseRevId)) {
+                            throw new MicroKernelException("Node already exists: " + targetPath + " in revision " + baseRevId);
+                        }
+                        commit.copyNode(sourcePath, targetPath);
+                        nodeStore.copyNode(sourcePath, targetPath, commit);
+                        break;
+                    }
+                    default:
+                        throw new MicroKernelException("token: " + (char) t.getTokenType());
                 }
-                commit.moveNode(sourcePath, targetPath);
-                moveNode(sourcePath, targetPath, commit);
-                break;
             }
-            case '*': {
-                // TODO support copying nodes that were modified within this commit
-                t.read(':');
-                String sourcePath = path;
-                String targetPath = t.readString();
-                if (!PathUtils.isAbsolute(targetPath)) {
-                    targetPath = PathUtils.concat(rootPath, targetPath);
-                }
-                if (!nodeExists(sourcePath, baseRevId)) {
-                    throw new MicroKernelException("Node not found: " + sourcePath + " in revision " + baseRevId);
-                } else if (nodeExists(targetPath, baseRevId)) {
-                    throw new MicroKernelException("Node already exists: " + targetPath + " in revision " + baseRevId);
+            if (baseRev.isBranch()) {
+                rev = rev.asBranchRevision();
+                // remember branch commit
+                Branch b = nodeStore.getBranches().getBranch(baseRev);
+                if (b == null) {
+                    // baseRev is marker for new branch
+                    b = nodeStore.getBranches().create(baseRev.asTrunkRevision(), rev);
+                } else {
+                    b.addCommit(rev);
                 }
-                commit.copyNode(sourcePath, targetPath);
-                copyNode(sourcePath, targetPath, commit);
-                break;
-            }
-            default:
-                throw new MicroKernelException("token: " + (char) t.getTokenType());
-            }
-        }
-        if (baseRev.isBranch()) {
-            rev = rev.asBranchRevision();
-            // remember branch commit
-            Branch b = branches.getBranch(baseRev);
-            if (b == null) {
-                // baseRev is marker for new branch
-                b = branches.create(baseRev.asTrunkRevision(), rev);
-            } else {
-                b.addCommit(rev);
-            }
-            boolean success = false;
-            try {
-                // prepare commit
-                commit.prepare(baseRev);
-                success = true;
-            } finally {
-                if (!success) {
-                    b.removeCommit(rev);
-                    if (!b.hasCommits()) {
-                        branches.remove(b);
+                boolean success = false;
+                try {
+                    // prepare commit
+                    commit.prepare(baseRev);
+                    success = true;
+                } finally {
+                    if (!success) {
+                        b.removeCommit(rev);
+                        if (!b.hasCommits()) {
+                            nodeStore.getBranches().remove(b);
+                        }
                     }
                 }
-            }
-
-            return rev.toString();
-        } else {
-            commit.apply();
-            headRevision = commit.getRevision();
-            return rev.toString();
-        }
-    }
-
-    //------------------------< RevisionContext >-------------------------------
 
-    @Override
-    public UnmergedBranches getBranches() {
-        return branches;
-    }
-
-    @Override
-    public UnsavedModifications getPendingModifications() {
-        return unsavedLastRevisions;
-    }
-
-    @Override
-    public RevisionComparator getRevisionComparator() {
-        return revisionComparator;
-    }
-
-    @Override
-    public void publishRevision(Revision foreignRevision, Revision changeRevision) {
-        RevisionComparator revisionComparator = getRevisionComparator();
-        if (revisionComparator.compare(headRevision, foreignRevision) >= 0) {
-            // already visible
-            return;
-        }
-        int clusterNodeId = foreignRevision.getClusterId();
-        if (clusterNodeId == this.clusterId) {
-            return;
-        }
-        // the (old) head occurred first
-        Revision headSeen = Revision.newRevision(0);
-        // then we saw this new revision (from another cluster node)
-        Revision otherSeen = Revision.newRevision(0);
-        // and after that, the current change
-        Revision changeSeen = Revision.newRevision(0);
-        revisionComparator.add(foreignRevision, otherSeen);
-        // TODO invalidating the whole cache is not really needed,
-        // but how to ensure we invalidate the right part of the cache?
-        // possibly simply wait for the background thread to pick
-        // up the changes, but this depends on how often this method is called
-        store.invalidateCache();
-        // the latest revisions of the current cluster node
-        // happened before the latest revisions of other cluster nodes
-        revisionComparator.add(headRevision, headSeen);
-        revisionComparator.add(changeRevision, changeSeen);
-        // the head revision is after other revisions
-        headRevision = Revision.newRevision(clusterId);
-    }
-
-    @Override
-    public int getClusterId() {
-        return clusterId;
-    }
-
-    private void copyNode(String sourcePath, String targetPath, Commit commit) {
-        moveOrCopyNode(false, sourcePath, targetPath, commit);
-    }
-    
-    private void moveNode(String sourcePath, String targetPath, Commit commit) {
-        moveOrCopyNode(true, sourcePath, targetPath, commit);
-    }
-    
-    private void moveOrCopyNode(boolean move,
-                                String sourcePath,
-                                String targetPath,
-                                Commit commit) {
-        // TODO Optimize - Move logic would not work well with very move of very large subtrees
-        // At minimum we can optimize by traversing breadth wise and collect node id
-        // and fetch them via '$in' queries
-
-        // TODO Transient Node - Current logic does not account for operations which are part
-        // of this commit i.e. transient nodes. If its required it would need to be looked
-        // into
-
-        Node n = getNode(sourcePath, commit.getBaseRevision());
-
-        // Node might be deleted already
-        if (n == null) {
-            return;
-        }
-
-        Node newNode = new Node(targetPath, commit.getRevision());
-        n.copyTo(newNode);
-
-        commit.addNode(newNode);
-        if (move) {
-            markAsDeleted(sourcePath, commit, false);
-        }
-        Node.Children c = getChildren(sourcePath, commit.getBaseRevision(), Integer.MAX_VALUE);
-        for (String srcChildPath : c.children) {
-            String childName = PathUtils.getName(srcChildPath);
-            String destChildPath = PathUtils.concat(targetPath, childName);
-            moveOrCopyNode(move, srcChildPath, destChildPath, commit);
-        }
-    }
-
-    private void markAsDeleted(String path, Commit commit, boolean subTreeAlso) {
-        Revision rev = commit.getBaseRevision();
-        checkState(rev != null, "Base revision of commit must not be null");
-        commit.removeNode(path);
-
-        if (subTreeAlso) {
-            // recurse down the tree
-            // TODO causes issue with large number of children
-            Node n = getNode(path, rev);
-
-            if (n != null) {
-                Node.Children c = getChildren(path, rev, Integer.MAX_VALUE);
-                for (String childPath : c.children) {
-                    markAsDeleted(childPath, commit, true);
-                }
+                return rev.toString();
+            } else {
+                commit.apply();
+                nodeStore.setHeadRevision(commit.getRevision());
+                return rev.toString();
             }
         }
     }
@@ -1193,42 +515,44 @@ public class MongoMK implements MicroKer
         // nothing is written when the branch is created, the returned
         // revision simply acts as a reference to the branch base revision
         Revision revision = trunkRevisionId != null
-                ? Revision.fromString(trunkRevisionId) : headRevision;
+                ? Revision.fromString(trunkRevisionId) : nodeStore.getHeadRevision();
         return revision.asBranchRevision().toString();
     }
 
     @Override
-    public synchronized String merge(String branchRevisionId, String message)
+    public String merge(String branchRevisionId, String message)
             throws MicroKernelException {
-        // TODO improve implementation if needed
-        Revision revision = Revision.fromString(branchRevisionId);
-        if (!revision.isBranch()) {
-            throw new MicroKernelException("Not a branch: " + branchRevisionId);
-        }
-
-        // make branch commits visible
-        UpdateOp op = new UpdateOp(Utils.getIdFromPath("/"), false);
-        Branch b = branches.getBranch(revision);
-        Revision mergeCommit = newRevision();
-        NodeDocument.setModified(op, mergeCommit);
-        if (b != null) {
-            for (Revision rev : b.getCommits()) {
-                rev = rev.asTrunkRevision();
-                NodeDocument.setRevision(op, rev, "c-" + mergeCommit.toString());
-                op.containsMapEntry(NodeDocument.COLLISIONS, rev, false);
-            }
-            if (store.findAndUpdate(Collection.NODES, op) != null) {
-                // remove from branchCommits map after successful update
-                b.applyTo(unsavedLastRevisions, mergeCommit);
-                branches.remove(b);
+        synchronized (nodeStore) {
+            // TODO improve implementation if needed
+            Revision revision = Revision.fromString(branchRevisionId);
+            if (!revision.isBranch()) {
+                throw new MicroKernelException("Not a branch: " + branchRevisionId);
+            }
+
+            // make branch commits visible
+            UpdateOp op = new UpdateOp(Utils.getIdFromPath("/"), false);
+            Branch b = nodeStore.getBranches().getBranch(revision);
+            Revision mergeCommit = nodeStore.newRevision();
+            NodeDocument.setModified(op, mergeCommit);
+            if (b != null) {
+                for (Revision rev : b.getCommits()) {
+                    rev = rev.asTrunkRevision();
+                    NodeDocument.setRevision(op, rev, "c-" + mergeCommit.toString());
+                    op.containsMapEntry(NodeDocument.COLLISIONS, rev, false);
+                }
+                if (store.findAndUpdate(Collection.NODES, op) != null) {
+                    // remove from branchCommits map after successful update
+                    b.applyTo(nodeStore.getPendingModifications(), mergeCommit);
+                    nodeStore.getBranches().remove(b);
+                } else {
+                    throw new MicroKernelException("Conflicting concurrent change. Update operation failed: " + op);
+                }
             } else {
-                throw new MicroKernelException("Conflicting concurrent change. Update operation failed: " + op);
+                // no commits in this branch -> do nothing
             }
-        } else {
-            // no commits in this branch -> do nothing
+            nodeStore.setHeadRevision(mergeCommit);
+            return mergeCommit.toString();
         }
-        headRevision = mergeCommit;
-        return headRevision.toString();
     }
 
     @Override
@@ -1240,8 +564,8 @@ public class MongoMK implements MicroKer
         Revision r = Revision.fromString(branchRevisionId);
         Revision base = newBaseRevisionId != null ?
                 Revision.fromString(newBaseRevisionId) :
-                headRevision;
-        Branch b = branches.getBranch(r);
+                nodeStore.getHeadRevision();
+        Branch b = nodeStore.getBranches().getBranch(r);
         if (b == null) {
             // empty branch
             return base.asBranchRevision().toString();
@@ -1251,7 +575,7 @@ public class MongoMK implements MicroKer
         }
         // add a pseudo commit to make sure current head of branch
         // has a higher revision than base of branch
-        Revision head = newRevision().asBranchRevision();
+        Revision head = nodeStore.newRevision().asBranchRevision();
         b.rebase(head, base);
         return head.toString();
     }
@@ -1288,118 +612,12 @@ public class MongoMK implements MicroKer
         return store;
     }
     
-    public void setAsyncDelay(int delay) {
-        this.asyncDelay = delay;
-    }
-    
-    public int getAsyncDelay() {
-        return asyncDelay;
-    }
-
-    /**
-     * Apply the changes of a node to the cache.
-     * 
-     * @param rev the revision
-     * @param path the path
-     * @param isNew whether this is a new node
-     * @param isDelete whether the node is deleted
-     * @param isWritten whether the MongoDB document was added / updated
-     * @param isBranchCommit whether this is from a branch commit
-     * @param added the list of added child nodes
-     * @param removed the list of removed child nodes
-     *
-     */
-    public void applyChanges(Revision rev, String path, 
-            boolean isNew, boolean isDelete, boolean isWritten,
-            boolean isBranchCommit, ArrayList<String> added,
-            ArrayList<String> removed) {
-        UnsavedModifications unsaved = unsavedLastRevisions;
-        if (isBranchCommit) {
-            Revision branchRev = rev.asBranchRevision();
-            unsaved = branches.getBranch(branchRev).getModifications(branchRev);
-        }
-        // track unsaved modifications of nodes that were not
-        // written in the commit (implicitly modified parent)
-        // or any modification if this is a branch commit
-        if (!isWritten || isBranchCommit) {
-            Revision prev = unsaved.put(path, rev);
-            if (prev != null) {
-                if (isRevisionNewer(prev, rev)) {
-                    // revert
-                    unsaved.put(path, prev);
-                    String msg = String.format("Attempt to update " +
-                            "unsavedLastRevision for %s with %s, which is " +
-                            "older than current %s.",
-                            path, rev, prev);
-                    throw new MicroKernelException(msg);
-                }
-            }
-        } else {
-            // the document was updated:
-            // we no longer need to update it in a background process
-            unsaved.remove(path);
-        }
-        String key = path + "@" + rev;
-        Children c = nodeChildrenCache.getIfPresent(key);
-        if (isNew || (!isDelete && c != null)) {
-            Children c2 = new Children();
-            TreeSet<String> set = new TreeSet<String>();
-            if (c != null) {
-                set.addAll(c.children);
-            }
-            set.removeAll(removed);
-            for (String name : added) {
-                // make sure the name string does not contain
-                // unnecessary baggage
-                set.add(new String(name));
-            }
-            c2.children.addAll(set);
-            nodeChildrenCache.put(key, c2);
-        }
-        if (!added.isEmpty()) {
-            NodeDocument.Children docChildren = docChildrenCache.getIfPresent(path);
-            if (docChildren != null) {
-                int currentSize = docChildren.childNames.size();
-                TreeSet<String> names = new TreeSet<String>(docChildren.childNames);
-                // an incomplete cache entry must not be extended at the
-                // end of the list: MongoDB might contain a name that sorts
-                // between the cached tail and the one being added
-                if (!docChildren.isComplete) {
-                    for (String childPath : added) {
-                        String name = PathUtils.getName(childPath);
-                        if (names.higher(name) != null) {
-                            // make sure the name string does not contain
-                            // unnecessary baggage
-                            names.add(new String(name));
-                        }
-                    }
-                } else {
-                    // add all
-                    for (String childPath : added) {
-                        // make sure the name string does not contain
-                        // unnecessary baggage
-                        names.add(new String(PathUtils.getName(childPath)));
-                    }
-                }
-                // any changes?
-                if (names.size() != currentSize) {
-                    // create new cache entry with updated names
-                    boolean complete = docChildren.isComplete;
-                    docChildren = new NodeDocument.Children();
-                    docChildren.isComplete = complete;
-                    docChildren.childNames.addAll(names);
-                    docChildrenCache.put(path, docChildren);
-                }
-            }
-        }
-    }
-
     public CacheStats getNodeCacheStats() {
-        return nodeCacheStats;
+        return nodeStore.getNodeCacheStats();
     }
 
     public CacheStats getNodeChildrenCacheStats() {
-        return nodeChildrenCacheStats;
+        return nodeStore.getNodeChildrenCacheStats();
     }
 
     public CacheStats getDiffCacheStats() {
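
The applyChanges() removed here continues on MongoNodeStore. Its central
safeguard is that the map of unsaved last revisions never goes backwards:
a put that would replace a newer revision is reverted before the commit
fails. A minimal sketch of that compare-and-revert pattern with plain JDK
types standing in for UnsavedModifications and Revision (names are
illustrative, and like the original the check is not atomic):

    import java.util.concurrent.ConcurrentHashMap;

    class LastRevTracker {
        // path -> revision; a larger long means a newer revision
        private final ConcurrentHashMap<String, Long> unsaved =
                new ConcurrentHashMap<String, Long>();

        void track(String path, long rev) {
            Long prev = unsaved.put(path, rev);
            if (prev != null && prev > rev) {
                // revert: never replace a newer revision with an older one
                unsaved.put(path, prev);
                throw new IllegalStateException("attempt to track " + rev
                        + " for " + path + ", older than current " + prev);
            }
        }
    }
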
@@ -1407,65 +625,20 @@ public class MongoMK implements MicroKer
     }
     
     public CacheStats getDocChildrenCacheStats() {
-        return docChildrenCacheStats;
-    }
-
-    public ClusterNodeInfo getClusterInfo() {
-        return clusterNodeInfo;
-    }
-
-    public int getPendingWriteCount() {
-        return unsavedLastRevisions.getPaths().size();
+        return nodeStore.getDocChildrenCacheStats();
     }
 
     public boolean isCached(String path) {
         return store.isCached(Collection.NODES, Utils.getIdFromPath(path));
     }
     
-    public void stopBackground() {
-        stopBackground = true;
-    }
-    
-    /**
-     * A background thread.
-     */
-    static class BackgroundOperation implements Runnable {
-        final WeakReference<MongoMK> ref;
-        private final AtomicBoolean isDisposed;
-        private int delay;
-        
-        BackgroundOperation(MongoMK mk, AtomicBoolean isDisposed) {
-            ref = new WeakReference<MongoMK>(mk);
-            delay = mk.getAsyncDelay();
-            this.isDisposed = isDisposed;
-        }
-
-        @Override
-        public void run() {
-            while (delay != 0 && !isDisposed.get()) {
-                synchronized (isDisposed) {
-                    try {
-                        isDisposed.wait(delay);
-                    } catch (InterruptedException e) {
-                        // ignore
-                    }
-                }
-                MongoMK mk = ref.get();
-                if (mk != null) {
-                    mk.runBackgroundOperations();
-                    delay = mk.getAsyncDelay();
-                }
-            }
-        }
-    }
-    
     /**
      * A (cached) result of the diff operation.
      */
     private static class Diff implements CacheValue {
-        
+
         final String diff;
-        
+
         Diff(String diff) {
             this.diff = diff;
         }
@@ -1474,7 +647,7 @@ public class MongoMK implements MicroKer
         public int getMemory() {
             return diff.length() * 2;
         }
-        
+
     }
 
     /**
@@ -1482,6 +655,7 @@ public class MongoMK implements MicroKer
      */
     public static class Builder {
         private static final long DEFAULT_MEMORY_CACHE_SIZE = 256 * 1024 * 1024;
+        private MongoNodeStore nodeStore;
         private DocumentStore documentStore;
         private BlobStore blobStore;
         private int clusterId  = Integer.getInteger("oak.mongoMK.clusterId", 0);
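
The hunk below gives the builder a lazily created MongoNodeStore,
mirroring the existing getDocumentStore() accessor. A small usage
sketch, grounded only in what the hunk shows:

    // repeated calls return the same lazily created instance
    MongoMK.Builder builder = new MongoMK.Builder();
    MongoNodeStore ns = builder.getNodeStore();
    assert ns == builder.getNodeStore();
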
@@ -1557,6 +731,13 @@ public class MongoMK implements MicroKer
             return documentStore;
         }
 
+        public MongoNodeStore getNodeStore() {
+            if (nodeStore == null) {
+                nodeStore = new MongoNodeStore(this);
+            }
+            return nodeStore;
+        }
+
         /**
          * Set the blob store to use. By default an in-memory store is used.
          *