You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by mr...@apache.org on 2015/06/11 09:07:27 UTC

svn commit: r1684820 [1/2] - in /jackrabbit/oak/trunk/oak-core/src: main/java/org/apache/jackrabbit/oak/plugins/document/ main/java/org/apache/jackrabbit/oak/plugins/document/memory/ main/java/org/apache/jackrabbit/oak/plugins/document/mongo/ main/java...

Author: mreutegg
Date: Thu Jun 11 07:07:27 2015
New Revision: 1684820

URL: http://svn.apache.org/r1684820
Log:
OAK-2829: Comparing node states for external changes is too slow

Work in progress, which includes all the changes done so far by Stefan Egli and Marcel Reutegger
Individual commits are on github: https://github.com/mreutegg/jackrabbit-oak/tree/OAK-2829

Added:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalEntry.java   (with props)
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalGarbageCollector.java   (with props)
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingDocumentStore.java   (with props)
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingTieredDiffCache.java   (with props)
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalTest.java   (with props)
Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Collection.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Commit.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DiffCache.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeState.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LocalDiffCache.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MemoryDiffCache.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MergeCommit.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/TieredDiffCache.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UnsavedModifications.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/memory/MemoryDocumentStore.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDiffCache.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/AmnesiaDiffCache.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterTest.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDiffCacheTest.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Collection.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Collection.java?rev=1684820&r1=1684819&r2=1684820&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Collection.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Collection.java Thu Jun 11 07:07:27 2015
@@ -70,6 +70,20 @@ public abstract class Collection<T exten
                 }
             };
 
