You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by mr...@apache.org on 2015/08/11 14:55:42 UTC

svn commit: r1695297 [2/3] - in /jackrabbit/oak/branches/1.0: ./ oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ oak-core/src/main/java/org/apache/jackrabbit/oak/plugi...

Modified: jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java?rev=1695297&r1=1695296&r2=1695297&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java (original)
+++ jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java Tue Aug 11 12:55:41 2015
@@ -22,6 +22,8 @@ package org.apache.jackrabbit.oak.plugin
 import static com.google.common.collect.ImmutableList.of;
 import static com.google.common.collect.Iterables.filter;
 import static com.google.common.collect.Iterables.mergeSorted;
+import static java.util.Collections.singletonList;
+import static org.apache.jackrabbit.oak.plugins.document.Collection.JOURNAL;
 
 import java.io.IOException;
 import java.util.Iterator;
@@ -126,6 +128,8 @@ public class LastRevRecoveryAgent {
             //Map of known last rev of checked paths
             UnsavedModifications knownLastRevs = new UnsavedModifications();
             closer.register(knownLastRevs);
+            final DocumentStore docStore = nodeStore.getDocumentStore();
+            final JournalEntry changes = JOURNAL.newDocument(docStore);
 
             while (suspects.hasNext()) {
                 NodeDocument doc = suspects.next();
@@ -153,6 +157,7 @@ public class LastRevRecoveryAgent {
                 //2. Update lastRev for parent paths aka rollup
                 if (lastRevForParents != null) {
                     String path = doc.getPath();
+                    changes.modified(path); // track all changes
                     while (true) {
                         if (PathUtils.denotesRoot(path)) {
                             break;
@@ -176,6 +181,9 @@ public class LastRevRecoveryAgent {
                 }
             }
 
+            // take the root's lastRev
+            final Revision lastRootRev = unsaved.get("/");
+
             //Note the size before persist as persist operation
             //would empty the internal state
             int size = unsaved.getPaths().size();
@@ -184,7 +192,41 @@ public class LastRevRecoveryAgent {
             //UnsavedModifications is designed to be used in concurrent
             //access mode. For recovery case there is no concurrent access
             //involve so just pass a new lock instance
-            unsaved.persist(nodeStore, new ReentrantLock());
+
+            // the lock used for persisting is a plain reentrant lock,
+            // thus it doesn't matter where exactly the check is done
+            // as to whether the recovered lastRev has already been
+            // written to the journal.
+            unsaved.persist(nodeStore, new UnsavedModifications.Snapshot() {
+
+                @Override
+                public void acquiring() {
+                    if (lastRootRev == null) {
+                        // this should never happen - when unsaved has no changes,
+                        // its 'map' is empty, and in that case 'persist()'
+                        // returns early and never calls
+                        // acquiring() here.
+                        //
+                        // but even if it did occur - if we have no lastRootRev
+                        // then we cannot and probably don't have to persist anything
+                        return;
+                    }
+
+                    final String id = JournalEntry.asId(lastRootRev); // lastRootRev never null at this point
+                    final JournalEntry existingEntry = docStore.find(Collection.JOURNAL, id);
+                    if (existingEntry != null) {
+                        // then the journal entry was already written - as can happen if
+                        // someone else (or the original instance itself) wrote the
+                        // journal entry, then died.
+                        // in this case, don't write it again.
+                        // hence: nothing to be done here. return.
+                        return;
+                    }
+
+                    // otherwise store a new journal entry now
+                    docStore.create(JOURNAL, singletonList(changes.asUpdateOp(lastRootRev)));
+                }
+            }, new ReentrantLock());
 
             log.info("Updated lastRev of [{}] documents while performing lastRev recovery for " +
                     "cluster node [{}]: {}", size, clusterId, updates);

Modified: jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LocalDiffCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LocalDiffCache.java?rev=1695297&r1=1695296&r2=1695297&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LocalDiffCache.java (original)
+++ jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LocalDiffCache.java Tue Aug 11 12:55:41 2015
@@ -73,7 +73,8 @@ public class LocalDiffCache implements D
     @Nonnull
     @Override
     public Entry newEntry(final @Nonnull Revision from,
-                          final @Nonnull Revision to) {
+                          final @Nonnull Revision to,
+                          boolean local /*ignored*/) {
         return new Entry() {
             private final Map<String, String> changesPerPath = Maps.newHashMap();
             private int size;

Modified: jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MemoryDiffCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MemoryDiffCache.java?rev=1695297&r1=1695296&r2=1695297&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MemoryDiffCache.java (original)
+++ jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MemoryDiffCache.java Tue Aug 11 12:55:41 2015
@@ -80,7 +80,8 @@ public class MemoryDiffCache implements
     @Nonnull
     @Override
     public Entry newEntry(@Nonnull Revision from,
-                          @Nonnull Revision to) {
+                          @Nonnull Revision to,
+                          boolean local /*ignored*/) {
         return new MemoryEntry(from, to);
     }
 

Modified: jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MergeCommit.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MergeCommit.java?rev=1695297&r1=1695296&r2=1695297&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MergeCommit.java (original)
+++ jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MergeCommit.java Tue Aug 11 12:55:41 2015
@@ -16,8 +16,13 @@
  */
 package org.apache.jackrabbit.oak.plugins.document;
 
+import java.util.Set;
 import java.util.SortedSet;
 
+import javax.annotation.Nonnull;
+
+import com.google.common.collect.Sets;
+
 /**
  * A merge commit containing multiple commit revisions. One for each branch
  * commit to merge.
@@ -25,6 +30,7 @@ import java.util.SortedSet;
 class MergeCommit extends Commit {
 
     private final SortedSet<Revision> mergeRevs;
+    private final Set<Revision> branchCommits = Sets.newHashSet();
 
     MergeCommit(DocumentNodeStore nodeStore,
                 Revision baseRevision,
@@ -37,8 +43,18 @@ class MergeCommit extends Commit {
         return mergeRevs;
     }
 
+    void addBranchCommits(@Nonnull Branch branch) {
+        for (Revision r : branch.getCommits()) {
+            if (!branch.getCommit(r).isRebase()) {
+                branchCommits.add(r);
+            }
+        }
+    }
+
     @Override
     public void applyToCache(Revision before, boolean isBranchCommit) {
-        // do nothing for a merge commit
+        // do nothing for a merge commit, only notify node
+        // store about merged revisions
+        nodeStore.revisionsMerged(branchCommits);
     }
 }

Modified: jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/TieredDiffCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/TieredDiffCache.java?rev=1695297&r1=1695296&r2=1695297&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/TieredDiffCache.java (original)
+++ jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/TieredDiffCache.java Tue Aug 11 12:55:41 2015
@@ -29,8 +29,8 @@ import org.apache.jackrabbit.oak.cache.C
  */
 class TieredDiffCache implements DiffCache {
 
-    private final LocalDiffCache localCache;
-    private final MemoryDiffCache memoryCache;
+    private final DiffCache localCache;
+    private final DiffCache memoryCache;
 
     TieredDiffCache(DocumentMK.Builder builder) {
         this.localCache = new LocalDiffCache(builder);
@@ -51,7 +51,8 @@ class TieredDiffCache implements DiffCac
     }
 
     /**
-     * Creates a new entry in the {@link LocalDiffCache} only!
+     * Creates a new entry in the {@link LocalDiffCache} for local changes
+     * and {@link MemoryDiffCache} for external changes
      *
      * @param from the from revision.
      * @param to the to revision.
@@ -59,8 +60,12 @@ class TieredDiffCache implements DiffCac
      */
     @Nonnull
     @Override
-    public Entry newEntry(@Nonnull Revision from, @Nonnull Revision to) {
-        return localCache.newEntry(from, to);
+    public Entry newEntry(@Nonnull Revision from, @Nonnull Revision to, boolean local) {
+        if (local) {
+            return localCache.newEntry(from, to, true);
+        } else {
+            return memoryCache.newEntry(from, to, false);
+        }
     }
 
     @Nonnull

Modified: jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UnsavedModifications.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UnsavedModifications.java?rev=1695297&r1=1695296&r2=1695297&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UnsavedModifications.java (original)
+++ jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UnsavedModifications.java Tue Aug 11 12:55:41 2015
@@ -162,11 +162,14 @@ class UnsavedModifications implements Cl
      * lock for a short period of time.
      *
      * @param store the document node store.
+     * @param snapshot callback when the snapshot of the pending changes is
+     *                 acquired.
      * @param lock the lock to acquire to get a consistent snapshot of the
      *             revisions to write back.
      * @return stats about the write operation.
      */
     public BackgroundWriteStats persist(@Nonnull DocumentNodeStore store,
+                                        @Nonnull Snapshot snapshot,
                                         @Nonnull Lock lock) {
         BackgroundWriteStats stats = new BackgroundWriteStats();
         if (map.size() == 0) {
@@ -178,13 +181,14 @@ class UnsavedModifications implements Cl
         Clock clock = store.getClock();
 
         long time = clock.getTime();
-                // get a copy of the map while holding the lock
+        // get a copy of the map while holding the lock
         lock.lock();
         MapFactory tmpFactory = null;
         Map<String, Revision> pending;
         try {
             stats.lock = clock.getTime() - time;
             time = clock.getTime();
+            snapshot.acquiring();
             if (map.size() > IN_MEMORY_SIZE_LIMIT) {
                 tmpFactory = MapFactory.createFactory();
                 pending = tmpFactory.create(PathComparator.INSTANCE);
@@ -265,4 +269,15 @@ class UnsavedModifications implements Cl
     public String toString() {
         return map.toString();
     }
+
+    public interface Snapshot {
+
+        Snapshot IGNORE = new Snapshot() {
+            @Override
+            public void acquiring() {
+            }
+        };
+
+        void acquiring();
+    }
 }

Modified: jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/memory/MemoryDocumentStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/memory/MemoryDocumentStore.java?rev=1695297&r1=1695296&r2=1695297&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/memory/MemoryDocumentStore.java (original)
+++ jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/memory/MemoryDocumentStore.java Tue Aug 11 12:55:41 2015
@@ -35,6 +35,7 @@ import org.apache.jackrabbit.oak.plugins
 import org.apache.jackrabbit.oak.plugins.document.Document;
 import org.apache.jackrabbit.oak.plugins.document.DocumentStore;
 import org.apache.jackrabbit.oak.plugins.document.DocumentStoreException;
+import org.apache.jackrabbit.oak.plugins.document.JournalEntry;
 import org.apache.jackrabbit.oak.plugins.document.NodeDocument;
 import org.apache.jackrabbit.oak.plugins.document.Revision;
 import org.apache.jackrabbit.oak.plugins.document.StableRevisionComparator;
@@ -73,6 +74,12 @@ public class MemoryDocumentStore impleme
     private ConcurrentSkipListMap<String, Document> settings =
             new ConcurrentSkipListMap<String, Document>();
 
+    /**
+     * The 'externalChanges' map, backing the JOURNAL collection.
+     */
+    private ConcurrentSkipListMap<String, JournalEntry> externalChanges =
+            new ConcurrentSkipListMap<String, JournalEntry>();
+
     private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
 
     /**
@@ -226,8 +233,10 @@ public class MemoryDocumentStore impleme
             return (ConcurrentSkipListMap<String, T>) nodes;
         } else if (collection == Collection.CLUSTER_NODES) {
             return (ConcurrentSkipListMap<String, T>) clusterNodes;
-        }else if (collection == Collection.SETTINGS) {
+        } else if (collection == Collection.SETTINGS) {
             return (ConcurrentSkipListMap<String, T>) settings;
+        } else if (collection == Collection.JOURNAL) {
+            return (ConcurrentSkipListMap<String, T>) externalChanges;
         } else {
             throw new IllegalArgumentException(
                     "Unknown collection: " + collection.toString());
@@ -329,6 +338,11 @@ public class MemoryDocumentStore impleme
     }
 
     @Override
+    public CacheInvalidationStats invalidateCache(Iterable<String> keys) {
+        return null;
+    }
+    
+    @Override
     public void dispose() {
         // ignore
     }

Modified: jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/CacheInvalidator.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/CacheInvalidator.java?rev=1695297&r1=1695296&r2=1695297&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/CacheInvalidator.java (original)
+++ jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/CacheInvalidator.java Tue Aug 11 12:55:41 2015
@@ -195,7 +195,7 @@ abstract class CacheInvalidator {
             PeekingIterator<TreeNode> pitr = Iterators.peekingIterator(treeItr);
             Map<String, TreeNode> sameLevelNodes = Maps.newHashMap();
 
-            // Fetch only the lastRev map and id
+            // Fetch only the modCount and id
             final BasicDBObject keys = new BasicDBObject(Document.ID, 1);
             keys.put(Document.MOD_COUNT, 1);
 
@@ -228,7 +228,7 @@ abstract class CacheInvalidator {
                         QueryBuilder query = QueryBuilder.start(Document.ID)
                                 .in(idBatch);
 
-                        // Fetch lastRev and modCount for each such nodes
+                        // Fetch modCount for each such node
                         DBCursor cursor = nodes.find(query.get(), keys);
                         cursor.setReadPreference(ReadPreference.primary());
                         LOG.debug(

Modified: jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDiffCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDiffCache.java?rev=1695297&r1=1695296&r2=1695297&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDiffCache.java (original)
+++ jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDiffCache.java Tue Aug 11 12:55:41 2015
@@ -91,7 +91,7 @@ public class MongoDiffCache extends Memo
             if (changes == null && loader != null) {
                 changes = loader.call();
                 // put into memory cache
-                super.newEntry(from, to).append(path, changes);
+                super.newEntry(from, to, false).append(path, changes);
             }
             return changes;
         } finally {
@@ -102,7 +102,8 @@ public class MongoDiffCache extends Memo
     @Nonnull
     @Override
     public Entry newEntry(@Nonnull final Revision from,
-                          @Nonnull final Revision to) {
+                          @Nonnull final Revision to,
+                          boolean local /*ignored*/) {
         return new MemoryEntry(from, to) {
 
             private Diff commit = new Diff(from, to);
@@ -172,7 +173,7 @@ public class MongoDiffCache extends Memo
                 // diff is complete
                 LOG.debug("Built diff from {} commits", numCommits);
                 // apply to diff cache and serve later requests from cache
-                d.applyToEntry(super.newEntry(from, to)).done();
+                d.applyToEntry(super.newEntry(from, to, false)).done();
                 // return changes
                 return d.getChanges(path);
             }

Modified: jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java?rev=1695297&r1=1695296&r2=1695297&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java (original)
+++ jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java Tue Aug 11 12:55:41 2015
@@ -66,6 +66,7 @@ import org.apache.jackrabbit.oak.plugins
 import org.apache.jackrabbit.oak.plugins.document.cache.ForwardingListener;
 import org.apache.jackrabbit.oak.plugins.document.cache.NodeDocOffHeapCache;
 import org.apache.jackrabbit.oak.plugins.document.cache.OffHeapCache;
+import org.apache.jackrabbit.oak.plugins.document.mongo.CacheInvalidator.InvalidationResult;
 import org.apache.jackrabbit.oak.plugins.document.util.StringValue;
 import org.apache.jackrabbit.oak.plugins.document.util.Utils;
 import org.apache.jackrabbit.oak.stats.Clock;
@@ -115,6 +116,7 @@ public class MongoDocumentStore implemen
     private final DBCollection nodes;
     private final DBCollection clusterNodes;
     private final DBCollection settings;
+    private final DBCollection journal;
 
     private final Cache<CacheValue, NodeDocument> nodesCache;
     private final CacheStats cacheStats;
@@ -192,12 +194,10 @@ public class MongoDocumentStore implemen
                 .put("version", version)
                 .build();
 
-        nodes = db.getCollection(
-                Collection.NODES.toString());
-        clusterNodes = db.getCollection(
-                Collection.CLUSTER_NODES.toString());
-        settings = db.getCollection(
-                Collection.SETTINGS.toString());
+        nodes = db.getCollection(Collection.NODES.toString());
+        clusterNodes = db.getCollection(Collection.CLUSTER_NODES.toString());
+        settings = db.getCollection(Collection.SETTINGS.toString());
+        journal = db.getCollection(Collection.JOURNAL.toString());
 
         maxReplicationLagMillis = builder.getMaxReplicationLagMillis();
 
@@ -295,6 +295,59 @@ public class MongoDocumentStore implemen
         //that would lead to lesser number of queries
         return CacheInvalidator.createHierarchicalInvalidator(this).invalidateCache();
     }
+    
+    @Override
+    public CacheInvalidationStats invalidateCache(Iterable<String> keys) {
+        LOG.debug("invalidateCache: start");
+        final InvalidationResult result = new InvalidationResult();
+        int size  = 0;
+
+        final Iterator<String> it = keys.iterator();
+        while(it.hasNext()) {
+            // read chunks of documents only
+            final List<String> ids = new ArrayList<String>(IN_CLAUSE_BATCH_SIZE);
+            while(it.hasNext() && ids.size() < IN_CLAUSE_BATCH_SIZE) {
+                final String id = it.next();
+                if (getCachedNodeDoc(id) != null) {
+                    // only add those that we actually do have cached
+                    ids.add(id);
+                }
+            }
+            size += ids.size();
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("invalidateCache: batch size: {} of total so far {}",
+                        ids.size(), size);
+            }
+            
+            QueryBuilder query = QueryBuilder.start(Document.ID).in(ids);
+            // Fetch only the modCount and id
+            final BasicDBObject fields = new BasicDBObject(Document.ID, 1);
+            fields.put(Document.MOD_COUNT, 1);
+            
+            DBCursor cursor = nodes.find(query.get(), fields);
+            cursor.setReadPreference(ReadPreference.primary());
+            result.queryCount++;
+            
+            for (DBObject obj : cursor) {
+                result.cacheEntriesProcessedCount++;
+                String id = (String) obj.get(Document.ID);
+                Number modCount = (Number) obj.get(Document.MOD_COUNT);
+                
+                CachedNodeDocument cachedDoc = getCachedNodeDoc(id);
+                if (cachedDoc != null
+                        && !Objects.equal(cachedDoc.getModCount(), modCount)) {
+                    invalidateCache(Collection.NODES, id);
+                    result.invalidationCount++;
+                } else {
+                    result.upToDateCount++;
+                }
+            }
+        }
+
+        result.cacheSize = size;
+        LOG.trace("invalidateCache: end. total: {}", size);
+        return result;
+    }
 
     @Override
     public <T extends Document> void invalidateCache(Collection<T> collection, String key) {
@@ -360,29 +413,30 @@ public class MongoDocumentStore implemen
         try {
             TreeLock lock = acquire(key);
             try {
-                if (maxCacheAge == 0) {
-                    invalidateCache(collection, key);
-                }
-                while (true) {
-                    doc = nodesCache.get(cacheKey, new Callable<NodeDocument>() {
-                        @Override
-                        public NodeDocument call() throws Exception {
-                            NodeDocument doc = (NodeDocument) findUncached(collection, key, getReadPreference(maxCacheAge));
-                            if (doc == null) {
-                                doc = NodeDocument.NULL;
+                if (maxCacheAge > 0 || preferCached) {
+                    // try again some other thread may have populated
+                    // the cache by now
+                    doc = nodesCache.getIfPresent(cacheKey);
+                    if (doc != null) {
+                        if (preferCached ||
+                                getTime() - doc.getCreated() < maxCacheAge) {
+                            if (doc == NodeDocument.NULL) {
+                                return null;
                             }
-                            return doc;
+                            return (T) doc;
                         }
-                    });
-                    if (maxCacheAge == 0 || preferCached) {
-                        break;
-                    }
-                    if (getTime() - doc.getCreated() < maxCacheAge) {
-                        break;
                     }
-                    // too old: invalidate, try again
-                    invalidateCache(collection, key);
                 }
+                final NodeDocument d = (NodeDocument) findUncached(
+                        collection, key,
+                        getReadPreference(maxCacheAge));
+                invalidateCache(collection, key);
+                doc = nodesCache.get(cacheKey, new Callable<NodeDocument>() {
+                    @Override
+                    public NodeDocument call() throws Exception {
+                        return d == null ? NodeDocument.NULL : d;
+                    }
+                });
             } finally {
                 lock.unlock();
             }
@@ -393,6 +447,8 @@ public class MongoDocumentStore implemen
             }
         } catch (ExecutionException e) {
             t = e.getCause();
+        } catch (RuntimeException e) {
+            t = e;
         }
         throw new DocumentStoreException("Failed to load document with " + key, t);
     }
@@ -514,9 +570,13 @@ public class MongoDocumentStore implemen
         }
         DBObject query = queryBuilder.get();
         String parentId = Utils.getParentIdFromLowerLimit(fromKey);
+        long lockTime = -1;
         final long start = PERFLOG.start();
-        TreeLock lock = withLock ? acquireExclusive(parentId != null ? parentId : "") : null;
+        TreeLock lock = acquireExclusive(parentId != null ? parentId : "");
         try {
+            if (start != -1) {
+                lockTime = System.currentTimeMillis() - start;
+            }
             DBCursor cursor = dbCollection.find(query).sort(BY_ID_ASC);
             if (!disableIndexHint) {
                 cursor.hint(hint);
@@ -574,7 +634,7 @@ public class MongoDocumentStore implemen
             if (lock != null) {
                 lock.unlock();
             }
-            PERFLOG.end(start, 1, "query for children from [{}] to [{}]", fromKey, toKey);
+            PERFLOG.end(start, 1, "query for children from [{}] to [{}], lock:{}", fromKey, toKey, lockTime);
         }
     }
 
@@ -968,7 +1028,9 @@ public class MongoDocumentStore implemen
             return clusterNodes;
         } else if (collection == Collection.SETTINGS) {
             return settings;
-        }else {
+        } else if (collection == Collection.JOURNAL) {
+            return journal;
+        } else {
             throw new IllegalArgumentException(
                     "Unknown collection: " + collection.toString());
         }

Modified: jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java?rev=1695297&r1=1695296&r2=1695297&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java (original)
+++ jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java Tue Aug 11 12:55:41 2015
@@ -282,6 +282,12 @@ public class RDBDocumentStore implements
         }
         return null;
     }
+    
+    @Override
+    public CacheInvalidationStats invalidateCache(Iterable<String> keys) {
+        //TODO: optimize me
+        return invalidateCache();
+    }
 
     @Override
     public <T extends Document> void invalidateCache(Collection<T> collection, String id) {
@@ -783,7 +789,7 @@ public class RDBDocumentStore implements
     private Set<String> tablesToBeDropped = new HashSet<String>();
 
     // table names
-    private String tnNodes, tnClusterNodes, tnSettings; 
+    private String tnNodes, tnClusterNodes, tnSettings, tnJournal;
 
     // ratio between Java characters and UTF-8 encoding
     // a) single characters will fit into 3 bytes
@@ -825,6 +831,7 @@ public class RDBDocumentStore implements
         this.tnNodes = RDBJDBCTools.createTableName(options.getTablePrefix(), TABLEMAP.get(Collection.NODES));
         this.tnClusterNodes = RDBJDBCTools.createTableName(options.getTablePrefix(), TABLEMAP.get(Collection.CLUSTER_NODES));
         this.tnSettings = RDBJDBCTools.createTableName(options.getTablePrefix(), TABLEMAP.get(Collection.SETTINGS));
+        this.tnJournal = RDBJDBCTools.createTableName(options.getTablePrefix(), "JOURNAL");
 
         this.ch = new RDBConnectionHandler(ds);
         this.callStack = LOG.isDebugEnabled() ? new Exception("call stack of RDBDocumentStore creation") : null;
@@ -878,6 +885,7 @@ public class RDBDocumentStore implements
             createTableFor(con, Collection.CLUSTER_NODES, tablesCreated, tablesPresent, tableDiags);
             createTableFor(con, Collection.NODES, tablesCreated, tablesPresent, tableDiags);
             createTableFor(con, Collection.SETTINGS, tablesCreated, tablesPresent, tableDiags);
+            createTableFor(con, Collection.JOURNAL, tablesCreated, tablesPresent, tableDiags);
         } finally {
             con.commit();
             con.close();
@@ -1314,6 +1322,8 @@ public class RDBDocumentStore implements
             return this.tnNodes;
         } else if (collection == Collection.SETTINGS) {
             return this.tnSettings;
+        } else if (collection == Collection.JOURNAL) {
+            return this.tnJournal;
         } else {
             throw new IllegalArgumentException("Unknown collection: " + collection.toString());
         }

Modified: jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/LoggingDocumentStoreWrapper.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/LoggingDocumentStoreWrapper.java?rev=1695297&r1=1695296&r2=1695297&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/LoggingDocumentStoreWrapper.java (original)
+++ jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/LoggingDocumentStoreWrapper.java Tue Aug 11 12:55:41 2015
@@ -251,6 +251,17 @@ public class LoggingDocumentStoreWrapper
             throw convert(e);
         }
     }
+    
+    @Override
+    public CacheInvalidationStats invalidateCache(Iterable<String> keys) {
+        try {
+            logMethod("invalidateCache", keys);
+            return store.invalidateCache(keys);
+        } catch (Exception e) {
+            logException(e);
+            throw convert(e);
+        }
+    }
 
     @Override
     public <T extends Document> void invalidateCache(Collection<T> collection, String key) {

Modified: jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/SynchronizingDocumentStoreWrapper.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/SynchronizingDocumentStoreWrapper.java?rev=1695297&r1=1695296&r2=1695297&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/SynchronizingDocumentStoreWrapper.java (original)
+++ jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/SynchronizingDocumentStoreWrapper.java Tue Aug 11 12:55:41 2015
@@ -107,6 +107,11 @@ public class SynchronizingDocumentStoreW
     }
 
     @Override
+    public synchronized CacheInvalidationStats invalidateCache(Iterable<String> keys) {
+        return store.invalidateCache(keys);
+    }
+    
+    @Override
     public synchronized <T extends Document> void invalidateCache(Collection<T> collection, String key) {
         store.invalidateCache(collection, key);
     }

Modified: jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/TimingDocumentStoreWrapper.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/TimingDocumentStoreWrapper.java?rev=1695297&r1=1695296&r2=1695297&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/TimingDocumentStoreWrapper.java (original)
+++ jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/TimingDocumentStoreWrapper.java Tue Aug 11 12:55:41 2015
@@ -282,6 +282,18 @@ public class TimingDocumentStoreWrapper
             throw convert(e);
         }
     }
+    
+    @Override
+    public CacheInvalidationStats invalidateCache(Iterable<String> keys) {
+        try {
+            long start = now();
+            CacheInvalidationStats result = base.invalidateCache(keys);
+            updateAndLogTimes("invalidateCache3", start, 0, 0);
+            return result;
+        } catch (Exception e) {
+            throw convert(e);
+        }
+    }
 
     @Override
     public <T extends Document> void invalidateCache(Collection<T> collection, String key) {

Modified: jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java?rev=1695297&r1=1695296&r2=1695297&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java (original)
+++ jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java Tue Aug 11 12:55:41 2015
@@ -34,6 +34,7 @@ import javax.annotation.CheckForNull;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
+import com.google.common.base.Function;
 import com.google.common.collect.AbstractIterator;
 import com.mongodb.BasicDBObject;
 
@@ -49,6 +50,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.collect.Iterables.transform;
 
 /**
  * Utility methods.
@@ -557,4 +559,31 @@ public class Utils {
     public static boolean isHiddenPath(@Nonnull String path) {
         return path.contains("/:");
     }
+
+    /**
+     * Transforms the given {@link Iterable} from {@link String} to
+     * {@link StringValue} elements. The {@link Iterable} must not have
+     * {@code null} values.
+     */
+    public static Iterable<StringValue> asStringValueIterable(
+            @Nonnull Iterable<String> values) {
+        return transform(values, new Function<String, StringValue>() {
+            @Override
+            public StringValue apply(String input) {
+                return new StringValue(input);
+            }
+        });
+    }
+
+    /**
+     * Transforms the given paths into ids using {@link #getIdFromPath(String)}.
+     */
+    public static Iterable<String> pathToId(@Nonnull Iterable<String> paths) {
+        return transform(paths, new Function<String, String>() {
+            @Override
+            public String apply(String input) {
+                return getIdFromPath(input);
+            }
+        });
+    }
 }

Modified: jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/NodeObserver.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/NodeObserver.java?rev=1695297&r1=1695296&r2=1695297&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/NodeObserver.java (original)
+++ jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/NodeObserver.java Tue Aug 11 12:55:41 2015
@@ -161,7 +161,7 @@ public abstract class NodeObserver imple
                 while (!generator.isDone()) {
                     generator.generate();
                 }
-                PERF_LOGGER.end(start, 10,
+                PERF_LOGGER.end(start, 100,
                         "Generated events (before: {}, after: {})",
                         previousRoot, root);
             } catch (Exception e) {

Added: jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/AbstractJournalTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/AbstractJournalTest.java?rev=1695297&view=auto
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/AbstractJournalTest.java (added)
+++ jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/AbstractJournalTest.java Tue Aug 11 12:55:41 2015
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+
+import com.google.common.collect.Lists;
+
+import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.plugins.document.util.Utils;
+import org.apache.jackrabbit.oak.spi.blob.BlobStore;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.junit.After;
+import org.junit.Before;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.fail;
+
+/**
+ * Base class for journal-related tests.
+ */
+public abstract class AbstractJournalTest {
+
+    protected TestBuilder builder;
+    protected List<DocumentMK> mks = Lists.newArrayList();
+    protected Random random;
+
+    @Before
+    public void setup() {
+        random = new Random();
+    }
+
+    @Before
+    @After
+    public void clear() {
+        for (DocumentMK mk : mks) {
+            mk.dispose();
+        }
+        mks.clear();
+    }
+
+    protected static void invalidateDocChildrenCache(DocumentNodeStore store) {
+        store.invalidateDocChildrenCache();
+    }
+
+    protected static void renewClusterIdLease(DocumentNodeStore store) {
+        store.renewClusterIdLease();
+    }
+
+    protected Set<String> choose(List<String> paths, int howMany) {
+        final Set<String> result = new HashSet<String>();
+        while(result.size()<howMany) {
+            result.add(paths.get(random.nextInt(paths.size())));
+        }
+        return result;
+    }
+
+    protected List<String> createRandomPaths(int depth, int avgChildrenPerLevel, int num) {
+        final Set<String> result = new HashSet<String>();
+        while(result.size()<num) {
+            result.add(createRandomPath(depth, avgChildrenPerLevel));
+        }
+        return new ArrayList<String>(result);
+    }
+
+    protected String createRandomPath(int depth, int avgChildrenPerLevel) {
+        StringBuilder sb = new StringBuilder();
+        for(int i=0; i<depth; i++) {
+            sb.append("/");
+            sb.append("r").append(random.nextInt(avgChildrenPerLevel));
+        }
+        return sb.toString();
+    }
+
+    protected void assertDocCache(DocumentNodeStore ns, boolean expected, String path) {
+        String id = Utils.getIdFromPath(path);
+        boolean exists = ns.getDocumentStore().getIfCached(Collection.NODES, id)!=null;
+        if (exists!=expected) {
+            if (expected) {
+                fail("assertDocCache: did not find in cache even though expected: "+path);
+            } else {
+                fail("assertDocCache: found in cache even though not expected: "+path);
+            }
+        }
+    }
+
+    protected void setProperty(DocumentNodeStore ns, String path, String key, String value, boolean runBgOpsAfterCreation) throws
+            CommitFailedException {
+        NodeBuilder rootBuilder = ns.getRoot().builder();
+        doGetOrCreate(rootBuilder, path).setProperty(key, value);
+        ns.merge(rootBuilder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+        if (runBgOpsAfterCreation) {
+            ns.runBackgroundOperations();
+        }
+    }
+
+    protected void getOrCreate(DocumentNodeStore ns, List<String> paths, boolean runBgOpsAfterCreation) throws CommitFailedException {
+        NodeBuilder rootBuilder = ns.getRoot().builder();
+        for(String path:paths) {
+            doGetOrCreate(rootBuilder, path);
+        }
+        ns.merge(rootBuilder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+        if (runBgOpsAfterCreation) {
+            ns.runBackgroundOperations();
+        }
+    }
+
+    protected void getOrCreate(DocumentNodeStore ns, String path, boolean runBgOpsAfterCreation) throws CommitFailedException {
+        NodeBuilder rootBuilder = ns.getRoot().builder();
+        doGetOrCreate(rootBuilder, path);
+        ns.merge(rootBuilder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+        if (runBgOpsAfterCreation) {
+            ns.runBackgroundOperations();
+        }
+    }
+
+    protected NodeBuilder doGetOrCreate(NodeBuilder builder, String path) {
+        String[] parts = path.split("/");
+        for(int i=1; i<parts.length; i++) {
+            builder = builder.child(parts[i]);
+        }
+        return builder;
+    }
+
+    protected void assertJournalEntries(DocumentNodeStore ds, String... expectedChanges) {
+        List<String> exp = new LinkedList<String>(asList(expectedChanges));
+        for(boolean branch : new Boolean[]{false, true}) {
+            String fromKey = JournalEntry.asId(new Revision(0, 0, ds.getClusterId(), branch));
+            String toKey = JournalEntry.asId(new Revision(System.currentTimeMillis()+1000, 0, ds.getClusterId(), branch));
+            List<JournalEntry> entries = ds.getDocumentStore().query(Collection.JOURNAL, fromKey, toKey, expectedChanges.length+5);
+            if (entries.size()>0) {
+                for (JournalEntry journalEntry : entries) {
+                    if (!exp.remove(journalEntry.get("_c"))) {
+                        fail("Found an unexpected change: " + journalEntry.get("_c") + ", while all I expected was: " + asList(expectedChanges));
+                    }
+                }
+            }
+        }
+        if (exp.size()>0) {
+            fail("Did not find all expected changes, left over: "+exp+" (from original list which is: "+asList(expectedChanges)+")");
+        }
+    }
+
+    protected int countJournalEntries(DocumentNodeStore ds, int max) {
+        int total = 0;
+        for(boolean branch : new Boolean[]{false, true}) {
+            String fromKey = JournalEntry.asId(new Revision(0, 0, ds.getClusterId(), branch));
+            String toKey = JournalEntry.asId(new Revision(System.currentTimeMillis()+1000, 0, ds.getClusterId(), branch));
+            List<JournalEntry> entries = ds.getDocumentStore().query(Collection.JOURNAL, fromKey, toKey, max);
+            total+=entries.size();
+        }
+        return total;
+    }
+
+    protected NodeDocument getDocument(DocumentNodeStore nodeStore, String path) {
+        return nodeStore.getDocumentStore().find(Collection.NODES, Utils.getIdFromPath(path));
+    }
+
+    protected TestBuilder newDocumentMKBuilder() {
+        return new TestBuilder();
+    }
+
+    protected DocumentMK createMK(int clusterId, int asyncDelay,
+                                  DocumentStore ds, BlobStore bs) {
+        builder = newDocumentMKBuilder();
+        return register(builder.setDocumentStore(ds)
+                .setBlobStore(bs).setClusterId(clusterId)
+                .setAsyncDelay(asyncDelay).open());
+    }
+
+    protected DocumentMK register(DocumentMK mk) {
+        mks.add(mk);
+        return mk;
+    }
+
+    protected final class TestBuilder extends DocumentMK.Builder {
+        CountingDocumentStore actualStore;
+        CountingTieredDiffCache actualDiffCache;
+
+        @Override
+        public DocumentStore getDocumentStore() {
+            if (actualStore==null) {
+                actualStore = new CountingDocumentStore(super.getDocumentStore());
+            }
+            return actualStore;
+        }
+
+        @Override
+        public DiffCache getDiffCache() {
+            if (actualDiffCache==null) {
+                actualDiffCache = new CountingTieredDiffCache(this);
+            }
+            return actualDiffCache;
+        }
+    }
+}

Propchange: jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/AbstractJournalTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/AmnesiaDiffCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/AmnesiaDiffCache.java?rev=1695297&r1=1695296&r2=1695297&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/AmnesiaDiffCache.java (original)
+++ jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/AmnesiaDiffCache.java Tue Aug 11 12:55:41 2015
@@ -47,7 +47,7 @@ class AmnesiaDiffCache implements DiffCa
 
     @Nonnull
     @Override
-    public Entry newEntry(@Nonnull Revision from, @Nonnull Revision to) {
+    public Entry newEntry(@Nonnull Revision from, @Nonnull Revision to, boolean local) {
         return new Entry() {
             @Override
             public void append(@Nonnull String path, @Nonnull String changes) {

Modified: jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterTest.java?rev=1695297&r1=1695296&r2=1695297&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterTest.java (original)
+++ jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterTest.java Tue Aug 11 12:55:41 2015
@@ -370,6 +370,10 @@ public class ClusterTest {
                 rootStates2.add((DocumentNodeState) root);
             }
         });
+
+        ns1.runBackgroundOperations();
+        ns2.runBackgroundOperations();
+
         rootStates1.clear();
         rootStates2.clear();
 

Added: jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingDocumentStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingDocumentStore.java?rev=1695297&view=auto
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingDocumentStore.java (added)
+++ jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingDocumentStore.java Tue Aug 11 12:55:41 2015
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.annotation.Nonnull;
+
+import org.apache.jackrabbit.oak.cache.CacheStats;
+import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Condition;
+import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Key;
+import org.apache.jackrabbit.oak.plugins.document.cache.CacheInvalidationStats;
+
+public class CountingDocumentStore implements DocumentStore {
+
+    private DocumentStore delegate;
+
+    //TODO: remove mec
+    boolean printStacks;
+
+    class Stats {
+
+        private int numFindCalls;
+        private int numQueryCalls;
+        private int numRemoveCalls;
+        private int numCreateOrUpdateCalls;
+
+    }
+
+    private Map<Collection, Stats> collectionStats = new HashMap<Collection, Stats>();
+
+    public CountingDocumentStore(DocumentStore delegate) {
+        this.delegate = delegate;
+    }
+
+    public void resetCounters() {
+        collectionStats.clear();
+    }
+
+    public int getNumFindCalls(Collection collection) {
+        return getStats(collection).numFindCalls;
+    }
+
+    public int getNumQueryCalls(Collection collection) {
+        return getStats(collection).numQueryCalls;
+    }
+
+    public int getNumRemoveCalls(Collection collection) {
+        return getStats(collection).numRemoveCalls;
+    }
+
+    public int getNumCreateOrUpdateCalls(Collection collection) {
+        return getStats(collection).numCreateOrUpdateCalls;
+    }
+
+    private Stats getStats(Collection collection) {
+        if (!collectionStats.containsKey(collection)) {
+            Stats s = new Stats();
+            collectionStats.put(collection, s);
+            return s;
+        } else {
+            return collectionStats.get(collection);
+        }
+    }
+
+    @Override
+    public <T extends Document> T find(Collection<T> collection, String key) {
+        getStats(collection).numFindCalls++;
+        if (printStacks) {
+            new Exception("find [" + getStats(collection).numFindCalls + "] (" + collection + ") " + key).printStackTrace();
+        }
+        return delegate.find(collection, key);
+    }
+
+    @Override
+    public <T extends Document> T find(Collection<T> collection,
+                                       String key,
+                                       int maxCacheAge) {
+        getStats(collection).numFindCalls++;
+        if (printStacks) {
+            new Exception("find [" + getStats(collection).numFindCalls + "] (" + collection + ") " + key + " [max: " + maxCacheAge + "]").printStackTrace();
+        }
+        return delegate.find(collection, key, maxCacheAge);
+    }
+
+    @Nonnull
+    @Override
+    public <T extends Document> List<T> query(Collection<T> collection,
+                                              String fromKey,
+                                              String toKey,
+                                              int limit) {
+        getStats(collection).numQueryCalls++;
+        if (printStacks) {
+            new Exception("query1 [" + getStats(collection).numQueryCalls + "] (" + collection + ") " + fromKey + ", to " + toKey + ". limit " + limit).printStackTrace();
+        }
+        return delegate.query(collection, fromKey, toKey, limit);
+    }
+
+    @Nonnull
+    @Override
+    public <T extends Document> List<T> query(Collection<T> collection,
+                                              String fromKey,
+                                              String toKey,
+                                              String indexedProperty,
+                                              long startValue,
+                                              int limit) {
+        getStats(collection).numQueryCalls++;
+        if (printStacks) {
+            new Exception("query2 [" + getStats(collection).numQueryCalls + "] (" + collection + ") " + fromKey + ", to " + toKey + ". limit " + limit).printStackTrace();
+        }
+        return delegate.query(collection, fromKey, toKey, indexedProperty, startValue, limit);
+    }
+
+    @Override
+    public <T extends Document> void remove(Collection<T> collection,
+                                            String key) {
+        getStats(collection).numRemoveCalls++;
+        delegate.remove(collection, key);
+    }
+
+    @Override
+    public <T extends Document> void remove(Collection<T> collection,
+                                            List<String> keys) {
+        getStats(collection).numRemoveCalls++;
+        delegate.remove(collection, keys);
+    }
+
+    @Override
+    public <T extends Document> int remove(Collection<T> collection,
+                                           Map<String, Map<Key, Condition>> toRemove) {
+        getStats(collection).numRemoveCalls++;
+        return delegate.remove(collection, toRemove);
+    }
+
+    @Override
+    public <T extends Document> boolean create(Collection<T> collection,
+                                               List<UpdateOp> updateOps) {
+        getStats(collection).numCreateOrUpdateCalls++;
+        return delegate.create(collection, updateOps);
+    }
+
+    @Override
+    public <T extends Document> void update(Collection<T> collection,
+                                            List<String> keys,
+                                            UpdateOp updateOp) {
+        getStats(collection).numCreateOrUpdateCalls++;
+        delegate.update(collection, keys, updateOp);
+    }
+
+    @Override
+    public <T extends Document> T createOrUpdate(Collection<T> collection,
+                                                 UpdateOp update) {
+        getStats(collection).numCreateOrUpdateCalls++;
+        return delegate.createOrUpdate(collection, update);
+    }
+
+    @Override
+    public <T extends Document> T findAndUpdate(Collection<T> collection,
+                                                UpdateOp update) {
+        getStats(collection).numCreateOrUpdateCalls++;
+        return delegate.findAndUpdate(collection, update);
+    }
+
+    @Override
+    public CacheInvalidationStats invalidateCache() {
+        return delegate.invalidateCache();
+    }
+
+    @Override
+    public CacheInvalidationStats invalidateCache(Iterable<String> keys) {
+        return delegate.invalidateCache(keys);
+    }
+
+    @Override
+    public <T extends Document> void invalidateCache(Collection<T> collection,
+                                                     String key) {
+        delegate.invalidateCache(collection, key);
+    }
+
+    @Override
+    public void dispose() {
+        delegate.dispose();
+    }
+
+    @Override
+    public <T extends Document> T getIfCached(Collection<T> collection,
+                                              String key) {
+        return delegate.getIfCached(collection, key);
+    }
+
+    @Override
+    public void setReadWriteMode(String readWriteMode) {
+        delegate.setReadWriteMode(readWriteMode);
+    }
+
+    @Override
+    public CacheStats getCacheStats() {
+        return delegate.getCacheStats();
+    }
+
+    @Override
+    public Map<String, String> getMetadata() {
+        return delegate.getMetadata();
+    }
+
+}

Propchange: jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingDocumentStore.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingTieredDiffCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingTieredDiffCache.java?rev=1695297&view=auto
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingTieredDiffCache.java (added)
+++ jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingTieredDiffCache.java Tue Aug 11 12:55:41 2015
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+public class CountingTieredDiffCache extends TieredDiffCache {
+
+    class CountingLoader implements Loader {
+
+        private Loader delegate;
+
+        CountingLoader(Loader delegate) {
+            this.delegate = delegate;
+        }
+
+        @Override
+        public String call() {
+            incLoadCount();
+            return delegate.call();
+        }
+
+    }
+
+    private int loadCount;
+
+    public CountingTieredDiffCache(DocumentMK.Builder builder) {
+        super(builder);
+    }
+
+    private void incLoadCount() {
+        loadCount++;
+    }
+
+    public int getLoadCount() {
+        return loadCount;
+    }
+
+    public void resetLoadCounter() {
+        loadCount = 0;
+    }
+
+    @Override
+    public String getChanges(@Nonnull Revision from,
+                             @Nonnull Revision to,
+                             @Nonnull String path,
+                             @Nullable Loader loader) {
+        return super.getChanges(from, to, path, new CountingLoader(loader));
+    }
+}

Propchange: jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingTieredDiffCache.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java?rev=1695297&r1=1695296&r2=1695297&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java (original)
+++ jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java Tue Aug 11 12:55:41 2015
@@ -60,6 +60,7 @@ import com.google.common.base.Throwables
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+
 import org.apache.jackrabbit.oak.api.CommitFailedException;
 import org.apache.jackrabbit.oak.api.PropertyState;
 import org.apache.jackrabbit.oak.api.Type;
@@ -102,8 +103,8 @@ public class DocumentNodeStoreTest {
         DocumentStore docStore = new MemoryDocumentStore();
         DocumentStore testStore = new TimingDocumentStoreWrapper(docStore) {
             @Override
-            public CacheInvalidationStats invalidateCache() {
-                super.invalidateCache();
+            public CacheInvalidationStats invalidateCache(Iterable<String> keys) {
+                super.invalidateCache(keys);
                 semaphore.acquireUninterruptibly();
                 semaphore.release();
                 return null;
@@ -1667,7 +1668,7 @@ public class DocumentNodeStoreTest {
         merge(ns, builder);
         Revision to = ns.getHeadRevision();
 
-        DiffCache.Entry entry = ns.getDiffCache().newEntry(from, to);
+        DiffCache.Entry entry = ns.getDiffCache().newEntry(from, to, true);
         entry.append("/", "-\"foo\"");
         entry.done();
 

Added: jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalEntryTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalEntryTest.java?rev=1695297&view=auto
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalEntryTest.java (added)
+++ jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalEntryTest.java Tue Aug 11 12:55:41 2015
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+import org.apache.jackrabbit.oak.commons.PathUtils;
+import org.apache.jackrabbit.oak.commons.json.JsopReader;
+import org.apache.jackrabbit.oak.commons.json.JsopTokenizer;
+import org.apache.jackrabbit.oak.commons.sort.StringSort;
+import org.apache.jackrabbit.oak.plugins.document.memory.MemoryDocumentStore;
+import org.junit.Test;
+
+import static org.apache.jackrabbit.oak.plugins.document.Collection.JOURNAL;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link JournalEntry}.
+ */
+public class JournalEntryTest {
+
+    /**
+     * Feeds a sorted set of random paths into
+     * {@link JournalEntry#applyTo} and checks that the diff cache afterwards
+     * holds changes for every path and that every child reported in those
+     * changes is itself one of the generated paths.
+     */
+    @Test
+    public void applyTo() throws Exception {
+        DiffCache cache = new MemoryDiffCache(new DocumentMK.Builder());
+        List<String> paths = Lists.newArrayList();
+        addRandomPaths(paths);
+        // the list contains duplicates; a set gives cheap membership checks
+        Set<String> knownPaths = Sets.newHashSet(paths);
+        Revision from = new Revision(1, 0, 1);
+        Revision to = new Revision(2, 0, 1);
+        StringSort sort = JournalEntry.newSorter();
+        try {
+            add(sort, paths);
+            sort.sort();
+            JournalEntry.applyTo(sort, cache, from, to);
+
+            for (String p : paths) {
+                String changes = cache.getChanges(from, to, p, null);
+                assertNotNull("missing changes for " + p, changes);
+                for (String c : getChildren(changes)) {
+                    assertTrue(knownPaths.contains(PathUtils.concat(p, c)));
+                }
+            }
+        } finally {
+            // release the sorter even when an assertion above fails
+            sort.close();
+        }
+    }
+
+    /**
+     * Stores a single journal entry at revision r2 and checks how many paths
+     * {@link JournalEntry#fillExternalChanges} collects for different
+     * revision ranges: none for r2..r3, all of them for r1..r2 and r1..r3.
+     * NOTE(review): this suggests the lower bound is exclusive and the upper
+     * bound inclusive - confirm against JournalEntry.
+     */
+    @Test
+    public void fillExternalChanges() throws Exception {
+        DocumentStore store = new MemoryDocumentStore();
+        JournalEntry entry = JOURNAL.newDocument(store);
+        Set<String> paths = Sets.newHashSet();
+        addRandomPaths(paths);
+        entry.modified(paths);
+        Revision r1 = new Revision(1, 0, 1);
+        Revision r2 = new Revision(2, 0, 1);
+        Revision r3 = new Revision(3, 0, 1);
+        UpdateOp op = entry.asUpdateOp(r2);
+        assertTrue(store.create(JOURNAL, Collections.singletonList(op)));
+
+        StringSort sort = JournalEntry.newSorter();
+        try {
+            JournalEntry.fillExternalChanges(sort, r2, r3, store);
+            assertEquals(0, sort.getSize());
+
+            // the same sorter is reused; it was still empty after the first call
+            JournalEntry.fillExternalChanges(sort, r1, r2, store);
+            assertEquals(paths.size(), sort.getSize());
+        } finally {
+            // release the sorter even when an assertion above fails
+            sort.close();
+        }
+
+        sort = JournalEntry.newSorter();
+        try {
+            JournalEntry.fillExternalChanges(sort, r1, r3, store);
+            assertEquals(paths.size(), sort.getSize());
+        } finally {
+            sort.close();
+        }
+    }
+
+    @Test
+    public void getRevisionTimestamp() throws Exception {
+        DocumentStore store = new MemoryDocumentStore();
+        JournalEntry entry = JOURNAL.newDocument(store);
+        entry.modified("/foo");
+        Revision r = Revision.newRevision(1);
+        assertTrue(store.create(JOURNAL,
+                Collections.singletonList(entry.asUpdateOp(r))));
+        entry = store.find(JOURNAL, JournalEntry.asId(r));
+        assertEquals(r.getTimestamp(), entry.getRevisionTimestamp());
+    }
+
+    /**
+     * Adds the root path plus the paths of 1000 pseudo-random branches to the
+     * given collection. Each branch is up to five levels deep, uses single
+     * lower-case letters as node names, and contributes every intermediate
+     * path. The random generator uses a fixed seed, so the result is
+     * reproducible.
+     *
+     * @param paths the collection receiving the generated paths.
+     */
+    private static void addRandomPaths(java.util.Collection<String> paths) throws IOException {
+        paths.add("/");
+        Random rnd = new Random(42);
+        for (int branch = 0; branch < 1000; branch++) {
+            String current = "/";
+            // walk down 'remaining' levels, recording each path on the way
+            for (int remaining = rnd.nextInt(6); remaining > 0; remaining--) {
+                String segment = String.valueOf((char) ('a' + rnd.nextInt(26)));
+                current = PathUtils.concat(current, segment);
+                paths.add(current);
+            }
+        }
+    }
+
+    private static void add(StringSort sort, List<String> paths)
+            throws IOException {
+        for (String p : paths) {
+            sort.add(p);
+        }
+    }
+
+    /**
+     * Extracts the child names from a diff that consists solely of
+     * <code>^"name":{}</code> entries. Any other token fails the test.
+     *
+     * @param diff the diff string to parse.
+     * @return the child names in the order they appear in the diff.
+     */
+    private static List<String> getChildren(String diff) {
+        List<String> names = Lists.newArrayList();
+        JsopTokenizer tokenizer = new JsopTokenizer(diff);
+        while (true) {
+            int token = tokenizer.read();
+            if (token == JsopReader.END) {
+                return names;
+            } else if (token == '^') {
+                names.add(tokenizer.readString());
+                tokenizer.read(':');
+                tokenizer.read('{');
+                tokenizer.read('}');
+            } else {
+                fail("Unexpected token: " + token);
+            }
+        }
+    }
+}

Propchange: jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalEntryTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalTest.java?rev=1695297&view=auto
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalTest.java (added)
+++ jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalTest.java Tue Aug 11 12:55:41 2015
@@ -0,0 +1,440 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+
+import org.apache.jackrabbit.oak.api.PropertyState;
+import org.apache.jackrabbit.oak.plugins.document.memory.MemoryDocumentStore;
+import org.apache.jackrabbit.oak.spi.blob.MemoryBlobStore;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
+import org.apache.jackrabbit.oak.spi.commit.Observer;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.apache.jackrabbit.oak.spi.state.NodeStateDiff;
+import org.junit.Test;
+
+import static java.util.Collections.synchronizedList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class JournalTest extends AbstractJournalTest {
+
+    private MemoryDocumentStore ds;
+    private MemoryBlobStore bs;
+
+    /**
+     * Observer that queues every root state it is notified about and diffs
+     * each queued state against the previously processed one. Processing
+     * happens either on demand via {@link #processOne()} /
+     * {@link #processAll()} or in a daemon thread started by the constructor.
+     * All {@link NodeStateDiff} callbacks simply return <code>true</code>.
+     * Both lists are guarded by the monitor of
+     * <code>incomingRootStates1</code>.
+     */
+    class DiffingObserver implements Observer, Runnable, NodeStateDiff {
+
+        final List<DocumentNodeState> incomingRootStates1 = Lists.newArrayList();
+        final List<DocumentNodeState> diffedRootStates1 = Lists.newArrayList();
+
+        DocumentNodeState oldRoot = null;
+
+        /**
+         * @param startInBackground whether to start a daemon thread that
+         *          processes incoming root states as they arrive.
+         */
+        DiffingObserver(boolean startInBackground) {
+            if (startInBackground) {
+                // start the diffing in the background - so as to not
+                // interfere with the contentChanged call
+                Thread th = new Thread(this);
+                th.setDaemon(true);
+                th.start();
+            }
+        }
+
+        /** Drops all queued and all already diffed root states. */
+        public void clear() {
+            synchronized(incomingRootStates1) {
+                incomingRootStates1.clear();
+                diffedRootStates1.clear();
+            }
+        }
+
+        @Override
+        public void contentChanged(NodeState root, CommitInfo info) {
+            synchronized(incomingRootStates1) {
+                incomingRootStates1.add((DocumentNodeState) root);
+                // wake up a background thread waiting in run()
+                incomingRootStates1.notifyAll();
+            }
+        }
+
+        /** Processes queued root states until the queue is empty. */
+        public void processAll() {
+            while(processOne()) {
+                // continue
+            }
+        }
+
+        /**
+         * Processes the oldest queued root state, if there is one.
+         *
+         * @return <code>true</code> if a root state was processed,
+         *         <code>false</code> if the queue was empty.
+         */
+        public boolean processOne() {
+            DocumentNodeState newRoot;
+            synchronized(incomingRootStates1) {
+                if (incomingRootStates1.isEmpty()) {
+                    return false;
+                }
+                newRoot = incomingRootStates1.remove(0);
+            }
+            diffAgainstPrevious(newRoot);
+            return true;
+        }
+
+        /**
+         * Background loop: waits for incoming root states and processes them
+         * one by one. Terminates when the thread is interrupted.
+         */
+        @Override
+        public void run() {
+            while(true) {
+                DocumentNodeState newRoot;
+                synchronized(incomingRootStates1) {
+                    while(incomingRootStates1.isEmpty()) {
+                        try {
+                            incomingRootStates1.wait();
+                        } catch (InterruptedException e) {
+                            // restore the flag and stop instead of
+                            // silently waiting again
+                            Thread.currentThread().interrupt();
+                            return;
+                        }
+                    }
+                    newRoot = incomingRootStates1.remove(0);
+                }
+                diffAgainstPrevious(newRoot);
+            }
+        }
+
+        /**
+         * Diffs the given root against the previously processed root (if
+         * any), remembers it as the new base and records it as diffed. The
+         * diff itself runs outside the monitor.
+         */
+        private void diffAgainstPrevious(DocumentNodeState newRoot) {
+            if (oldRoot != null) {
+                newRoot.compareAgainstBaseState(oldRoot, this);
+            }
+            oldRoot = newRoot;
+            synchronized(incomingRootStates1) {
+                diffedRootStates1.add(newRoot);
+            }
+        }
+
+        @Override
+        public boolean propertyAdded(PropertyState after) {
+            return true;
+        }
+
+        @Override
+        public boolean propertyChanged(PropertyState before, PropertyState after) {
+            return true;
+        }
+
+        @Override
+        public boolean propertyDeleted(PropertyState before) {
+            return true;
+        }
+
+        @Override
+        public boolean childNodeAdded(String name, NodeState after) {
+            return true;
+        }
+
+        @Override
+        public boolean childNodeChanged(String name, NodeState before,
+                NodeState after) {
+            return true;
+        }
+
+        @Override
+        public boolean childNodeDeleted(String name, NodeState before) {
+            return true;
+        }
+
+        /**
+         * @return the number of root states still queued plus the number of
+         *         root states already diffed.
+         */
+        public int getTotal() {
+            synchronized(incomingRootStates1) {
+                return incomingRootStates1.size() + diffedRootStates1.size();
+            }
+        }
+
+    }
+    
+    /**
+     * Exercises {@link JournalGarbageCollector} on a single cluster node.
+     * Every gc call is expected to return 0 until background operations have
+     * run three times around the commits; only then is a 1ms gc expected to
+     * return 3.
+     * NOTE(review): the return value is presumably the number of removed
+     * journal entries and the arguments the maximum age to retain - confirm
+     * against JournalGarbageCollector.
+     */
+    @Test
+    public void cleanupTest() throws Exception {
+        DocumentMK mk1 = createMK(0 /* clusterId: 0 => uses clusterNodes collection */, 0);
+        DocumentNodeStore ns1 = mk1.getNodeStore();
+        // make sure we're visible and marked as active
+        ns1.renewClusterIdLease();
+        JournalGarbageCollector gc = new JournalGarbageCollector(ns1);
+        // first clean up
+        gc.gc(1, TimeUnit.MILLISECONDS);
+        Thread.sleep(100); // sleep just quickly
+        // after the initial clean up nothing is reported, whatever the age
+        assertEquals(0, gc.gc(1, TimeUnit.DAYS));
+        assertEquals(0, gc.gc(6, TimeUnit.HOURS));
+        assertEquals(0, gc.gc(1, TimeUnit.HOURS));
+        assertEquals(0, gc.gc(10, TimeUnit.MINUTES));
+        assertEquals(0, gc.gc(1, TimeUnit.MINUTES));
+        assertEquals(0, gc.gc(1, TimeUnit.SECONDS));
+        assertEquals(0, gc.gc(1, TimeUnit.MILLISECONDS));
+        
+        // create some entries that can be deleted thereupon
+        mk1.commit("/", "+\"regular1\": {}", null, null);
+        mk1.commit("/", "+\"regular2\": {}", null, null);
+        mk1.commit("/", "+\"regular3\": {}", null, null);
+        mk1.commit("/regular2", "+\"regular4\": {}", null, null);
+        Thread.sleep(100); // sleep 100millis
+        // background operations have not run yet, so gc still reports 0
+        assertEquals(0, gc.gc(5, TimeUnit.SECONDS));
+        assertEquals(0, gc.gc(1, TimeUnit.MILLISECONDS));
+        ns1.runBackgroundOperations();
+        mk1.commit("/", "+\"regular5\": {}", null, null);
+        ns1.runBackgroundOperations();
+        mk1.commit("/", "+\"regular6\": {}", null, null);
+        ns1.runBackgroundOperations();
+        Thread.sleep(100); // sleep 100millis
+        // a 5s gc reports nothing, while a 1ms gc reports 3
+        assertEquals(0, gc.gc(5, TimeUnit.SECONDS));
+        assertEquals(3, gc.gc(1, TimeUnit.MILLISECONDS));
+    }
+    
+    /**
+     * Commits changes on cluster node 2 and lets cluster node 1 pick them up
+     * through background operations while an observer on node 1 diffs the
+     * resulting root states. Asserts that node 1 issues no query, remove or
+     * createOrUpdate calls on the NODES collection and that its diff cache
+     * loader is never invoked.
+     */
+    @Test
+    public void journalTest() throws Exception {
+        DocumentMK mk1 = createMK(1, 0);
+        DocumentNodeStore ns1 = mk1.getNodeStore();
+        // 'builder' refers to the builder used by the preceding createMK call
+        CountingDocumentStore countingDocStore1 = builder.actualStore;
+        CountingTieredDiffCache countingDiffCache1 = builder.actualDiffCache;
+
+        DocumentMK mk2 = createMK(2, 0);
+        DocumentNodeStore ns2 = mk2.getNodeStore();
+        CountingDocumentStore countingDocStore2 = builder.actualStore;
+        CountingTieredDiffCache countingDiffCache2 = builder.actualDiffCache;
+
+        final DiffingObserver observer = new DiffingObserver(false);
+        ns1.addObserver(observer);
+        
+        ns1.runBackgroundOperations();
+        ns2.runBackgroundOperations();
+        observer.processAll(); // to make sure we have an 'oldRoot'
+        observer.clear();
+        // start counting from a clean slate
+        countingDocStore1.resetCounters();
+        countingDocStore2.resetCounters();
+        // countingDocStore1.printStacks = true;
+        countingDiffCache1.resetLoadCounter();
+        countingDiffCache2.resetLoadCounter();
+
+        mk2.commit("/", "+\"regular1\": {}", null, null);
+        mk2.commit("/", "+\"regular2\": {}", null, null);
+        mk2.commit("/", "+\"regular3\": {}", null, null);
+        mk2.commit("/regular2", "+\"regular4\": {}", null, null);
+        // flush to journal
+        ns2.runBackgroundOperations();
+        
+        // nothing notified yet
+        assertEquals(0, observer.getTotal());
+        assertEquals(0, countingDocStore1.getNumFindCalls(Collection.NODES));
+        assertEquals(0, countingDocStore1.getNumQueryCalls(Collection.NODES));
+        assertEquals(0, countingDocStore1.getNumRemoveCalls(Collection.NODES));
+        assertEquals(0, countingDocStore1.getNumCreateOrUpdateCalls(Collection.NODES));
+        assertEquals(0, countingDiffCache1.getLoadCount());
+        
+        // let node 1 read those changes
+        // System.err.println("run background ops");
+        ns1.runBackgroundOperations();
+        mk2.commit("/", "+\"regular5\": {}", null, null);
+        ns2.runBackgroundOperations();
+        ns1.runBackgroundOperations();
+        // and let the observer process everything
+        observer.processAll();
+        countingDocStore1.printStacks = false;
+        
+        // now expect 2 root states in total (queued plus diffed), one per
+        // background read on node 1
+        assertEquals(2, observer.getTotal());
+        assertEquals(0, countingDiffCache1.getLoadCount());
+        assertEquals(0, countingDocStore1.getNumRemoveCalls(Collection.NODES));
+        assertEquals(0, countingDocStore1.getNumCreateOrUpdateCalls(Collection.NODES));
+        assertEquals(0, countingDocStore1.getNumQueryCalls(Collection.NODES));
+//        assertEquals(0, countingDocStore1.getNumFindCalls(Collection.NODES));
+    }
+    
+    /**
+     * Performs a series of regular commits followed by a merged branch commit
+     * on cluster node 1 and checks that cluster node 2 sees all resulting
+     * child nodes of the root after both nodes ran their background
+     * operations.
+     */
+    @Test
+    public void externalBranchChange() throws Exception {
+        DocumentMK writerMk = createMK(1, 0);
+        DocumentNodeStore writer = writerMk.getNodeStore();
+        DocumentMK readerMk = createMK(2, 0);
+        DocumentNodeStore reader = readerMk.getNodeStore();
+
+        writer.runBackgroundOperations();
+        reader.runBackgroundOperations();
+
+        // every regular commit is flushed to the journal right away
+        writerMk.commit("/", "+\"regular1\": {}", null, null);
+        writer.runBackgroundOperations();
+        writerMk.commit("/regular1", "+\"regular1child\": {}", null, null);
+        writer.runBackgroundOperations();
+        for (int i = 2; i <= 5; i++) {
+            writerMk.commit("/", "+\"regular" + i + "\": {}", null, null);
+            writer.runBackgroundOperations();
+        }
+
+        String branch = writerMk.branch(null);
+        branch = writerMk.commit("/", "+\"branchVisible\": {}", branch, null);
+        writerMk.merge(branch, null);
+
+        // to flush the branch commit either dispose of the writer
+        // or run the background operations explicitly
+        // (as that will propagate the lastRev to the root)
+        writer.runBackgroundOperations();
+        reader.runBackgroundOperations();
+
+        String expected = "{\"branchVisible\":{},\"regular1\":{},\"regular2\":{},\"regular3\":{},\"regular4\":{},\"regular5\":{},\":childNodeCount\":6}";
+        String actual = readerMk.getNodes("/", null, 0, 0, 100, null);
+        assertEquals(expected, actual);
+    }
+    
+    /**
+     * Inspired by LastRevRecoveryTest.testRecover() - simplified and extended
+     * with journal related asserts. Runs the recovery once, from the calling
+     * thread.
+     */
+    @Test
+    public void lastRevRecoveryJournalTest() throws Exception {
+        doLastRevRecoveryJournalTest(false);
+    }
+    
+    /**
+     * Inspired by LastRevRecoveryTest.testRecover() - simplified and extended
+     * with journal related asserts. Unlike
+     * {@link #lastRevRecoveryJournalTest()}, this variant runs the recovery
+     * from many threads at the same time.
+     */
+    @Test
+    public void lastRevRecoveryJournalTestWithConcurrency() throws Exception {
+        doLastRevRecoveryJournalTest(true);
+    }
+    
+    /**
+     * Sets up two cluster nodes, lets the second one commit a change whose
+     * lastRev is not yet propagated, then runs the {@link LastRevRecoveryAgent}
+     * of the first node on behalf of the second and checks the lastRev values
+     * and the journal entries of both nodes.
+     *
+     * @param testConcurrency if <code>true</code> the recovery is triggered
+     *          from many threads at once, otherwise once from the calling
+     *          thread (followed by two no-op recoveries).
+     * @throws Exception on test failure; in the concurrent case the first
+     *          exception raised by a recovery thread is rethrown.
+     */
+    private void doLastRevRecoveryJournalTest(boolean testConcurrency) throws Exception {
+        DocumentMK mk1 = createMK(0 /*clusterId via clusterNodes collection*/, 0);
+        DocumentNodeStore ds1 = mk1.getNodeStore();
+        int c1Id = ds1.getClusterId();
+        DocumentMK mk2 = createMK(0 /*clusterId via clusterNodes collection*/, 0);
+        DocumentNodeStore ds2 = mk2.getNodeStore();
+        final int c2Id = ds2.getClusterId();
+
+        // should have 1 each with just the root changed
+        assertJournalEntries(ds1, "{}");
+        assertJournalEntries(ds2, "{}");
+        assertEquals(1, countJournalEntries(ds1, 10));
+        assertEquals(1, countJournalEntries(ds2, 10));
+
+        //1. Create base structure /x/y
+        NodeBuilder b1 = ds1.getRoot().builder();
+        b1.child("x").child("y");
+        ds1.merge(b1, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+        ds1.runBackgroundOperations();
+
+        //lastRev are persisted directly for new nodes. In case of
+        // updates they are persisted via background jobs
+
+        //1.2 Get last rev populated for root node for ds2
+        ds2.runBackgroundOperations();
+        NodeBuilder b2 = ds2.getRoot().builder();
+        b2.child("x").setProperty("f1","b1");
+        ds2.merge(b2, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+        ds2.runBackgroundOperations();
+
+        //2. Add a new node /x/y/z
+        b2 = ds2.getRoot().builder();
+        b2.child("x").child("y").child("z").setProperty("foo", "bar");
+        ds2.merge(b2, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+        //Refresh DS1
+        ds1.runBackgroundOperations();
+
+        final NodeDocument z1 = getDocument(ds1, "/x/y/z");
+        NodeDocument y1 = getDocument(ds1, "/x/y");
+        final NodeDocument x1 = getDocument(ds1, "/x");
+
+        Revision head2 = ds2.getHeadRevision();
+
+        //lastRev should not be updated for C #2
+        assertNull(y1.getLastRev().get(c2Id));
+
+        final LastRevRecoveryAgent recovery = new LastRevRecoveryAgent(ds1);
+
+        // besides the former root change, the journal of 1 now also has
+        // the creation of /x/y
+        final String change1 = "{\"x\":{\"y\":{}}}";
+        assertJournalEntries(ds1, "{}", change1);
+        // and the journal of 2 has the property change on /x
+        final String change2 = "{\"x\":{}}";
+        assertJournalEntries(ds2, "{}", change2);
+
+        // the change expected in the journal of 2 once /x/y/z was recovered
+        String change2b = "{\"x\":{\"y\":{\"z\":{}}}}";
+
+        if (!testConcurrency) {
+            //Do not pass y1 but still y1 should be updated
+            recovery.recover(Iterators.forArray(x1,z1), c2Id);
+
+            //Post recovery the lastRev should be updated for /x/y and /x
+            assertEquals(head2, getDocument(ds1, "/x/y").getLastRev().get(c2Id));
+            assertEquals(head2, getDocument(ds1, "/x").getLastRev().get(c2Id));
+            assertEquals(head2, getDocument(ds1, "/").getLastRev().get(c2Id));
+
+            // now 1 is unchanged, but 2 was recovered now, so has one more:
+            assertJournalEntries(ds1, "{}", change1); // unchanged
+            assertJournalEntries(ds2, "{}", change2, change2b);
+
+            // just some no-ops:
+            recovery.recover(c2Id);
+            recovery.recover(Iterators.<NodeDocument>emptyIterator(), c2Id);
+            assertJournalEntries(ds1, "{}", change1); // unchanged
+            assertJournalEntries(ds2, "{}", change2, change2b);
+
+        } else {
+
+            // do some concurrency testing as well to check that recovering
+            // the same documents from many threads at once neither fails
+            // nor produces additional journal entries
+            final int NUM_THREADS = 200;
+            final CountDownLatch ready = new CountDownLatch(NUM_THREADS);
+            final CountDownLatch start = new CountDownLatch(1);
+            final CountDownLatch end = new CountDownLatch(NUM_THREADS);
+            final List<Exception> exceptions = synchronizedList(new ArrayList<Exception>());
+            for (int i = 0; i < NUM_THREADS; i++) {
+                Thread th = new Thread(new Runnable() {
+
+                    @Override
+                    public void run() {
+                        try {
+                            ready.countDown();
+                            start.await();
+                            recovery.recover(Iterators.forArray(x1,z1), c2Id);
+                        } catch (Exception e) {
+                            exceptions.add(e);
+                        } finally {
+                            end.countDown();
+                        }
+                    }
+
+                });
+                th.start();
+            }
+            // best effort: give all threads a chance to line up, but start
+            // anyway if that takes longer than 5 seconds
+            ready.await(5, TimeUnit.SECONDS);
+            start.countDown();
+            assertTrue(end.await(20, TimeUnit.SECONDS));
+            // report a failure inside a recovery thread first, so it is not
+            // masked by a follow-up assertion failure on the journal
+            if (!exceptions.isEmpty()) {
+                throw exceptions.get(0);
+            }
+            assertJournalEntries(ds1, "{}", change1); // unchanged
+            assertJournalEntries(ds2, "{}", change2, change2b);
+        }
+    }
+
+    /**
+     * Creates a DocumentMK on top of the document and blob store shared by
+     * this test, creating those stores on first use.
+     *
+     * @param clusterId the cluster id passed on to the four-argument createMK.
+     * @param asyncDelay the async delay passed on to the four-argument createMK.
+     * @return the new DocumentMK.
+     */
+    private DocumentMK createMK(int clusterId, int asyncDelay) {
+        // lazily set up the stores shared by all instances of this test
+        ds = (ds != null) ? ds : new MemoryDocumentStore();
+        bs = (bs != null) ? bs : new MemoryBlobStore();
+        return createMK(clusterId, asyncDelay, ds, bs);
+    }
+}

Propchange: jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalTest.java
------------------------------------------------------------------------------
    svn:eol-style = native