+    /**
+     * The 'journal' collection contains documents with consolidated
+     * diffs for changes performed by a cluster node between two background
+     * updates.
+     */
+    public static final Collection<JournalEntry> JOURNAL =
+            new Collection<JournalEntry>("journal") {
+        @Nonnull
+        @Override
+        public JournalEntry newDocument(DocumentStore store) {
+            return new JournalEntry(store);
+        }
+    };
+
     private final String name;
 
     public Collection(String name) {

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Commit.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Commit.java?rev=1684820&r1=1684819&r2=1684820&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Commit.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Commit.java Thu Jun 11 07:07:27 2015
@@ -40,7 +40,9 @@ import org.slf4j.LoggerFactory;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.collect.Iterables.filter;
 import static com.google.common.collect.Iterables.transform;
+import static java.util.Collections.singletonList;
 import static org.apache.jackrabbit.oak.commons.PathUtils.denotesRoot;
+import static org.apache.jackrabbit.oak.plugins.document.Collection.JOURNAL;
 import static org.apache.jackrabbit.oak.plugins.document.Collection.NODES;
 import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.COLLISIONS;
 import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.SPLIT_CANDIDATE_THRESHOLD;
@@ -52,7 +54,7 @@ public class Commit {
 
     private static final Logger LOG = LoggerFactory.getLogger(Commit.class);
 
-    private final DocumentNodeStore nodeStore;
+    protected final DocumentNodeStore nodeStore;
     private final DocumentNodeStoreBranch branch;
     private final Revision baseRevision;
     private final Revision revision;
@@ -128,6 +130,15 @@ public class Commit {
         return baseRevision;
     }
 
+    /**
+     * @return all modified paths, including ancestors without explicit
+     *          modifications.
+     */
+    @Nonnull
+    Iterable<String> getModifiedPaths() {
+        return modifiedNodes;
+    }
+
     void addNodeDiff(DocumentNodeState n) {
         diff.tag('+').key(n.getPath());
         diff.object();
@@ -270,7 +281,7 @@ public class Commit {
         // so that all operations can be rolled back if there is a conflict
         ArrayList<UpdateOp> opLog = new ArrayList<UpdateOp>();
 
-        //Compute the commit root
+        // Compute the commit root
         for (String p : operations.keySet()) {
             markChanged(p);
             if (commitRootPath == null) {
@@ -284,6 +295,16 @@ public class Commit {
                 }
             }
         }
+
+        // push branch changes to journal
+        if (baseBranchRevision != null) {
+            // store as external change
+            JournalEntry doc = JOURNAL.newDocument(store);
+            doc.modified(modifiedNodes);
+            Revision r = revision.asBranchRevision();
+            store.create(JOURNAL, singletonList(doc.asUpdateOp(r)));
+        }
+
         int commitRootDepth = PathUtils.getDepth(commitRootPath);
         // check if there are real changes on the commit root
         boolean commitRootHasChanges = operations.containsKey(commitRootPath);
@@ -569,7 +590,7 @@ public class Commit {
             }
             list.add(p);
         }
-        DiffCache.Entry cacheEntry = nodeStore.getDiffCache().newEntry(before, revision);
+        DiffCache.Entry cacheEntry = nodeStore.getDiffCache().newEntry(before, revision, true);
         LastRevTracker tracker = nodeStore.createTracker(revision, isBranchCommit);
         List<String> added = new ArrayList<String>();
         List<String> removed = new ArrayList<String>();

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DiffCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DiffCache.java?rev=1684820&r1=1684819&r2=1684820&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DiffCache.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DiffCache.java Thu Jun 11 07:07:27 2015
@@ -56,11 +56,14 @@ public interface DiffCache {
      *
      * @param from the from revision.
      * @param to the to revision.
+     * @param local true indicates that the entry results from a local change,
+     * false if it results from an external change
      * @return the cache entry.
      */
     @Nonnull
     Entry newEntry(@Nonnull Revision from,
-                   @Nonnull Revision to);
+                   @Nonnull Revision to,
+                   boolean local);
 
     /**
      * @return the statistics for this cache.

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeState.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeState.java?rev=1684820&r1=1684819&r2=1684820&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeState.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeState.java Thu Jun 11 07:07:27 2015
@@ -398,7 +398,7 @@ public class DocumentNodeState extends A
         StringBuilder buff = new StringBuilder();
         buff.append("{ path: '").append(path).append("', ");
         buff.append("rev: '").append(rev).append("', ");
-        buff.append("properties: '").append(properties).append("' }");
+        buff.append("properties: '").append(properties.values()).append("' }");
         return buff.toString();
     }
 

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java?rev=1684820&r1=1684819&r2=1684820&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java Thu Jun 11 07:07:27 2015
@@ -21,11 +21,14 @@ import static com.google.common.base.Pre
 import static com.google.common.collect.Iterables.filter;
 import static com.google.common.collect.Iterables.toArray;
 import static com.google.common.collect.Iterables.transform;
+import static java.util.Collections.singletonList;
 import static org.apache.jackrabbit.oak.api.CommitFailedException.MERGE;
 import static org.apache.jackrabbit.oak.commons.PathUtils.concat;
+import static org.apache.jackrabbit.oak.plugins.document.Collection.JOURNAL;
 import static org.apache.jackrabbit.oak.plugins.document.Collection.NODES;
 import static org.apache.jackrabbit.oak.plugins.document.DocumentMK.FAST_DIFF;
 import static org.apache.jackrabbit.oak.plugins.document.DocumentMK.MANY_CHILDREN_THRESHOLD;
+import static org.apache.jackrabbit.oak.plugins.document.JournalEntry.fillExternalChanges;
 import static org.apache.jackrabbit.oak.plugins.document.UpdateOp.Key;
 import static org.apache.jackrabbit.oak.plugins.document.UpdateOp.Operation;
 import static org.apache.jackrabbit.oak.plugins.document.util.Utils.getIdFromPath;
@@ -84,6 +87,7 @@ import org.apache.jackrabbit.oak.plugins
 import org.apache.jackrabbit.oak.spi.blob.BlobStore;
 import org.apache.jackrabbit.oak.commons.json.JsopStream;
 import org.apache.jackrabbit.oak.commons.json.JsopWriter;
+import org.apache.jackrabbit.oak.commons.sort.StringSort;
 import org.apache.jackrabbit.oak.api.Blob;
 import org.apache.jackrabbit.oak.api.CommitFailedException;
 import org.apache.jackrabbit.oak.cache.CacheStats;
@@ -246,6 +250,12 @@ public final class DocumentNodeStore
     private final Map<String, String> splitCandidates = Maps.newConcurrentMap();
 
     /**
+     * Summary of changes done by this cluster node, to be persisted by the
+     * background update thread.
+     */
+    private JournalEntry changes;
+
+    /**
      * The last known revision for each cluster instance.
      *
      * Key: the machine id, value: revision.
@@ -359,6 +369,8 @@ public final class DocumentNodeStore
 
     private final VersionGarbageCollector versionGarbageCollector;
 
+    private final JournalGarbageCollector journalGarbageCollector;
+    
     private final Executor executor;
 
     private final LastRevRecoveryAgent lastRevRecoveryAgent;
@@ -382,6 +394,7 @@ public final class DocumentNodeStore
             s = new LoggingDocumentStoreWrapper(s);
         }
         this.store = s;
+        this.changes = Collection.JOURNAL.newDocument(s);
         this.executor = builder.getExecutor();
         this.clock = builder.getClock();
         int cid = builder.getClusterId();
@@ -401,6 +414,7 @@ public final class DocumentNodeStore
         this.asyncDelay = builder.getAsyncDelay();
         this.versionGarbageCollector = new VersionGarbageCollector(
                 this, builder.createVersionGCSupport());
+        this.journalGarbageCollector = new JournalGarbageCollector(this);
         this.lastRevRecoveryAgent = new LastRevRecoveryAgent(this);
         this.disableBranches = builder.isDisableBranches();
         this.missing = new DocumentNodeState(this, "MISSING", new Revision(0, 0, 0)) {
@@ -428,7 +442,8 @@ public final class DocumentNodeStore
         checkpoints = new Checkpoints(this);
 
         // check if root node exists
-        if (store.find(Collection.NODES, Utils.getIdFromPath("/")) == null) {
+        NodeDocument rootDoc = store.find(NODES, Utils.getIdFromPath("/"));
+        if (rootDoc == null) {
             // root node is missing: repository is not initialized
             Revision head = newRevision();
             Commit commit = new Commit(this, head, null, null);
@@ -449,6 +464,11 @@ public final class DocumentNodeStore
                 // no revision read from other cluster nodes
                 setHeadRevision(newRevision());
             }
+            // check if _lastRev for our clusterId exists
+            if (!rootDoc.getLastRev().containsKey(clusterId)) {
+                unsavedLastRevisions.put("/", headRevision);
+                backgroundWrite();
+            }
         }
         getRevisionComparator().add(headRevision, Revision.newRevision(0));
 
@@ -642,6 +662,8 @@ public final class DocumentNodeStore
                         Revision before = getHeadRevision();
                         // apply changes to cache based on before revision
                         c.applyToCache(before, false);
+                        // track modified paths
+                        changes.modified(c.getModifiedPaths());
                         // update head revision
                         setHeadRevision(c.getRevision());
                         dispatcher.contentChanged(getRoot(), info);
@@ -1023,15 +1045,13 @@ public final class DocumentNodeStore
         }
 
         final Revision readRevision = parent.getLastRevision();
-        return transform(getChildren(parent, name, limit).children,
-                new Function<String, DocumentNodeState>() {
+        return transform(getChildren(parent, name, limit).children, new Function<String, DocumentNodeState>() {
             @Override
             public DocumentNodeState apply(String input) {
                 String p = concat(parent.getPath(), input);
                 DocumentNodeState result = getNode(p, readRevision);
                 if (result == null) {
-                    throw new DocumentStoreException("DocumentNodeState is null for revision " + readRevision + " of " + p
-                            + " (aborting getChildNodes())");
+                    throw new DocumentStoreException("DocumentNodeState is null for revision " + readRevision + " of " + p + " (aborting getChildNodes())");
                 }
                 return result;
             }
@@ -1050,10 +1070,8 @@ public final class DocumentNodeStore
                     path, readRevision);
             return null;
         }
-        final DocumentNodeState result = doc.getNodeAtRevision(this,
-                readRevision, lastRevision);
-        PERFLOG.end(start, 1, "readNode: path={}, readRevision={}", path,
-                readRevision);
+        final DocumentNodeState result = doc.getNodeAtRevision(this, readRevision, lastRevision);
+        PERFLOG.end(start, 1, "readNode: path={}, readRevision={}", path, readRevision);
         return result;
     }
 
@@ -1068,10 +1086,10 @@ public final class DocumentNodeStore
      * @param changed the list of changed child nodes.
      *
      */
-    public void applyChanges(Revision rev, String path,
-                             boolean isNew, List<String> added,
-                             List<String> removed, List<String> changed,
-                             DiffCache.Entry cacheEntry) {
+    void applyChanges(Revision rev, String path,
+                      boolean isNew, List<String> added,
+                      List<String> removed, List<String> changed,
+                      DiffCache.Entry cacheEntry) {
         if (isNew && !added.isEmpty()) {
             DocumentNodeState.Children c = new DocumentNodeState.Children();
             Set<String> set = Sets.newTreeSet();
@@ -1086,13 +1104,13 @@ public final class DocumentNodeStore
         // update diff cache
         JsopWriter w = new JsopStream();
         for (String p : added) {
-            w.tag('+').key(PathUtils.getName(p)).object().endObject().newline();
+            w.tag('+').key(PathUtils.getName(p)).object().endObject();
         }
         for (String p : removed) {
-            w.tag('-').value(PathUtils.getName(p)).newline();
+            w.tag('-').value(PathUtils.getName(p));
         }
         for (String p : changed) {
-            w.tag('^').key(PathUtils.getName(p)).object().endObject().newline();
+            w.tag('^').key(PathUtils.getName(p)).object().endObject();
         }
         cacheEntry.append(path, w.toString());
 
@@ -1133,6 +1151,15 @@ public final class DocumentNodeStore
     }
 
     /**
+     * Called when a branch is merged.
+     *
+     * @param revisions the revisions of the merged branch commits.
+     */
+    void revisionsMerged(@Nonnull Iterable<Revision> revisions) {
+        changes.branchCommit(revisions);
+    }
+
+    /**
      * Updates a commit root document.
      *
      * @param commit the updates to apply on the commit root document.
@@ -1310,6 +1337,7 @@ public final class DocumentNodeStore
             UpdateOp op = new UpdateOp(Utils.getIdFromPath("/"), false);
             NodeDocument.setModified(op, commit.getRevision());
             if (b != null) {
+                commit.addBranchCommits(b);
                 Iterator<Revision> mergeCommits = commit.getMergeRevisions().iterator();
                 for (Revision rev : b.getCommits()) {
                     rev = rev.asTrunkRevision();
@@ -1733,6 +1761,8 @@ public final class DocumentNodeStore
         // then we saw this new revision (from another cluster node)
         Revision otherSeen = Revision.newRevision(0);
 
+        StringSort externalSort = JournalEntry.newSorter();
+        
         Map<Revision, Revision> externalChanges = Maps.newHashMap();
         for (Map.Entry<Integer, Revision> e : lastRevMap.entrySet()) {
             int machineId = e.getKey();
@@ -1753,6 +1783,15 @@ public final class DocumentNodeStore
                         || r.getTimestamp() > revisionPurgeMillis()) {
                     externalChanges.put(r, otherSeen);
                 }
+                // collect external changes
+                if (last != null) {
+            	    // add changes for this particular clusterId to the externalSort
+            	    try {
+                        fillExternalChanges(externalSort, last, r, store);
+                    } catch (IOException e1) {
+                        LOG.error("backgroundRead: Exception while reading external changes from journal: "+e1, e1);
+                    }
+                }
             }
         }
 
@@ -1772,7 +1811,6 @@ public final class DocumentNodeStore
             backgroundOperationLock.writeLock().lock();
             try {
                 stats.lock = clock.getTime() - time;
-                time = clock.getTime();
 
                 // the latest revisions of the current cluster node
                 // happened before the latest revisions of other cluster nodes
@@ -1781,9 +1819,23 @@ public final class DocumentNodeStore
                 for (Map.Entry<Revision, Revision> e : externalChanges.entrySet()) {
                     revisionComparator.add(e.getKey(), e.getValue());
                 }
+
+                Revision oldHead = headRevision;
                 // the new head revision is after other revisions
                 setHeadRevision(newRevision());
                 if (dispatchChange) {
+                    time = clock.getTime();
+                    if (externalSort!=null) {
+                    	// then there were external changes - apply them to the diff cache
+                    	try {
+                            JournalEntry.applyTo(externalSort, diffCache, oldHead, headRevision);
+                        } catch (Exception e1) {
+                            LOG.error("backgroundRead: Exception while processing external changes from journal: "+e1, e1);
+                        }
+                    }
+                    stats.populateDiffCache = clock.getTime() - time;
+                    time = clock.getTime();
+
                     dispatcher.contentChanged(getRoot().fromExternalChange(), null);
                 }
             } finally {
@@ -1802,6 +1854,7 @@ public final class DocumentNodeStore
         CacheInvalidationStats cacheStats;
         long readHead;
         long cacheInvalidationTime;
+        long populateDiffCache;
         long lock;
         long dispatchChanges;
         long purge;
@@ -1816,6 +1869,7 @@ public final class DocumentNodeStore
                     "cacheStats:" + cacheStatsMsg +
                     ", head:" + readHead +
                     ", cache:" + cacheInvalidationTime +
+                    ", diff: " + populateDiffCache +
                     ", lock:" + lock +
                     ", dispatch:" + dispatchChanges +
                     ", purge:" + purge +
@@ -1900,7 +1954,15 @@ public final class DocumentNodeStore
     }
 
     BackgroundWriteStats backgroundWrite() {
-        return unsavedLastRevisions.persist(this, backgroundOperationLock.writeLock());
+        return unsavedLastRevisions.persist(this, new UnsavedModifications.Snapshot() {
+            @Override
+            public void acquiring() {
+                if (store.create(JOURNAL,
+                        singletonList(changes.asUpdateOp(getHeadRevision())))) {
+                    changes = JOURNAL.newDocument(getDocumentStore());
+                }
+            }
+        }, backgroundOperationLock.writeLock());
     }
 
     //-----------------------------< internal >---------------------------------
@@ -1983,19 +2045,23 @@ public final class DocumentNodeStore
                 case '^': {
                     String name = unshareString(t.readString());
                     t.read(':');
-                    if (t.matches('{')) {
-                        t.read('}');
-                        continueComparison = diff.childNodeChanged(name,
-                                base.getChildNode(name),
-                                node.getChildNode(name));
-                    } else if (t.matches('[')) {
-                        // ignore multi valued property
-                        while (t.read() != ']') {
-                            // skip values
+                    t.read('{');
+                    t.read('}');
+                    NodeState baseChild = base.getChildNode(name);
+                    NodeState nodeChild = node.getChildNode(name);
+                    if (baseChild.exists()) {
+                        if (nodeChild.exists()) {
+                            continueComparison = diff.childNodeChanged(name,
+                                    baseChild, nodeChild);
+                        } else {
+                            continueComparison = diff.childNodeDeleted(name,
+                                    baseChild);
                         }
                     } else {
-                        // ignore single valued property
-                        t.read();
+                        if (nodeChild.exists()) {
+                            continueComparison = diff.childNodeAdded(name,
+                                    nodeChild);
+                        }
                     }
                     break;
                 }
@@ -2110,13 +2176,14 @@ public final class DocumentNodeStore
             }
         }
 
+        String diff = w.toString();
         if (debug) {
             long end = now();
-            LOG.debug("Diff performed via '{}' at [{}] between revisions [{}] => [{}] took {} ms ({} ms)",
+            LOG.debug("Diff performed via '{}' at [{}] between revisions [{}] => [{}] took {} ms ({} ms), diff '{}'",
                     diffAlgo, from.getPath(), fromRev, toRev,
-                    end - start, getChildrenDoneIn - start);
+                    end - start, getChildrenDoneIn - start, diff);
         }
-        return w.toString();
+        return diff;
     }
 
     private void diffManyChildren(JsopWriter w, String path, Revision fromRev, Revision toRev) {
@@ -2164,17 +2231,17 @@ public final class DocumentNodeStore
                     if (a == null && b == null) {
                         // ok
                     } else if (a == null || b == null || !a.equals(b)) {
-                        w.tag('^').key(name).object().endObject().newline();
+                        w.tag('^').key(name).object().endObject();
                     }
                 } else {
                     // does not exist in toRev -> was removed
-                    w.tag('-').value(name).newline();
+                    w.tag('-').value(name);
                 }
             } else {
                 // does not exist in fromRev
                 if (toNode != null) {
                     // exists in toRev
-                    w.tag('+').key(name).object().endObject().newline();
+                    w.tag('+').key(name).object().endObject();
                 } else {
                     // does not exist in either revisions
                     // -> do nothing
@@ -2201,7 +2268,7 @@ public final class DocumentNodeStore
         Set<String> childrenSet = Sets.newHashSet(toChildren.children);
         for (String n : fromChildren.children) {
             if (!childrenSet.contains(n)) {
-                w.tag('-').value(n).newline();
+                w.tag('-').value(n);
             } else {
                 String path = concat(parentPath, n);
                 DocumentNodeState n1 = getNode(path, fromRev);
@@ -2213,14 +2280,14 @@ public final class DocumentNodeStore
                 checkNotNull(n1, "Node at [%s] not found for fromRev [%s]", path, fromRev);
                 checkNotNull(n2, "Node at [%s] not found for toRev [%s]", path, toRev);
                 if (!n1.getLastRevision().equals(n2.getLastRevision())) {
-                    w.tag('^').key(n).object().endObject().newline();
+                    w.tag('^').key(n).object().endObject();
                 }
             }
         }
         childrenSet = Sets.newHashSet(fromChildren.children);
         for (String n : toChildren.children) {
             if (!childrenSet.contains(n)) {
-                w.tag('+').key(n).object().endObject().newline();
+                w.tag('+').key(n).object().endObject();
             }
         }
     }
@@ -2520,6 +2587,12 @@ public final class DocumentNodeStore
     public VersionGarbageCollector getVersionGarbageCollector() {
         return versionGarbageCollector;
     }
+
+    @Nonnull
+    public JournalGarbageCollector getJournalGarbageCollector() {
+        return journalGarbageCollector;
+    }
+    
     @Nonnull
     public LastRevRecoveryAgent getLastRevRecoveryAgent() {
         return lastRevRecoveryAgent;

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java?rev=1684820&r1=1684819&r2=1684820&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java Thu Jun 11 07:07:27 2015
@@ -27,6 +27,7 @@ import static org.apache.jackrabbit.oak.
 import static org.apache.jackrabbit.oak.plugins.document.DocumentMK.Builder.DEFAULT_DOC_CHILDREN_CACHE_PERCENTAGE;
 import static org.apache.jackrabbit.oak.plugins.document.DocumentMK.Builder.DEFAULT_NODE_CACHE_PERCENTAGE;
 import static org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils.registerMBean;
+import static org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils.scheduleWithFixedDelay;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
@@ -44,6 +45,7 @@ import com.mongodb.DB;
 import com.mongodb.MongoClient;
 import com.mongodb.MongoClientOptions;
 import com.mongodb.MongoClientURI;
+
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.ConfigurationPolicy;
@@ -210,6 +212,23 @@ public class DocumentNodeStoreService {
     )
     public static final String CUSTOM_BLOB_STORE = "customBlobStore";
 
+    private static final long DEFAULT_JOURNAL_GC_INTERVAL_MILLIS = 5*60*1000; // default is 5min
+    @Property(longValue = DEFAULT_JOURNAL_GC_INTERVAL_MILLIS,
+            label = "Journal Garbage Collection Interval (millis)",
+            description = "Long value indicating interval (in milliseconds) with which the "
+                    + "journal (for external changes) is cleaned up. Default is " + DEFAULT_JOURNAL_GC_INTERVAL_MILLIS
+    )
+    private static final String PROP_JOURNAL_GC_INTERVAL_MILLIS = "journalGCInterval";
+    
+    private static final long DEFAULT_JOURNAL_GC_MAX_AGE_MILLIS = 6*60*60*1000; // default is 6hours
+    @Property(longValue = DEFAULT_JOURNAL_GC_MAX_AGE_MILLIS,
+            label = "Maximum Age of Journal Entries (millis)",
+            description = "Long value indicating max age (in milliseconds) that "
+                    + "journal (for external changes) entries are kept (older ones are candidates for gc). "
+                    + "Default is " + DEFAULT_JOURNAL_GC_MAX_AGE_MILLIS
+    )
+    private static final String PROP_JOURNAL_GC_MAX_AGE_MILLIS = "journalGCMaxAge";
+    
     /**
      * Boolean value indicating a different DataSource has to be used for
      * BlobStore
@@ -430,6 +449,7 @@ public class DocumentNodeStoreService {
 
         registerJMXBeans(mk.getNodeStore());
         registerLastRevRecoveryJob(mk.getNodeStore());
+        registerJournalGC(mk.getNodeStore());
 
         NodeStore store;
         DocumentNodeStore mns = mk.getNodeStore();
@@ -610,6 +630,23 @@ public class DocumentNodeStoreService {
                 recoverJob, TimeUnit.MILLISECONDS.toSeconds(leaseTime)));
     }
 
+    /** Schedules the periodic journal GC job; the delay is handed over in whole seconds and is at least 1. */
+    private void registerJournalGC(final DocumentNodeStore nodeStore) {
+        long journalGCInterval = toLong(context.getProperties().get(PROP_JOURNAL_GC_INTERVAL_MILLIS),
+                DEFAULT_JOURNAL_GC_INTERVAL_MILLIS);
+        final long journalGCMaxAge = toLong(context.getProperties().get(PROP_JOURNAL_GC_MAX_AGE_MILLIS),
+                DEFAULT_JOURNAL_GC_MAX_AGE_MILLIS);
+        long delaySeconds = Math.max(1, TimeUnit.MILLISECONDS.toSeconds(journalGCInterval)); // < 1000ms would truncate to 0
+        Runnable journalGCJob = new Runnable() {
+            @Override
+            public void run() {
+                nodeStore.getJournalGarbageCollector().gc(journalGCMaxAge, TimeUnit.MILLISECONDS);
+            }
+        };
+        registrations.add(WhiteboardUtils.scheduleWithFixedDelay(whiteboard,
+                journalGCJob, delaySeconds, true/*runOnSingleClusterNode*/));
+    }
+
     private Object prop(String propName) {
         return prop(propName, PREFIX + propName);
     }

Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalEntry.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalEntry.java?rev=1684820&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalEntry.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalEntry.java Thu Jun 11 07:07:27 2015
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.annotation.CheckForNull;
+import javax.annotation.Nonnull;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import org.apache.jackrabbit.oak.commons.PathUtils;
+import org.apache.jackrabbit.oak.commons.json.JsopBuilder;
+import org.apache.jackrabbit.oak.commons.json.JsopReader;
+import org.apache.jackrabbit.oak.commons.json.JsopTokenizer;
+import org.apache.jackrabbit.oak.commons.sort.StringSort;
+import org.apache.jackrabbit.oak.plugins.document.util.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.jackrabbit.oak.commons.PathUtils.concat;
+import static org.apache.jackrabbit.oak.plugins.document.Collection.JOURNAL;
+
+/**
+ * Keeps track of changes performed between two consecutive background updates.
+ *
+ * Done:
+ *      Query external changes in chunks.
+ *      {@link #fillExternalChanges(StringSort, Revision, Revision, DocumentStore)}
+ *      reads the JournalEntry documents in chunks of {@code READ_CHUNK_SIZE}.
+ * Done:
+ *      Use external sort when changes are applied to diffCache. See usage of
+ *      {@link #applyTo(StringSort, DiffCache, Revision, Revision)} in
+ *      {@link DocumentNodeStore#backgroundRead(boolean)}.
+ *      The utility {@link StringSort} is used for this purpose.
+ * Done:
+ *      Push changes to {@link MemoryDiffCache} instead of {@link LocalDiffCache}.
+ *      See {@link TieredDiffCache#newEntry(Revision, Revision, boolean)}. Maybe a new
+ *      method is needed for this purpose?
+ * Done (incl junit) 
+ *      Create JournalEntry for external changes related to _lastRev recovery.
+ *      See {@link LastRevRecoveryAgent#recover(Iterator, int, boolean)}.
+ * Done (incl junit)
+ *      Cleanup old journal entries in the document store.
+ * Done:
+ *      integrate the JournalGarbageCollector similarly to the VersionGarbageCollector
+ */
+public final class JournalEntry extends Document {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JournalEntry.class);
+
+    /**
+     * The revision format for external changes:
+     * &lt;clusterId>-&lt;timestamp>-&lt;counter>. The string is prefixed with
+     * "b" if it denotes a branch revision.
+     */
+    private static final String REVISION_FORMAT = "%d-%0" +
+            Long.toHexString(Long.MAX_VALUE).length() + "x-%0" +
+            Integer.toHexString(Integer.MAX_VALUE).length() + "x";
+
+    private static final String CHANGES = "_c";
+
+    private static final String BRANCH_COMMITS = "_bc";
+
+    private static final int READ_CHUNK_SIZE = 1024;
+    
+    private static final int STRINGSORT_OVERFLOW_TO_DISK_THRESHOLD = 1024 * 1024; // switch to disk after 1MB
+    
+    private final DocumentStore store;
+
+    private volatile TreeNode changes = null;
+
+    JournalEntry(DocumentStore store) {
+        this.store = store;
+    }
+    
+    static StringSort newSorter() {
+        return new StringSort(STRINGSORT_OVERFLOW_TO_DISK_THRESHOLD, new Comparator<String>() {
+            @Override
+            public int compare(String arg0, String arg1) {
+                return arg0.compareTo(arg1);
+            }
+        });
+    }
+	
+    static void applyTo(@Nonnull StringSort externalSort,
+    				    @Nonnull DiffCache diffCache,
+    				    @Nonnull Revision from,
+    				    @Nonnull Revision to) throws IOException {
+        LOG.debug("applyTo: starting for {} to {}", from, to);
+		externalSort.sort();
+		// note that it is not deduplicated yet
+		LOG.debug("applyTo: sorting done.");
+		
+		final DiffCache.Entry entry = checkNotNull(diffCache).newEntry(from, to, false);
+
+		final Iterator<String> it = externalSort.getIds();
+		if (!it.hasNext()) {
+			// nothing at all? that's quite unusual..
+			
+			// we apply this diff as one '/' to the entry then
+		    entry.append("/", "");
+			entry.done();
+			return;
+		}
+		String previousPath = it.next();
+		TreeNode node = new TreeNode(null, "");
+		node = node.getOrCreatePath(previousPath); 
+		int totalCnt = 0;
+		int deDuplicatedCnt = 0;
+		while(it.hasNext()) {
+			totalCnt++;
+			final String currentPath = it.next();
+			if (previousPath.equals(currentPath)) {
+				// de-duplication
+				continue;
+			}
+			
+			// 'node' contains one hierarchy line, eg /a, /a/b, /a/b/c, /a/b/c/d
+			// including the children on each level.
+			// these children have not yet been appended to the diffCache entry
+			// and have to be added as soon as the 'currentPath' is not
+			// part of that hierarchy anymore and we 'move elsewhere'.
+			// eg if 'currentPath' is /a/b/e, then we must flush /a/b/c/d and /a/b/c
+			while(node!=null && !node.isParentOf(currentPath)) {
+				// add parent to the diff entry
+			    entry.append(node.getPath(), getChanges(node));
+				deDuplicatedCnt++;
+				node = node.parent;
+			}
+			
+			if (node==null) {
+			    // we should never go 'passed' the root, hence node should 
+			    // never be null - if it becomes null anyway, start with
+			    // a fresh root:
+			    node = new TreeNode(null, "");
+	            node = node.getOrCreatePath(currentPath);
+			} else {
+			    // this is the normal route: we add a direct or grand-child
+			    // node to the current node:
+			    node = node.getOrCreatePath(currentPath);
+			}
+			previousPath = currentPath;
+		}
+		
+		// once we're done we still have the last hierarchy line contained in 'node',
+		// eg /x, /x/y, /x/y/z
+		// and that one we must now append to the diffcache entry:
+		while(node!=null) {
+            entry.append(node.getPath(), getChanges(node));
+			deDuplicatedCnt++;
+			node = node.parent;
+		}
+		
+		// and finally: mark the diffcache entry as 'done':
+        entry.done();
+        LOG.debug("applyTo: done. totalCnt: {}, deDuplicatedCnt: {}", totalCnt, deDuplicatedCnt);
+    }
+    
+    /**
+     * Reads all external changes between the two given revisions (with the same clusterId)
+     * from the journal and appends the paths therein to the provided sorter.
+     *
+     * @param sorter the StringSort to which all externally changed paths between
+     * the provided revisions will be added
+     * @param from the lower bound of the revision range (exclusive).
+     * @param to the upper bound of the revision range (inclusive).
+     * @param store the document store to query.
+     * @throws IOException 
+     */
+    static void fillExternalChanges(@Nonnull StringSort sorter,
+                                    @Nonnull Revision from,
+                                    @Nonnull Revision to,
+                                    @Nonnull DocumentStore store) throws IOException {
+        checkArgument(checkNotNull(from).getClusterId() == checkNotNull(to).getClusterId());
+        
+        // to is inclusive, but DocumentStore.query() toKey is exclusive
+        final String inclusiveToId = asId(to);
+        to = new Revision(to.getTimestamp(), to.getCounter() + 1,
+                to.getClusterId(), to.isBranch());
+
+        // read in chunks to support very large sets of changes between subsequent background reads
+        // to do this, provide a (TODO eventually configurable) limit for the number of entries to be returned per query
+        // if the number of elements returned by the query is exactly the provided limit, then
+        // loop and do subsequent queries
+        final String toId = asId(to);
+        String fromId = asId(from);
+        while(true) {
+        	if (fromId.equals(inclusiveToId)) {
+        		// avoid query if from and to are off by just 1 counter (which we do due to exclusiveness of query borders)
+        		// as in this case the query will always be empty anyway - so avoid doing the query in the first place
+        		break;
+        	}
+			List<JournalEntry> partialResult = store.query(JOURNAL, fromId, toId, READ_CHUNK_SIZE);
+			if (partialResult==null) {
+				break;
+			}
+			for(JournalEntry d: partialResult) {
+				d.addTo(sorter);
+			}
+			if (partialResult.size()<READ_CHUNK_SIZE) {
+				break;
+			}
+			// otherwise set 'fromId' to the last entry just processed
+			// that works fine as the query is non-inclusive (ie does not include the from which we'd otherwise double-process)
+			fromId = partialResult.get(partialResult.size()-1).getId();
+        }
+    }
+
+    long getRevisionTimestamp() {
+        final String[] parts = getId().split("-"); // id is [b]<clusterId>-<hex timestamp>-<hex counter>, see asId()
+        return Long.parseLong(parts[parts.length - 2], 16); // the timestamp is the second to last part, in hex
+    }
+
+    void modified(String path) {
+        TreeNode node = getChanges();
+        for (String name : PathUtils.elements(path)) {
+            node = node.getOrCreate(name);
+        }
+    }
+
+    void modified(Iterable<String> paths) {
+        for (String p : paths) {
+            modified(p);
+        }
+    }
+
+    void branchCommit(@Nonnull Iterable<Revision> revisions) {
+        // BRANCH_COMMITS holds a comma separated list of branch revision ids;
+        // the ids of the given revisions are appended to what is already there
+        String existing = (String) get(BRANCH_COMMITS);
+        StringBuilder ids = new StringBuilder(existing == null ? "" : existing);
+        for (Revision rev : revisions) {
+            if (ids.length() != 0) {
+                ids.append(",");
+            }
+            ids.append(asId(rev.asBranchRevision()));
+        }
+        put(BRANCH_COMMITS, ids.toString());
+    }
+
+    String getChanges(String path) {
+        TreeNode node = getNode(path);
+        if (node == null) {
+            return "";
+        }
+        return getChanges(node);
+    }
+
+    UpdateOp asUpdateOp(@Nonnull Revision revision) {
+        String id = asId(revision);
+        UpdateOp op = new UpdateOp(id, true);
+        op.set(ID, id);
+        op.set(CHANGES, getChanges().serialize());
+        String bc = (String) get(BRANCH_COMMITS);
+        if (bc != null) {
+            op.set(BRANCH_COMMITS, bc);
+        }
+        return op;
+    }
+    
+    void addTo(final StringSort sort) throws IOException {
+        TreeNode n = getChanges();
+        TraversingVisitor v = new TraversingVisitor() {
+            
+            @Override
+            public void node(TreeNode node, String path) throws IOException {
+                sort.add(path);
+            }
+        };
+		n.accept(v, "/");
+        for (JournalEntry e : getBranchCommits()) {
+            e.getChanges().accept(v, "/");
+        }
+    }
+
+    /**
+     * Returns the branch commits that are related to this journal entry.
+     *
+     * @return the branch commits.
+     */
+    @Nonnull
+    Iterable<JournalEntry> getBranchCommits() {
+        List<JournalEntry> commits = Lists.newArrayList();
+        String bc = (String) get(BRANCH_COMMITS);
+        if (bc != null) {
+            for (String id : bc.split(",")) {
+                JournalEntry d = store.find(JOURNAL, id);
+                if (d == null) {
+                    throw new IllegalStateException(
+                            "Missing external change for branch revision: " + id);
+                }
+                //TODO: could this also be a problem with very large number of branches ???
+                commits.add(d);
+            }
+        }
+        return commits;
+    }
+
+    //-----------------------------< internal >---------------------------------
+
+    private static String getChanges(TreeNode node) {
+        JsopBuilder builder = new JsopBuilder();
+        for (String name : node.keySet()) {
+            builder.tag('^');
+            builder.key(name);
+            builder.object().endObject();
+        }
+        return builder.toString();
+    }
+
+    static String asId(@Nonnull Revision revision) {
+        checkNotNull(revision);
+        // <clusterId>-<timestamp>-<counter>, timestamp and counter as zero padded hex
+        String id = String.format(REVISION_FORMAT,
+                revision.getClusterId(), revision.getTimestamp(), revision.getCounter());
+        // ids of branch revisions carry a leading "b"
+        return revision.isBranch() ? "b" + id : id;
+    }
+
+    @CheckForNull
+    private TreeNode getNode(String path) {
+        TreeNode node = getChanges();
+        for (String name : PathUtils.elements(path)) {
+            node = node.get(name);
+            if (node == null) {
+                return null;
+            }
+        }
+        return node;
+    }
+
+    @Nonnull
+    private TreeNode getChanges() {
+        if (changes == null) {
+            TreeNode node = new TreeNode(null, "");
+            String c = (String) get(CHANGES);
+            if (c != null) {
+                node.parse(new JsopTokenizer(c));
+            }
+            changes = node;
+        }
+        return changes;
+    }
+
+    private static final class TreeNode {
+
+        private final Map<String, TreeNode> children = Maps.newHashMap();
+
+        private final String path;
+        private final TreeNode parent;
+        
+        TreeNode(TreeNode parent, String name) {
+            if (name.contains("/")) {
+                throw new IllegalArgumentException("name must not contain /: "+name);
+            }
+            this.parent = parent;
+            if (parent==null) {
+                this.path = "/";
+            } else if (parent.parent==null) {
+                this.path = "/" + name;
+            } else {
+                this.path = parent.path + "/" + name;
+            }
+        }
+        
+        public TreeNode getOrCreatePath(String path) {
+            if (path.equals(this.path)) {
+                // then path denotes the same as myself, hence return myself
+                return this;
+            }
+            if (!path.startsWith(this.path)) {
+                // this must never happen
+                throw new IllegalStateException("path not child of myself. path: "+path+", myself: "+this.path);
+            }
+            String sub = this.path.equals("/") ? path.substring(1) : path.substring(this.path.length()+1);
+            String[] parts = sub.split("/");
+            TreeNode n = this;
+            for (int i = 0; i < parts.length; i++) {
+                if (parts[i]!=null && parts[i].length()>0) {
+                    n = n.getOrCreate(parts[i]);
+                }
+            }
+            return n;
+        }
+
+        public boolean isParentOf(String path) {
+            // the root node is treated as parent of every path,
+            // any other node only as parent of its direct children
+            if ("/".equals(this.path)) {
+                return true;
+            }
+            final String prefix = this.path + "/";
+            if (!path.startsWith(prefix)) {
+                // path is not located below this node at all
+                return false;
+            }
+            // what remains of path below this node
+            final String remainder = path.substring(prefix.length());
+            // a further '/' (looked for from index 1 on) means path is
+            // a deeper descendant rather than a direct child of this node
+            final boolean deeper = remainder.indexOf("/", 1) != -1;
+            return !deeper;
+        }
+
+        private String getPath() {
+            return path;
+        }
+
+        void parse(JsopReader reader) {
+            reader.read('{');
+            if (!reader.matches('}')) {
+                do {
+                    String name = Utils.unescapePropertyName(reader.readString());
+                    reader.read(':');
+                    getOrCreate(name).parse(reader);
+                } while (reader.matches(','));
+                reader.read('}');
+            }
+        }
+
+        String serialize() {
+            JsopBuilder builder = new JsopBuilder();
+            builder.object();
+            toJson(builder);
+            builder.endObject();
+            return builder.toString();
+        }
+
+        @Nonnull
+        Set<String> keySet() {
+            return children.keySet();
+        }
+
+        @CheckForNull
+        TreeNode get(String name) {
+            return children.get(name);
+        }
+
+        void accept(TraversingVisitor visitor, String path) throws IOException {
+            visitor.node(this, path);
+            for (Map.Entry<String, TreeNode> entry : children.entrySet()) {
+                entry.getValue().accept(visitor, concat(path, entry.getKey()));
+            }
+        }
+
+        private void toJson(JsopBuilder builder) {
+            for (Map.Entry<String, TreeNode> entry : children.entrySet()) {
+                builder.key(Utils.escapePropertyName(entry.getKey()));
+                builder.object();
+                entry.getValue().toJson(builder);
+                builder.endObject();
+            }
+        }
+
+        private TreeNode getOrCreate(String name) {
+            TreeNode c = children.get(name);
+            if (c == null) {
+                c = new TreeNode(this, name);
+                children.put(name, c);
+            }
+            return c;
+        }
+    }
+
+    private interface TraversingVisitor {
+
+        void node(TreeNode node, String path) throws IOException;
+    }
+
+}

Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalEntry.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalGarbageCollector.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalGarbageCollector.java?rev=1684820&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalGarbageCollector.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalGarbageCollector.java Thu Jun 11 07:07:27 2015
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.document;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Stopwatch;
+
+/**
+ * The JournalGarbageCollector can clean up JournalEntries that are
+ * older than a particular age.
+ * <p>
+ * It would typically be invoked in conjunction with the VersionGarbageCollector
+ * but must not be confused with that one - 'journal' refers to the separate
+ * collection that contains changed paths per background writes used for 
+ * observation.
+ */
+public class JournalGarbageCollector {
+
+	//copied from VersionGarbageCollector:
+    private static final int DELETE_BATCH_SIZE = 450;
+    
+    private final DocumentStore ds;
+
+    private static final Logger log = LoggerFactory.getLogger(JournalGarbageCollector.class);
+
+    public JournalGarbageCollector(DocumentNodeStore nodeStore) {
+        this.ds = nodeStore.getDocumentStore();
+    }
+
+    /**
+     * Deletes entries in the journal that are older than the given maxRevisionAge.
+     * @param maxRevisionAge entries older than this age will be removed
+     * @param unit the timeunit for maxRevisionAge
+     * @return the number of entries that have been removed
+     */
+    public int gc(long maxRevisionAge, TimeUnit unit) {
+        long maxRevisionAgeInMillis = unit.toMillis(maxRevisionAge);
+        if (log.isDebugEnabled()) {
+            log.debug("gc: Journal garbage collection starts with maxAge: {} min.", TimeUnit.MILLISECONDS.toMinutes(maxRevisionAgeInMillis));
+        }
+        Stopwatch sw = Stopwatch.createStarted();
+        
+        // the journal has ids of the following format:
+        // 1-0000014db9aaf710-00000001
+        // whereas the first number is the cluster node id.
+        // now, this format prevents from doing a generic
+        // query to get all 'old' entries, as the documentstore
+        // can only query for a sequential list of entries.
+        // (and the cluster node id here partitions the set
+        // of entries that we have to delete)
+        // To account for that, we simply iterate over all 
+        // cluster node ids and clean them up individually.
+        // Note that there are possible alternatives, such
+        // as: let each node clean up its own old entries
+        // but the chosen path is also quite simple: it can
+        // be started on any instance - but best on only one.
+        // if it's run on multiple concurrently, then they
+        // will compete at deletion, which is not optimal
+        // due to performance, but does not harm.
+        
+        // 1. get the list of cluster node ids
+        final List<ClusterNodeInfoDocument> clusterNodeInfos = ClusterNodeInfoDocument.all(ds);
+        int numDeleted = 0;
+        for (Iterator<ClusterNodeInfoDocument> it = clusterNodeInfos.iterator(); it
+				.hasNext();) {
+        	// current algorithm is to simply look at all cluster nodes
+        	// irrespective of whether they are active or inactive etc.
+        	// this could be optimized for inactive ones: at some point, all
+        	// journal entries of inactive ones would have been cleaned up
+        	// and at that point we could stop including those long-time-inactive ones.
+        	// that 'long time' aspect would have to be tracked though, to be sure
+        	// we don't leave garbage.
+        	// so simpler is to quickly do a query even for long-time inactive ones
+			final ClusterNodeInfoDocument clusterNodeInfoDocument = it.next();
+			final int clusterNodeId = clusterNodeInfoDocument.getClusterId();
+			
+			// 2. iterate over that list and do a query with
+			//    a limit of 'batch size'
+			boolean branch = false;
+			long startPointer = 0;
+			while(true) {
+				String fromKey = JournalEntry.asId(new Revision(startPointer, 0, clusterNodeId, branch));
+				String toKey = JournalEntry.asId(new Revision(
+						System.currentTimeMillis() - maxRevisionAgeInMillis, Integer.MAX_VALUE, clusterNodeId, branch));
+				int limit = DELETE_BATCH_SIZE;
+				List<JournalEntry> deletionBatch = ds.query(Collection.JOURNAL, fromKey, toKey, limit);
+				if (deletionBatch.size()>0) {
+					ds.remove(Collection.JOURNAL, asKeys(deletionBatch));
+					numDeleted+=deletionBatch.size();
+				}
+				if (deletionBatch.size()<limit) {
+					if (!branch) {
+						// do the same for branches:
+						// this will start at the beginning again with branch set to true
+						// and eventually finish too
+						startPointer = 0;
+						branch = true;
+						continue;
+					}
+					break;
+				}
+				startPointer = deletionBatch.get(deletionBatch.size()-1).getRevisionTimestamp();
+			}
+		}
+        
+        sw.stop();
+        
+        log.info("gc: Journal garbage collection took {}, deleted {} entries that were older than {} min.", sw, numDeleted, TimeUnit.MILLISECONDS.toMinutes(maxRevisionAgeInMillis));
+        return numDeleted;
+    }
+
+	private List<String> asKeys(List<JournalEntry> deletionBatch) {
+		final List<String> keys = new ArrayList<String>(deletionBatch.size());
+		for (JournalEntry e: deletionBatch) {
+			keys.add(e.getId());
+		}
+		return keys;
+	}
+
+}

Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalGarbageCollector.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java?rev=1684820&r1=1684819&r2=1684820&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java Thu Jun 11 07:07:27 2015
@@ -22,6 +22,9 @@ package org.apache.jackrabbit.oak.plugin
 import static com.google.common.collect.ImmutableList.of;
 import static com.google.common.collect.Iterables.filter;
 import static com.google.common.collect.Iterables.mergeSorted;
+import static java.util.Collections.singletonList;
+import static org.apache.jackrabbit.oak.plugins.document.Collection.JOURNAL;
+import static org.apache.jackrabbit.oak.plugins.document.UnsavedModifications.Snapshot.IGNORE;
 
 import java.util.Iterator;
 import java.util.List;
@@ -137,6 +140,8 @@ public class LastRevRecoveryAgent {
 
         //Map of known last rev of checked paths
         Map<String, Revision> knownLastRevs = MapFactory.getInstance().create();
+		final DocumentStore docStore = nodeStore.getDocumentStore();
+        final JournalEntry changes = JOURNAL.newDocument(docStore);
 
         long count = 0;
         while (suspects.hasNext()) {
@@ -165,6 +170,7 @@ public class LastRevRecoveryAgent {
             //2. Update lastRev for parent paths aka rollup
             if (lastRevForParents != null) {
                 String path = doc.getPath();
+                changes.modified(path); // track all changes
                 while (true) {
                     if (PathUtils.denotesRoot(path)) {
                         break;
@@ -187,6 +193,9 @@ public class LastRevRecoveryAgent {
                 unsaved.put(parentPath, calcLastRev);
             }
         }
+        
+        // take the root's lastRev
+        final Revision lastRootRev = unsaved.get("/");
 
         //Note the size before persist as persist operation
         //would empty the internal state
@@ -200,7 +209,41 @@ public class LastRevRecoveryAgent {
             //UnsavedModifications is designed to be used in concurrent
             //access mode. For recovery case there is no concurrent access
             //involve so just pass a new lock instance
-            unsaved.persist(nodeStore, new ReentrantLock());
+        	
+    		// the lock uses to do the persisting is a plain reentrant lock
+    		// thus it doesn't matter, where exactly the check is done
+    		// as to whether the recovered lastRev has already been
+    		// written to the journal.
+            unsaved.persist(nodeStore, new UnsavedModifications.Snapshot() {
+
+                @Override
+                public void acquiring() {
+                    if (lastRootRev == null) {
+                        // this should never happen - when unsaved has no changes
+                        // that is reflected in the 'map' to be empty - in that
+                        // case 'persist()' quits early and never calls
+                        // acquiring() here.
+                        //
+                        // but even if it would occur - if we have no lastRootRev
+                        // then we cannot and probably don't have to persist anything
+                        return;
+                    }
+
+                    final String id = JournalEntry.asId(lastRootRev); // lastRootRev never null at this point
+                    final JournalEntry existingEntry = docStore.find(Collection.JOURNAL, id);
+                    if (existingEntry != null) {
+                        // then the journal entry was already written - as can happen if
+                        // someone else (or the original instance itself) wrote the
+                        // journal entry, then died.
+                        // in this case, don't write it again.
+                        // hence: nothing to be done here. return.
+                        return;
+                    }
+
+                    // otherwise store a new journal entry now
+                    docStore.create(JOURNAL, singletonList(changes.asUpdateOp(lastRootRev)));
+                }
+            }, new ReentrantLock());
 
             log.info("Updated lastRev of [{}] documents while performing lastRev recovery for " +
                     "cluster node [{}]: {}", size, clusterId, updates);

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LocalDiffCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LocalDiffCache.java?rev=1684820&r1=1684819&r2=1684820&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LocalDiffCache.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LocalDiffCache.java Thu Jun 11 07:07:27 2015
@@ -73,7 +73,8 @@ public class LocalDiffCache implements D
     @Nonnull
     @Override
     public Entry newEntry(final @Nonnull Revision from,
-                          final @Nonnull Revision to) {
+                          final @Nonnull Revision to,
+                          boolean local /*ignored*/) {
         return new Entry() {
             private final Map<String, String> changesPerPath = Maps.newHashMap();
             private int size;

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MemoryDiffCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MemoryDiffCache.java?rev=1684820&r1=1684819&r2=1684820&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MemoryDiffCache.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MemoryDiffCache.java Thu Jun 11 07:07:27 2015
@@ -80,7 +80,8 @@ public class MemoryDiffCache implements
     @Nonnull
     @Override
     public Entry newEntry(@Nonnull Revision from,
-                          @Nonnull Revision to) {
+                          @Nonnull Revision to,
+                          boolean local /* ignored: the same kind of entry is created either way */) {
         return new MemoryEntry(from, to);
     }
 

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MergeCommit.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MergeCommit.java?rev=1684820&r1=1684819&r2=1684820&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MergeCommit.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MergeCommit.java Thu Jun 11 07:07:27 2015
@@ -16,8 +16,13 @@
  */
 package org.apache.jackrabbit.oak.plugins.document;
 
+import java.util.Set;
 import java.util.SortedSet;
 
+import javax.annotation.Nonnull;
+
+import com.google.common.collect.Sets;
+
 /**
  * A merge commit containing multiple commit revisions. One for each branch
  * commit to merge.
@@ -25,6 +30,7 @@ import java.util.SortedSet;
 class MergeCommit extends Commit {
 
     private final SortedSet<Revision> mergeRevs;
+    private final Set<Revision> branchCommits = Sets.newHashSet();
 
     MergeCommit(DocumentNodeStore nodeStore,
                 Revision baseRevision,
@@ -37,8 +43,18 @@ class MergeCommit extends Commit {
         return mergeRevs;
     }
 
+    void addBranchCommits(@Nonnull Branch branch) { // collects the revisions later passed on by applyToCache()
+        for (Revision r : branch.getCommits()) {
+            if (!branch.getCommit(r).isRebase()) { // rebase commits are not recorded
+                branchCommits.add(r);
+            }
+        }
+    }
+
     @Override
     public void applyToCache(Revision before, boolean isBranchCommit) {
-        // do nothing for a merge commit
+        // nothing is applied to the cache for a merge commit; the node
+        // store is only notified about the merged branch revisions
+        nodeStore.revisionsMerged(branchCommits);
     }
 }

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/TieredDiffCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/TieredDiffCache.java?rev=1684820&r1=1684819&r2=1684820&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/TieredDiffCache.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/TieredDiffCache.java Thu Jun 11 07:07:27 2015
@@ -29,8 +29,8 @@ import org.apache.jackrabbit.oak.cache.C
  */
 class TieredDiffCache implements DiffCache {
 
-    private final LocalDiffCache localCache;
-    private final MemoryDiffCache memoryCache;
+    private final DiffCache localCache;
+    private final DiffCache memoryCache;
 
     TieredDiffCache(DocumentMK.Builder builder) {
         this.localCache = new LocalDiffCache(builder);
@@ -51,7 +51,8 @@ class TieredDiffCache implements DiffCac
     }
 
     /**
-     * Creates a new entry in the {@link LocalDiffCache} only!
+     * Creates a new entry in the {@link LocalDiffCache} for local changes
+     * or in the {@link MemoryDiffCache} for external changes.
      *
      * @param from the from revision.
      * @param to the to revision.
@@ -59,8 +60,12 @@ class TieredDiffCache implements DiffCac
      */
     @Nonnull
     @Override
-    public Entry newEntry(@Nonnull Revision from, @Nonnull Revision to) {
-        return localCache.newEntry(from, to);
+    public Entry newEntry(@Nonnull Revision from, @Nonnull Revision to, boolean local) {
+        // local changes are tracked in the local cache, external ones in the memory cache
+        if (local) {
+            return localCache.newEntry(from, to, true);
+        }
+        return memoryCache.newEntry(from, to, false);
     }
 
     @Nonnull

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UnsavedModifications.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UnsavedModifications.java?rev=1684820&r1=1684819&r2=1684820&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UnsavedModifications.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UnsavedModifications.java Thu Jun 11 07:07:27 2015
@@ -134,11 +134,14 @@ class UnsavedModifications {
      * lock for a short period of time.
      *
      * @param store the document node store.
+     * @param snapshot callback when the snapshot of the pending changes is
+     *                 acquired.
      * @param lock the lock to acquire to get a consistent snapshot of the
      *             revisions to write back.
      * @return stats about the write operation.
      */
     public BackgroundWriteStats persist(@Nonnull DocumentNodeStore store,
+                                        @Nonnull Snapshot snapshot,
                                         @Nonnull Lock lock) {
         BackgroundWriteStats stats = new BackgroundWriteStats();
         if (map.size() == 0) {
@@ -150,12 +153,13 @@ class UnsavedModifications {
         Clock clock = store.getClock();
 
         long time = clock.getTime();
-                // get a copy of the map while holding the lock
+        // get a copy of the map while holding the lock
         lock.lock();
         stats.lock = clock.getTime() - time;
         time = clock.getTime();
         Map<String, Revision> pending;
         try {
+            snapshot.acquiring();
             pending = Maps.newTreeMap(PathComparator.INSTANCE);
             pending.putAll(map);
         } finally {
@@ -218,4 +222,15 @@ class UnsavedModifications {
     public String toString() {
         return map.toString();
     }
+
+    /** Callback invoked by persist() when it takes the snapshot of the pending changes. */
+    public interface Snapshot {
+        Snapshot IGNORE = new Snapshot() { // no-op implementation
+            @Override
+            public void acquiring() {
+            }
+        };
+        /** Called while the lock is held, just before the pending changes are copied. */
+        void acquiring();
+    }
 }

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/memory/MemoryDocumentStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/memory/MemoryDocumentStore.java?rev=1684820&r1=1684819&r2=1684820&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/memory/MemoryDocumentStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/memory/MemoryDocumentStore.java Thu Jun 11 07:07:27 2015
@@ -35,6 +35,7 @@ import org.apache.jackrabbit.oak.plugins
 import org.apache.jackrabbit.oak.plugins.document.Document;
 import org.apache.jackrabbit.oak.plugins.document.DocumentStore;
 import org.apache.jackrabbit.oak.plugins.document.DocumentStoreException;
+import org.apache.jackrabbit.oak.plugins.document.JournalEntry;
 import org.apache.jackrabbit.oak.plugins.document.NodeDocument;
 import org.apache.jackrabbit.oak.plugins.document.Revision;
 import org.apache.jackrabbit.oak.plugins.document.StableRevisionComparator;
@@ -73,6 +74,12 @@ public class MemoryDocumentStore impleme
     private ConcurrentSkipListMap<String, Document> settings =
             new ConcurrentSkipListMap<String, Document>();
 
+    /**
+     * The 'journal' collection.
+     */
+    private ConcurrentSkipListMap<String, JournalEntry> externalChanges =
+            new ConcurrentSkipListMap<String, JournalEntry>();
+
     private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
 
     /**
@@ -226,8 +233,10 @@ public class MemoryDocumentStore impleme
             return (ConcurrentSkipListMap<String, T>) nodes;
         } else if (collection == Collection.CLUSTER_NODES) {
             return (ConcurrentSkipListMap<String, T>) clusterNodes;
-        }else if (collection == Collection.SETTINGS) {
+        } else if (collection == Collection.SETTINGS) {
             return (ConcurrentSkipListMap<String, T>) settings;
+        } else if (collection == Collection.JOURNAL) {
+            return (ConcurrentSkipListMap<String, T>) externalChanges;
         } else {
             throw new IllegalArgumentException(
                     "Unknown collection: " + collection.toString());

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDiffCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDiffCache.java?rev=1684820&r1=1684819&r2=1684820&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDiffCache.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDiffCache.java Thu Jun 11 07:07:27 2015
@@ -91,7 +91,7 @@ public class MongoDiffCache extends Memo
             if (changes == null && loader != null) {
                 changes = loader.call();
                 // put into memory cache
-                super.newEntry(from, to).append(path, changes);
+                super.newEntry(from, to, false).append(path, changes);
             }
             return changes;
         } finally {
@@ -102,7 +102,8 @@ public class MongoDiffCache extends Memo
     @Nonnull
     @Override
     public Entry newEntry(@Nonnull final Revision from,
-                          @Nonnull final Revision to) {
+                          @Nonnull final Revision to,
+                          boolean local /*ignored*/) {
         return new MemoryEntry(from, to) {
 
             private Diff commit = new Diff(from, to);
@@ -172,7 +173,7 @@ public class MongoDiffCache extends Memo
                 // diff is complete
                 LOG.debug("Built diff from {} commits", numCommits);
                 // apply to diff cache and serve later requests from cache
-                d.applyToEntry(super.newEntry(from, to)).done();
+                d.applyToEntry(super.newEntry(from, to, false)).done();
                 // return changes
                 return d.getChanges(path);
             }

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java?rev=1684820&r1=1684819&r2=1684820&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java Thu Jun 11 07:07:27 2015
@@ -116,6 +116,7 @@ public class MongoDocumentStore implemen
     private final DBCollection nodes;
     private final DBCollection clusterNodes;
     private final DBCollection settings;
+    private final DBCollection journal;
 
     private final Cache<CacheValue, NodeDocument> nodesCache;
     private final CacheStats cacheStats;
@@ -184,12 +185,10 @@ public class MongoDocumentStore implemen
                 .put("version", version)
                 .build();
 
-        nodes = db.getCollection(
-                Collection.NODES.toString());
-        clusterNodes = db.getCollection(
-                Collection.CLUSTER_NODES.toString());
-        settings = db.getCollection(
-                Collection.SETTINGS.toString());
+        nodes = db.getCollection(Collection.NODES.toString());
+        clusterNodes = db.getCollection(Collection.CLUSTER_NODES.toString());
+        settings = db.getCollection(Collection.SETTINGS.toString());
+        journal = db.getCollection(Collection.JOURNAL.toString());
 
         maxReplicationLagMillis = builder.getMaxReplicationLagMillis();
 
@@ -353,31 +352,30 @@ public class MongoDocumentStore implemen
         try {
             TreeLock lock = acquire(key);
             try {
-                if (maxCacheAge == 0) {
-                    invalidateCache(collection, key);
-                }
-                while (true) {
-                    doc = nodesCache.get(cacheKey, new Callable<NodeDocument>() {
-                        @Override
-                        public NodeDocument call() throws Exception {
-                            NodeDocument doc = (NodeDocument) findUncachedWithRetry(
-                                    collection, key,
-                                    getReadPreference(maxCacheAge), 2);
-                            if (doc == null) {
-                                doc = NodeDocument.NULL;
+                if (maxCacheAge > 0 || preferCached) {
+                    // try again, some other thread may have populated
+                    // the cache by now
+                    doc = nodesCache.getIfPresent(cacheKey);
+                    if (doc != null) {
+                        if (preferCached ||
+                                getTime() - doc.getCreated() < maxCacheAge) {
+                            if (doc == NodeDocument.NULL) {
+                                return null;
                             }
-                            return doc;
+                            return (T) doc;
                         }
-                    });
-                    if (maxCacheAge == 0 || preferCached) {
-                        break;
                     }
-                    if (getTime() - doc.getCreated() < maxCacheAge) {
-                        break;
-                    }
-                    // too old: invalidate, try again
-                    invalidateCache(collection, key);
                 }
+                final NodeDocument d = (NodeDocument) findUncachedWithRetry(
+                        collection, key,
+                        getReadPreference(maxCacheAge), 2);
+                invalidateCache(collection, key);
+                doc = nodesCache.get(cacheKey, new Callable<NodeDocument>() {
+                    @Override
+                    public NodeDocument call() throws Exception {
+                        return d == null ? NodeDocument.NULL : d;
+                    }
+                });
             } finally {
                 lock.unlock();
             }
@@ -390,6 +388,8 @@ public class MongoDocumentStore implemen
             t = e.getCause();
         } catch (ExecutionException e) {
             t = e.getCause();
+        } catch (RuntimeException e) {
+            t = e;
         }
         throw new DocumentStoreException("Failed to load document with " + key, t);
     }
@@ -411,6 +411,9 @@ public class MongoDocumentStore implemen
             DocumentReadPreference docReadPref,
             int retries) {
         checkArgument(retries >= 0, "retries must not be negative");
+        if (key.equals("0:/")) {
+            LOG.trace("root node");
+        }
         int numAttempts = retries + 1;
         MongoException ex = null;
         for (int i = 0; i < numAttempts; i++) {
@@ -518,8 +521,12 @@ public class MongoDocumentStore implemen
         }
         DBObject query = queryBuilder.get();
         String parentId = Utils.getParentIdFromLowerLimit(fromKey);
-        TreeLock lock = acquireExclusive(parentId != null ? parentId : "");
+        long lockTime = -1;
         final long start = PERFLOG.start();
+        TreeLock lock = acquireExclusive(parentId != null ? parentId : "");
+        if (start != -1) {
+            lockTime = System.currentTimeMillis() - start;
+        }
         try {
             DBCursor cursor = dbCollection.find(query).sort(BY_ID_ASC);
             if (!disableIndexHint) {
@@ -574,7 +581,7 @@ public class MongoDocumentStore implemen
             return list;
         } finally {
             lock.unlock();
-            PERFLOG.end(start, 1, "query for children from [{}] to [{}]", fromKey, toKey);
+            PERFLOG.end(start, 1, "query for children from [{}] to [{}], lock:{}", fromKey, toKey, lockTime);
         }
     }
 
@@ -968,7 +975,9 @@ public class MongoDocumentStore implemen
             return clusterNodes;
         } else if (collection == Collection.SETTINGS) {
             return settings;
-        }else {
+        } else if (collection == Collection.JOURNAL) {
+            return journal;
+        } else {
             throw new IllegalArgumentException(
                     "Unknown collection: " + collection.toString());
         }

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java?rev=1684820&r1=1684819&r2=1684820&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java Thu Jun 11 07:07:27 2015
@@ -754,7 +754,7 @@ public class RDBDocumentStore implements
     private Set<String> tablesToBeDropped = new HashSet<String>();
 
     // table names
-    private String tnNodes, tnClusterNodes, tnSettings; 
+    private String tnNodes, tnClusterNodes, tnSettings, tnJournal;
 
     // ratio between Java characters and UTF-8 encoding
     // a) single characters will fit into 3 bytes
@@ -793,6 +793,7 @@ public class RDBDocumentStore implements
         this.tnNodes = RDBJDBCTools.createTableName(options.getTablePrefix(), "NODES");
         this.tnClusterNodes = RDBJDBCTools.createTableName(options.getTablePrefix(), "CLUSTERNODES");
         this.tnSettings = RDBJDBCTools.createTableName(options.getTablePrefix(), "SETTINGS");
+        this.tnJournal = RDBJDBCTools.createTableName(options.getTablePrefix(), "JOURNAL");
 
         this.ch = new RDBConnectionHandler(ds);
         this.callStack = LOG.isDebugEnabled() ? new Exception("call stack of RDBDocumentStore creation") : null;
@@ -835,6 +836,7 @@ public class RDBDocumentStore implements
             createTableFor(con, Collection.CLUSTER_NODES, tablesCreated, tablesPresent);
             createTableFor(con, Collection.NODES, tablesCreated, tablesPresent);
             createTableFor(con, Collection.SETTINGS, tablesCreated, tablesPresent);
+            createTableFor(con, Collection.JOURNAL, tablesCreated, tablesPresent);
         } finally {
             con.commit();
             con.close();
@@ -1239,6 +1241,8 @@ public class RDBDocumentStore implements
             return this.tnNodes;
         } else if (collection == Collection.SETTINGS) {
             return this.tnSettings;
+        } else if (collection == Collection.JOURNAL) {
+            return this.tnJournal;
         } else {
             throw new IllegalArgumentException("Unknown collection: " + collection.toString());
         }

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/AmnesiaDiffCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/AmnesiaDiffCache.java?rev=1684820&r1=1684819&r2=1684820&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/AmnesiaDiffCache.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/AmnesiaDiffCache.java Thu Jun 11 07:07:27 2015
@@ -47,7 +47,7 @@ class AmnesiaDiffCache implements DiffCa
 
     @Nonnull
     @Override
-    public Entry newEntry(@Nonnull Revision from, @Nonnull Revision to) {
+    public Entry newEntry(@Nonnull Revision from, @Nonnull Revision to, boolean local) {
         return new Entry() {
             @Override
             public void append(@Nonnull String path, @Nonnull String changes) {

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterTest.java?rev=1684820&r1=1684819&r2=1684820&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterTest.java Thu Jun 11 07:07:27 2015
@@ -369,6 +369,10 @@ public class ClusterTest {
                 rootStates2.add((DocumentNodeState) root);
             }
         });
+
+        ns1.runBackgroundOperations();
+        ns2.runBackgroundOperations();
+
         rootStates1.clear();
         rootStates2.clear();