You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by mr...@apache.org on 2015/07/01 15:37:36 UTC

svn commit: r1688649 [2/2] - in /jackrabbit/oak/branches/1.2: ./ oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ oak-core/src/main/java/org/apache/jackrabbit/oak/plugi...

Modified: jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/memory/MemoryDocumentStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/memory/MemoryDocumentStore.java?rev=1688649&r1=1688648&r2=1688649&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/memory/MemoryDocumentStore.java (original)
+++ jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/memory/MemoryDocumentStore.java Wed Jul  1 13:37:35 2015
@@ -35,6 +35,7 @@ import org.apache.jackrabbit.oak.plugins
 import org.apache.jackrabbit.oak.plugins.document.Document;
 import org.apache.jackrabbit.oak.plugins.document.DocumentStore;
 import org.apache.jackrabbit.oak.plugins.document.DocumentStoreException;
+import org.apache.jackrabbit.oak.plugins.document.JournalEntry;
 import org.apache.jackrabbit.oak.plugins.document.NodeDocument;
 import org.apache.jackrabbit.oak.plugins.document.Revision;
 import org.apache.jackrabbit.oak.plugins.document.StableRevisionComparator;
@@ -73,6 +74,12 @@ public class MemoryDocumentStore impleme
     private ConcurrentSkipListMap<String, Document> settings =
             new ConcurrentSkipListMap<String, Document>();
 
+    /**
+     * The 'externalChanges' collection.
+     */
+    private ConcurrentSkipListMap<String, JournalEntry> externalChanges =
+            new ConcurrentSkipListMap<String, JournalEntry>();
+
     private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
 
     /**
@@ -226,8 +233,10 @@ public class MemoryDocumentStore impleme
             return (ConcurrentSkipListMap<String, T>) nodes;
         } else if (collection == Collection.CLUSTER_NODES) {
             return (ConcurrentSkipListMap<String, T>) clusterNodes;
-        }else if (collection == Collection.SETTINGS) {
+        } else if (collection == Collection.SETTINGS) {
             return (ConcurrentSkipListMap<String, T>) settings;
+        } else if (collection == Collection.JOURNAL) {
+            return (ConcurrentSkipListMap<String, T>) externalChanges;
         } else {
             throw new IllegalArgumentException(
                     "Unknown collection: " + collection.toString());
@@ -329,6 +338,11 @@ public class MemoryDocumentStore impleme
     }
 
     @Override
+    public CacheInvalidationStats invalidateCache(Iterable<String> keys) {
+        return null;
+    }
+    
+    @Override
     public void dispose() {
         // ignore
     }

Modified: jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/CacheInvalidator.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/CacheInvalidator.java?rev=1688649&r1=1688648&r2=1688649&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/CacheInvalidator.java (original)
+++ jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/CacheInvalidator.java Wed Jul  1 13:37:35 2015
@@ -195,7 +195,7 @@ abstract class CacheInvalidator {
             PeekingIterator<TreeNode> pitr = Iterators.peekingIterator(treeItr);
             Map<String, TreeNode> sameLevelNodes = Maps.newHashMap();
 
-            // Fetch only the lastRev map and id
+            // Fetch only the modCount and id
             final BasicDBObject keys = new BasicDBObject(Document.ID, 1);
             keys.put(Document.MOD_COUNT, 1);
 
@@ -228,7 +228,7 @@ abstract class CacheInvalidator {
                         QueryBuilder query = QueryBuilder.start(Document.ID)
                                 .in(idBatch);
 
-                        // Fetch lastRev and modCount for each such nodes
+                        // Fetch modCount for each such nodes
                         DBCursor cursor = nodes.find(query.get(), keys);
                         cursor.setReadPreference(ReadPreference.primary());
                         LOG.debug(

Modified: jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDiffCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDiffCache.java?rev=1688649&r1=1688648&r2=1688649&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDiffCache.java (original)
+++ jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDiffCache.java Wed Jul  1 13:37:35 2015
@@ -91,7 +91,7 @@ public class MongoDiffCache extends Memo
             if (changes == null && loader != null) {
                 changes = loader.call();
                 // put into memory cache
-                super.newEntry(from, to).append(path, changes);
+                super.newEntry(from, to, false).append(path, changes);
             }
             return changes;
         } finally {
@@ -102,7 +102,8 @@ public class MongoDiffCache extends Memo
     @Nonnull
     @Override
     public Entry newEntry(@Nonnull final Revision from,
-                          @Nonnull final Revision to) {
+                          @Nonnull final Revision to,
+                          boolean local /*ignored*/) {
         return new MemoryEntry(from, to) {
 
             private Diff commit = new Diff(from, to);
@@ -172,7 +173,7 @@ public class MongoDiffCache extends Memo
                 // diff is complete
                 LOG.debug("Built diff from {} commits", numCommits);
                 // apply to diff cache and serve later requests from cache
-                d.applyToEntry(super.newEntry(from, to)).done();
+                d.applyToEntry(super.newEntry(from, to, false)).done();
                 // return changes
                 return d.getChanges(path);
             }

Modified: jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java?rev=1688649&r1=1688648&r2=1688649&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java (original)
+++ jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java Wed Jul  1 13:37:35 2015
@@ -67,6 +67,7 @@ import org.apache.jackrabbit.oak.plugins
 import org.apache.jackrabbit.oak.plugins.document.cache.ForwardingListener;
 import org.apache.jackrabbit.oak.plugins.document.cache.NodeDocOffHeapCache;
 import org.apache.jackrabbit.oak.plugins.document.cache.OffHeapCache;
+import org.apache.jackrabbit.oak.plugins.document.mongo.CacheInvalidator.InvalidationResult;
 import org.apache.jackrabbit.oak.plugins.document.util.StringValue;
 import org.apache.jackrabbit.oak.plugins.document.util.Utils;
 import org.apache.jackrabbit.oak.stats.Clock;
@@ -117,6 +118,7 @@ public class MongoDocumentStore implemen
     private final DBCollection nodes;
     private final DBCollection clusterNodes;
     private final DBCollection settings;
+    private final DBCollection journal;
 
     private final Cache<CacheValue, NodeDocument> nodesCache;
     private final CacheStats cacheStats;
@@ -196,12 +198,10 @@ public class MongoDocumentStore implemen
                 .put("version", version)
                 .build();
 
-        nodes = db.getCollection(
-                Collection.NODES.toString());
-        clusterNodes = db.getCollection(
-                Collection.CLUSTER_NODES.toString());
-        settings = db.getCollection(
-                Collection.SETTINGS.toString());
+        nodes = db.getCollection(Collection.NODES.toString());
+        clusterNodes = db.getCollection(Collection.CLUSTER_NODES.toString());
+        settings = db.getCollection(Collection.SETTINGS.toString());
+        journal = db.getCollection(Collection.JOURNAL.toString());
 
         maxReplicationLagMillis = builder.getMaxReplicationLagMillis();
 
@@ -299,6 +299,59 @@ public class MongoDocumentStore implemen
         //that would lead to lesser number of queries
         return CacheInvalidator.createHierarchicalInvalidator(this).invalidateCache();
     }
+    
+    @Override
+    public CacheInvalidationStats invalidateCache(Iterable<String> keys) {
+        LOG.debug("invalidateCache: start");
+        final InvalidationResult result = new InvalidationResult();
+        int size  = 0;
+
+        final Iterator<String> it = keys.iterator();
+        while(it.hasNext()) {
+            // read chunks of documents only
+            final List<String> ids = new ArrayList<String>(IN_CLAUSE_BATCH_SIZE);
+            while(it.hasNext() && ids.size() < IN_CLAUSE_BATCH_SIZE) {
+                final String id = it.next();
+                if (getCachedNodeDoc(id) != null) {
+                    // only add those that we actually do have cached
+                    ids.add(id);
+                }
+            }
+            size += ids.size();
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("invalidateCache: batch size: {} of total so far {}",
+                        ids.size(), size);
+            }
+            
+            QueryBuilder query = QueryBuilder.start(Document.ID).in(ids);
+            // Fetch only the modCount and id
+            final BasicDBObject fields = new BasicDBObject(Document.ID, 1);
+            fields.put(Document.MOD_COUNT, 1);
+            
+            DBCursor cursor = nodes.find(query.get(), fields);
+            cursor.setReadPreference(ReadPreference.primary());
+            result.queryCount++;
+            
+            for (DBObject obj : cursor) {
+                result.cacheEntriesProcessedCount++;
+                String id = (String) obj.get(Document.ID);
+                Number modCount = (Number) obj.get(Document.MOD_COUNT);
+                
+                CachedNodeDocument cachedDoc = getCachedNodeDoc(id);
+                if (cachedDoc != null
+                        && !Objects.equal(cachedDoc.getModCount(), modCount)) {
+                    invalidateCache(Collection.NODES, id);
+                    result.invalidationCount++;
+                } else {
+                    result.upToDateCount++;
+                }
+            }
+        }
+
+        result.cacheSize = size;
+        LOG.trace("invalidateCache: end. total: {}", size);
+        return result;
+    }
 
     @Override
     public <T extends Document> void invalidateCache(Collection<T> collection, String key) {
@@ -365,31 +418,30 @@ public class MongoDocumentStore implemen
         try {
             TreeLock lock = acquire(key);
             try {
-                if (maxCacheAge == 0) {
-                    invalidateCache(collection, key);
-                }
-                while (true) {
-                    doc = nodesCache.get(cacheKey, new Callable<NodeDocument>() {
-                        @Override
-                        public NodeDocument call() throws Exception {
-                            NodeDocument doc = (NodeDocument) findUncachedWithRetry(
-                                    collection, key,
-                                    getReadPreference(maxCacheAge), 2);
-                            if (doc == null) {
-                                doc = NodeDocument.NULL;
+                if (maxCacheAge > 0 || preferCached) {
+                    // try again some other thread may have populated
+                    // the cache by now
+                    doc = nodesCache.getIfPresent(cacheKey);
+                    if (doc != null) {
+                        if (preferCached ||
+                                getTime() - doc.getCreated() < maxCacheAge) {
+                            if (doc == NodeDocument.NULL) {
+                                return null;
                             }
-                            return doc;
+                            return (T) doc;
                         }
-                    });
-                    if (maxCacheAge == 0 || preferCached) {
-                        break;
-                    }
-                    if (getTime() - doc.getCreated() < maxCacheAge) {
-                        break;
                     }
-                    // too old: invalidate, try again
-                    invalidateCache(collection, key);
                 }
+                final NodeDocument d = (NodeDocument) findUncachedWithRetry(
+                        collection, key,
+                        getReadPreference(maxCacheAge), 2);
+                invalidateCache(collection, key);
+                doc = nodesCache.get(cacheKey, new Callable<NodeDocument>() {
+                    @Override
+                    public NodeDocument call() throws Exception {
+                        return d == null ? NodeDocument.NULL : d;
+                    }
+                });
             } finally {
                 lock.unlock();
             }
@@ -402,6 +454,8 @@ public class MongoDocumentStore implemen
             t = e.getCause();
         } catch (ExecutionException e) {
             t = e.getCause();
+        } catch (RuntimeException e) {
+            t = e;
         }
         throw new DocumentStoreException("Failed to load document with " + key, t);
     }
@@ -423,6 +477,9 @@ public class MongoDocumentStore implemen
             DocumentReadPreference docReadPref,
             int retries) {
         checkArgument(retries >= 0, "retries must not be negative");
+        if (key.equals("0:/")) {
+            LOG.trace("root node");
+        }
         int numAttempts = retries + 1;
         MongoException ex = null;
         for (int i = 0; i < numAttempts; i++) {
@@ -560,9 +617,13 @@ public class MongoDocumentStore implemen
         }
         DBObject query = queryBuilder.get();
         String parentId = Utils.getParentIdFromLowerLimit(fromKey);
+        long lockTime = -1;
         final long start = PERFLOG.start();
-        TreeLock lock = withLock ? acquireExclusive(parentId != null ? parentId : "") : null;
+        TreeLock lock = acquireExclusive(parentId != null ? parentId : "");
         try {
+            if (start != -1) {
+                lockTime = System.currentTimeMillis() - start;
+            }
             DBCursor cursor = dbCollection.find(query).sort(BY_ID_ASC);
             if (!disableIndexHint) {
                 cursor.hint(hint);
@@ -620,7 +681,7 @@ public class MongoDocumentStore implemen
             if (lock != null) {
                 lock.unlock();
             }
-            PERFLOG.end(start, 1, "query for children from [{}] to [{}]", fromKey, toKey);
+            PERFLOG.end(start, 1, "query for children from [{}] to [{}], lock:{}", fromKey, toKey, lockTime);
         }
     }
 
@@ -1014,7 +1075,9 @@ public class MongoDocumentStore implemen
             return clusterNodes;
         } else if (collection == Collection.SETTINGS) {
             return settings;
-        }else {
+        } else if (collection == Collection.JOURNAL) {
+            return journal;
+        } else {
             throw new IllegalArgumentException(
                     "Unknown collection: " + collection.toString());
         }

Modified: jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java?rev=1688649&r1=1688648&r2=1688649&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java (original)
+++ jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java Wed Jul  1 13:37:35 2015
@@ -282,6 +282,12 @@ public class RDBDocumentStore implements
         }
         return null;
     }
+    
+    @Override
+    public CacheInvalidationStats invalidateCache(Iterable<String> keys) {
+        //TODO: optimize me
+        return invalidateCache();
+    }
 
     @Override
     public <T extends Document> void invalidateCache(Collection<T> collection, String id) {
@@ -783,7 +789,7 @@ public class RDBDocumentStore implements
     private Set<String> tablesToBeDropped = new HashSet<String>();
 
     // table names
-    private String tnNodes, tnClusterNodes, tnSettings; 
+    private String tnNodes, tnClusterNodes, tnSettings, tnJournal;
 
     // ratio between Java characters and UTF-8 encoding
     // a) single characters will fit into 3 bytes
@@ -825,6 +831,7 @@ public class RDBDocumentStore implements
         this.tnNodes = RDBJDBCTools.createTableName(options.getTablePrefix(), TABLEMAP.get(Collection.NODES));
         this.tnClusterNodes = RDBJDBCTools.createTableName(options.getTablePrefix(), TABLEMAP.get(Collection.CLUSTER_NODES));
         this.tnSettings = RDBJDBCTools.createTableName(options.getTablePrefix(), TABLEMAP.get(Collection.SETTINGS));
+        this.tnJournal = RDBJDBCTools.createTableName(options.getTablePrefix(), "JOURNAL");
 
         this.ch = new RDBConnectionHandler(ds);
         this.callStack = LOG.isDebugEnabled() ? new Exception("call stack of RDBDocumentStore creation") : null;
@@ -878,6 +885,7 @@ public class RDBDocumentStore implements
             createTableFor(con, Collection.CLUSTER_NODES, tablesCreated, tablesPresent, tableDiags);
             createTableFor(con, Collection.NODES, tablesCreated, tablesPresent, tableDiags);
             createTableFor(con, Collection.SETTINGS, tablesCreated, tablesPresent, tableDiags);
+            createTableFor(con, Collection.JOURNAL, tablesCreated, tablesPresent, tableDiags);
         } finally {
             con.commit();
             con.close();
@@ -1316,6 +1324,8 @@ public class RDBDocumentStore implements
             return this.tnNodes;
         } else if (collection == Collection.SETTINGS) {
             return this.tnSettings;
+        } else if (collection == Collection.JOURNAL) {
+            return this.tnJournal;
         } else {
             throw new IllegalArgumentException("Unknown collection: " + collection.toString());
         }

Modified: jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/LoggingDocumentStoreWrapper.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/LoggingDocumentStoreWrapper.java?rev=1688649&r1=1688648&r2=1688649&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/LoggingDocumentStoreWrapper.java (original)
+++ jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/LoggingDocumentStoreWrapper.java Wed Jul  1 13:37:35 2015
@@ -251,6 +251,17 @@ public class LoggingDocumentStoreWrapper
             throw convert(e);
         }
     }
+    
+    @Override
+    public CacheInvalidationStats invalidateCache(Iterable<String> keys) {
+        try {
+            logMethod("invalidateCache", keys);
+            return store.invalidateCache(keys);
+        } catch (Exception e) {
+            logException(e);
+            throw convert(e);
+        }
+    }
 
     @Override
     public <T extends Document> void invalidateCache(Collection<T> collection, String key) {

Modified: jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/SynchronizingDocumentStoreWrapper.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/SynchronizingDocumentStoreWrapper.java?rev=1688649&r1=1688648&r2=1688649&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/SynchronizingDocumentStoreWrapper.java (original)
+++ jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/SynchronizingDocumentStoreWrapper.java Wed Jul  1 13:37:35 2015
@@ -107,6 +107,11 @@ public class SynchronizingDocumentStoreW
     }
 
     @Override
+    public synchronized CacheInvalidationStats invalidateCache(Iterable<String> keys) {
+        return store.invalidateCache(keys);
+    }
+    
+    @Override
     public synchronized <T extends Document> void invalidateCache(Collection<T> collection, String key) {
         store.invalidateCache(collection, key);
     }

Modified: jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/TimingDocumentStoreWrapper.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/TimingDocumentStoreWrapper.java?rev=1688649&r1=1688648&r2=1688649&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/TimingDocumentStoreWrapper.java (original)
+++ jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/TimingDocumentStoreWrapper.java Wed Jul  1 13:37:35 2015
@@ -282,6 +282,18 @@ public class TimingDocumentStoreWrapper
             throw convert(e);
         }
     }
+    
+    @Override
+    public CacheInvalidationStats invalidateCache(Iterable<String> keys) {
+        try {
+            long start = now();
+            CacheInvalidationStats result = base.invalidateCache(keys);
+            updateAndLogTimes("invalidateCache3", start, 0, 0);
+            return result;
+        } catch (Exception e) {
+            throw convert(e);
+        }
+    }
 
     @Override
     public <T extends Document> void invalidateCache(Collection<T> collection, String key) {

Modified: jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java?rev=1688649&r1=1688648&r2=1688649&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java (original)
+++ jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java Wed Jul  1 13:37:35 2015
@@ -34,6 +34,7 @@ import javax.annotation.CheckForNull;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
+import com.google.common.base.Function;
 import com.google.common.base.Predicate;
 import com.google.common.collect.AbstractIterator;
 import com.mongodb.BasicDBObject;
@@ -51,6 +52,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.collect.Iterables.transform;
 import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.isDeletedEntry;
 
 /**
@@ -589,4 +591,31 @@ public class Utils {
     public static boolean isHiddenPath(@Nonnull String path) {
         return path.contains("/:");
     }
+
+    /**
+     * Transforms the given {@link Iterable} from {@link String} to
+     * {@link StringValue} elements. The {@link Iterable} must no have
+     * {@code null} values.
+     */
+    public static Iterable<StringValue> asStringValueIterable(
+            @Nonnull Iterable<String> values) {
+        return transform(values, new Function<String, StringValue>() {
+            @Override
+            public StringValue apply(String input) {
+                return new StringValue(input);
+            }
+        });
+    }
+
+    /**
+     * Transforms the given paths into ids using {@link #getIdFromPath(String)}.
+     */
+    public static Iterable<String> pathToId(@Nonnull Iterable<String> paths) {
+        return transform(paths, new Function<String, String>() {
+            @Override
+            public String apply(String input) {
+                return getIdFromPath(input);
+            }
+        });
+    }
 }

Modified: jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/NodeObserver.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/NodeObserver.java?rev=1688649&r1=1688648&r2=1688649&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/NodeObserver.java (original)
+++ jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/NodeObserver.java Wed Jul  1 13:37:35 2015
@@ -161,7 +161,7 @@ public abstract class NodeObserver imple
                 while (!generator.isDone()) {
                     generator.generate();
                 }
-                PERF_LOGGER.end(start, 10,
+                PERF_LOGGER.end(start, 100,
                         "Generated events (before: {}, after: {})",
                         previousRoot, root);
             } catch (Exception e) {

Modified: jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/AmnesiaDiffCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/AmnesiaDiffCache.java?rev=1688649&r1=1688648&r2=1688649&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/AmnesiaDiffCache.java (original)
+++ jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/AmnesiaDiffCache.java Wed Jul  1 13:37:35 2015
@@ -47,7 +47,7 @@ class AmnesiaDiffCache implements DiffCa
 
     @Nonnull
     @Override
-    public Entry newEntry(@Nonnull Revision from, @Nonnull Revision to) {
+    public Entry newEntry(@Nonnull Revision from, @Nonnull Revision to, boolean local) {
         return new Entry() {
             @Override
             public void append(@Nonnull String path, @Nonnull String changes) {

Modified: jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterTest.java?rev=1688649&r1=1688648&r2=1688649&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterTest.java (original)
+++ jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterTest.java Wed Jul  1 13:37:35 2015
@@ -370,6 +370,10 @@ public class ClusterTest {
                 rootStates2.add((DocumentNodeState) root);
             }
         });
+
+        ns1.runBackgroundOperations();
+        ns2.runBackgroundOperations();
+
         rootStates1.clear();
         rootStates2.clear();
 

Copied: jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingDocumentStore.java (from r1684820, jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingDocumentStore.java)
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingDocumentStore.java?p2=jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingDocumentStore.java&p1=jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingDocumentStore.java&r1=1684820&r2=1688649&rev=1688649&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingDocumentStore.java (original)
+++ jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingDocumentStore.java Wed Jul  1 13:37:35 2015
@@ -20,186 +20,203 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import javax.annotation.Nonnull;
+
 import org.apache.jackrabbit.oak.cache.CacheStats;
 import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Condition;
 import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Key;
 import org.apache.jackrabbit.oak.plugins.document.cache.CacheInvalidationStats;
 
 public class CountingDocumentStore implements DocumentStore {
-	
-	private DocumentStore delegate;
-	
-	//TODO: remove mec
-	boolean printStacks;
-	
-	class Stats {
-		
-		private int numFindCalls;
-		private int numQueryCalls;
-		private int numRemoveCalls;
-		private int numCreateOrUpdateCalls;
-		
-	}
-	
-	private Map<Collection, Stats> collectionStats = new HashMap<Collection, Stats>();
-
-	public CountingDocumentStore(DocumentStore delegate) {
-		this.delegate = delegate;
-	}
-	
-	public void resetCounters() {
-		collectionStats.clear();
-	}
-
-	public int getNumFindCalls(Collection collection) {
-		return getStats(collection).numFindCalls;
-	}
-	
-	public int getNumQueryCalls(Collection collection) {
-		return getStats(collection).numQueryCalls;
-	}
-	
-	public int getNumRemoveCalls(Collection collection) {
-		return getStats(collection).numRemoveCalls;
-	}
-	
-	public int getNumCreateOrUpdateCalls(Collection collection) {
-		return getStats(collection).numCreateOrUpdateCalls;
-	}
-	
-	private Stats getStats(Collection collection) {
-		if (!collectionStats.containsKey(collection)) {
-			Stats s = new Stats();
-			collectionStats.put(collection, s);
-			return s;
-		} else {
-			return collectionStats.get(collection);
-		}
-	}
-	
-	@Override
-	public <T extends Document> T find(Collection<T> collection, String key) {
-		getStats(collection).numFindCalls++;
-		if (printStacks) {
-			new Exception("find ["+getStats(collection).numFindCalls+"] ("+collection+") "+key).printStackTrace();
-		}
-		return delegate.find(collection, key);
-	}
-
-	@Override
-	public <T extends Document> T find(Collection<T> collection, String key,
-			int maxCacheAge) {
-		getStats(collection).numFindCalls++;
-		if (printStacks) {
-			new Exception("find ["+getStats(collection).numFindCalls+"] ("+collection+") "+key+" [max: "+maxCacheAge+"]").printStackTrace();
-		}
-		return delegate.find(collection, key, maxCacheAge);
-	}
-
-	@Override
-	public <T extends Document> List<T> query(Collection<T> collection,
-			String fromKey, String toKey, int limit) {
-		getStats(collection).numQueryCalls++;
-		if (printStacks) {
-			new Exception("query1 ["+getStats(collection).numQueryCalls+"] ("+collection+") "+fromKey+", to "+toKey+". limit "+limit).printStackTrace();
-		}
-		return delegate.query(collection, fromKey, toKey, limit);
-	}
-
-	@Override
-	public <T extends Document> List<T> query(Collection<T> collection,
-			String fromKey, String toKey, String indexedProperty,
-			long startValue, int limit) {
-		getStats(collection).numQueryCalls++;
-		if (printStacks) {
-			new Exception("query2 ["+getStats(collection).numQueryCalls+"] ("+collection+") "+fromKey+", to "+toKey+". limit "+limit).printStackTrace();
-		}
-		return delegate.query(collection, fromKey, toKey, indexedProperty, startValue, limit);
-	}
-
-	@Override
-	public <T extends Document> void remove(Collection<T> collection, String key) {
-		getStats(collection).numRemoveCalls++;
-		delegate.remove(collection, key);
-	}
-
-	@Override
-	public <T extends Document> void remove(Collection<T> collection,
-			List<String> keys) {
-		getStats(collection).numRemoveCalls++;
-		delegate.remove(collection, keys);
-	}
-
-	@Override
-	public <T extends Document> int remove(Collection<T> collection,
-			Map<String, Map<Key, Condition>> toRemove) {
-		getStats(collection).numRemoveCalls++;
-		return delegate.remove(collection, toRemove);
-	}
-
-	@Override
-	public <T extends Document> boolean create(Collection<T> collection,
-			List<UpdateOp> updateOps) {
-		getStats(collection).numCreateOrUpdateCalls++;
-		return delegate.create(collection, updateOps);
-	}
-
-	@Override
-	public <T extends Document> void update(Collection<T> collection,
-			List<String> keys, UpdateOp updateOp) {
-		getStats(collection).numCreateOrUpdateCalls++;
-		delegate.update(collection, keys, updateOp);
-	}
-
-	@Override
-	public <T extends Document> T createOrUpdate(Collection<T> collection,
-			UpdateOp update) {
-		getStats(collection).numCreateOrUpdateCalls++;
-		return delegate.createOrUpdate(collection, update);
-	}
-
-	@Override
-	public <T extends Document> T findAndUpdate(Collection<T> collection,
-			UpdateOp update) {
-		getStats(collection).numCreateOrUpdateCalls++;
-		return delegate.findAndUpdate(collection, update);
-	}
-
-	@Override
-	public CacheInvalidationStats invalidateCache() {
-		return delegate.invalidateCache();
-	}
-
-	@Override
-	public <T extends Document> void invalidateCache(Collection<T> collection,
-			String key) {
-		delegate.invalidateCache(collection, key);
-	}
-
-	@Override
-	public void dispose() {
-		delegate.dispose();
-	}
-
-	@Override
-	public <T extends Document> T getIfCached(Collection<T> collection,
-			String key) {
-		return delegate.getIfCached(collection, key);
-	}
-
-	@Override
-	public void setReadWriteMode(String readWriteMode) {
-		delegate.setReadWriteMode(readWriteMode);
-	}
-
-	@Override
-	public CacheStats getCacheStats() {
-		return delegate.getCacheStats();
-	}
-
-	@Override
-	public Map<String, String> getMetadata() {
-		return delegate.getMetadata();
-	}
+
+    private DocumentStore delegate;
+
+    //TODO: remove mec
+    boolean printStacks;
+
+    class Stats {
+
+        private int numFindCalls;
+        private int numQueryCalls;
+        private int numRemoveCalls;
+        private int numCreateOrUpdateCalls;
+
+    }
+
+    private Map<Collection, Stats> collectionStats = new HashMap<Collection, Stats>();
+
+    public CountingDocumentStore(DocumentStore delegate) {
+        this.delegate = delegate;
+    }
+
+    public void resetCounters() {
+        collectionStats.clear();
+    }
+
+    public int getNumFindCalls(Collection collection) {
+        return getStats(collection).numFindCalls;
+    }
+
+    public int getNumQueryCalls(Collection collection) {
+        return getStats(collection).numQueryCalls;
+    }
+
+    public int getNumRemoveCalls(Collection collection) {
+        return getStats(collection).numRemoveCalls;
+    }
+
+    public int getNumCreateOrUpdateCalls(Collection collection) {
+        return getStats(collection).numCreateOrUpdateCalls;
+    }
+
+    private Stats getStats(Collection collection) {
+        if (!collectionStats.containsKey(collection)) {
+            Stats s = new Stats();
+            collectionStats.put(collection, s);
+            return s;
+        } else {
+            return collectionStats.get(collection);
+        }
+    }
+
+    @Override
+    public <T extends Document> T find(Collection<T> collection, String key) {
+        getStats(collection).numFindCalls++;
+        if (printStacks) {
+            new Exception("find [" + getStats(collection).numFindCalls + "] (" + collection + ") " + key).printStackTrace();
+        }
+        return delegate.find(collection, key);
+    }
+
+    @Override
+    public <T extends Document> T find(Collection<T> collection,
+                                       String key,
+                                       int maxCacheAge) {
+        getStats(collection).numFindCalls++;
+        if (printStacks) {
+            new Exception("find [" + getStats(collection).numFindCalls + "] (" + collection + ") " + key + " [max: " + maxCacheAge + "]").printStackTrace();
+        }
+        return delegate.find(collection, key, maxCacheAge);
+    }
+
+    @Nonnull
+    @Override
+    public <T extends Document> List<T> query(Collection<T> collection,
+                                              String fromKey,
+                                              String toKey,
+                                              int limit) {
+        getStats(collection).numQueryCalls++;
+        if (printStacks) {
+            new Exception("query1 [" + getStats(collection).numQueryCalls + "] (" + collection + ") " + fromKey + ", to " + toKey + ". limit " + limit).printStackTrace();
+        }
+        return delegate.query(collection, fromKey, toKey, limit);
+    }
+
+    @Nonnull
+    @Override
+    public <T extends Document> List<T> query(Collection<T> collection,
+                                              String fromKey,
+                                              String toKey,
+                                              String indexedProperty,
+                                              long startValue,
+                                              int limit) {
+        getStats(collection).numQueryCalls++;
+        if (printStacks) {
+            new Exception("query2 [" + getStats(collection).numQueryCalls + "] (" + collection + ") " + fromKey + ", to " + toKey + ". limit " + limit).printStackTrace();
+        }
+        return delegate.query(collection, fromKey, toKey, indexedProperty, startValue, limit);
+    }
+
+    @Override
+    public <T extends Document> void remove(Collection<T> collection,
+                                            String key) {
+        getStats(collection).numRemoveCalls++;
+        delegate.remove(collection, key);
+    }
+
+    @Override
+    public <T extends Document> void remove(Collection<T> collection,
+                                            List<String> keys) {
+        getStats(collection).numRemoveCalls++;
+        delegate.remove(collection, keys);
+    }
+
+    @Override
+    public <T extends Document> int remove(Collection<T> collection,
+                                           Map<String, Map<Key, Condition>> toRemove) {
+        getStats(collection).numRemoveCalls++;
+        return delegate.remove(collection, toRemove);
+    }
+
+    @Override
+    public <T extends Document> boolean create(Collection<T> collection,
+                                               List<UpdateOp> updateOps) {
+        getStats(collection).numCreateOrUpdateCalls++;
+        return delegate.create(collection, updateOps);
+    }
+
+    @Override
+    public <T extends Document> void update(Collection<T> collection,
+                                            List<String> keys,
+                                            UpdateOp updateOp) {
+        getStats(collection).numCreateOrUpdateCalls++;
+        delegate.update(collection, keys, updateOp);
+    }
+
+    @Override
+    public <T extends Document> T createOrUpdate(Collection<T> collection,
+                                                 UpdateOp update) {
+        getStats(collection).numCreateOrUpdateCalls++;
+        return delegate.createOrUpdate(collection, update);
+    }
+
+    @Override
+    public <T extends Document> T findAndUpdate(Collection<T> collection,
+                                                UpdateOp update) {
+        getStats(collection).numCreateOrUpdateCalls++;
+        return delegate.findAndUpdate(collection, update);
+    }
+
+    @Override
+    public CacheInvalidationStats invalidateCache() {
+        return delegate.invalidateCache();
+    }
+
+    @Override
+    public CacheInvalidationStats invalidateCache(Iterable<String> keys) {
+        return delegate.invalidateCache(keys);
+    }
+
+    @Override
+    public <T extends Document> void invalidateCache(Collection<T> collection,
+                                                     String key) {
+        delegate.invalidateCache(collection, key);
+    }
+
+    @Override
+    public void dispose() {
+        delegate.dispose();
+    }
+
+    @Override
+    public <T extends Document> T getIfCached(Collection<T> collection,
+                                              String key) {
+        return delegate.getIfCached(collection, key);
+    }
+
+    @Override
+    public void setReadWriteMode(String readWriteMode) {
+        delegate.setReadWriteMode(readWriteMode);
+    }
+
+    @Override
+    public CacheStats getCacheStats() {
+        return delegate.getCacheStats();
+    }
+
+    @Override
+    public Map<String, String> getMetadata() {
+        return delegate.getMetadata();
+    }
 
 }

Copied: jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingTieredDiffCache.java (from r1684820, jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingTieredDiffCache.java)
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingTieredDiffCache.java?p2=jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingTieredDiffCache.java&p1=jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingTieredDiffCache.java&r1=1684820&r2=1688649&rev=1688649&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingTieredDiffCache.java (original)
+++ jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingTieredDiffCache.java Wed Jul  1 13:37:35 2015
@@ -16,45 +16,50 @@
  */
 package org.apache.jackrabbit.oak.plugins.document;
 
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
 public class CountingTieredDiffCache extends TieredDiffCache {
 
-	class CountingLoader implements Loader {
+    class CountingLoader implements Loader {
+
+        private Loader delegate;
+
+        CountingLoader(Loader delegate) {
+            this.delegate = delegate;
+        }
+
+        @Override
+        public String call() {
+            incLoadCount();
+            return delegate.call();
+        }
+
+    }
+
+    private int loadCount;
+
+    public CountingTieredDiffCache(DocumentMK.Builder builder) {
+        super(builder);
+    }
+
+    private void incLoadCount() {
+        loadCount++;
+    }
 
-		private Loader delegate;
+    public int getLoadCount() {
+        return loadCount;
+    }
 
-		CountingLoader(Loader delegate) {
-			this.delegate = delegate;
-		}
-		
-		@Override
-		public String call() {
-			incLoadCount();
-			return delegate.call();
-		}
-		
-	}
-
-	private int loadCount;
-	
-	public CountingTieredDiffCache(DocumentMK.Builder builder) {
-    	super(builder);
-    }
-	
-	private void incLoadCount() {
-		loadCount++;		
-	}
-	
-	public int getLoadCount() {
-		return loadCount;
-	}
-	
-	public void resetLoadCounter() {
-		loadCount = 0;
-	}
-
-	@Override
-	public String getChanges(Revision from, Revision to, String path,
-			Loader loader) {
-		return super.getChanges(from, to, path, new CountingLoader(loader));
-	}
+    public void resetLoadCounter() {
+        loadCount = 0;
+    }
+
+    @Override
+    public String getChanges(@Nonnull Revision from,
+                             @Nonnull Revision to,
+                             @Nonnull String path,
+                             @Nullable Loader loader) {
+        return super.getChanges(from, to, path, new CountingLoader(loader));
+    }
 }

Modified: jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java?rev=1688649&r1=1688648&r2=1688649&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java (original)
+++ jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java Wed Jul  1 13:37:35 2015
@@ -60,6 +60,7 @@ import com.google.common.base.Throwables
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+
 import org.apache.jackrabbit.oak.api.CommitFailedException;
 import org.apache.jackrabbit.oak.api.PropertyState;
 import org.apache.jackrabbit.oak.api.Type;
@@ -103,8 +104,8 @@ public class DocumentNodeStoreTest {
         DocumentStore docStore = new MemoryDocumentStore();
         DocumentStore testStore = new TimingDocumentStoreWrapper(docStore) {
             @Override
-            public CacheInvalidationStats invalidateCache() {
-                super.invalidateCache();
+            public CacheInvalidationStats invalidateCache(Iterable<String> keys) {
+                super.invalidateCache(keys);
                 semaphore.acquireUninterruptibly();
                 semaphore.release();
                 return null;
@@ -1662,7 +1663,7 @@ public class DocumentNodeStoreTest {
         merge(ns, builder);
         Revision to = ns.getHeadRevision();
 
-        DiffCache.Entry entry = ns.getDiffCache().newEntry(from, to);
+        DiffCache.Entry entry = ns.getDiffCache().newEntry(from, to, true);
         entry.append("/", "-\"foo\"");
         entry.done();
 

Copied: jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalEntryTest.java (from r1685977, jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalEntryTest.java)
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalEntryTest.java?p2=jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalEntryTest.java&p1=jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalEntryTest.java&r1=1685977&r2=1688649&rev=1688649&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalEntryTest.java (original)
+++ jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalEntryTest.java Wed Jul  1 13:37:35 2015
@@ -52,11 +52,12 @@ public class JournalEntryTest {
         add(sort, paths);
         Revision from = new Revision(1, 0, 1);
         Revision to = new Revision(2, 0, 1);
+        sort.sort();
         JournalEntry.applyTo(sort, cache, from, to);
 
         for (String p : paths) {
             String changes = cache.getChanges(from, to, p, null);
-            assertNotNull(changes);
+            assertNotNull("missing changes for " + p, changes);
             for (String c : getChildren(changes)) {
                 assertTrue(paths.contains(PathUtils.concat(p, c)));
             }

Copied: jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalTest.java (from r1684820, jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalTest.java)
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalTest.java?p2=jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalTest.java&p1=jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalTest.java&r1=1684820&r2=1688649&rev=1688649&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalTest.java (original)
+++ jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalTest.java Wed Jul  1 13:37:35 2015
@@ -16,23 +16,16 @@
  */
 package org.apache.jackrabbit.oak.plugins.document;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.LinkedList;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
 
 import org.apache.jackrabbit.oak.api.PropertyState;
 import org.apache.jackrabbit.oak.plugins.document.memory.MemoryDocumentStore;
-import org.apache.jackrabbit.oak.plugins.document.util.Utils;
-import org.apache.jackrabbit.oak.spi.blob.BlobStore;
 import org.apache.jackrabbit.oak.spi.blob.MemoryBlobStore;
 import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
 import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
@@ -40,26 +33,18 @@ import org.apache.jackrabbit.oak.spi.com
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
 import org.apache.jackrabbit.oak.spi.state.NodeStateDiff;
-import org.junit.After;
-import org.junit.Before;
 import org.junit.Test;
 
-import com.google.common.collect.Iterators;
-import com.google.common.collect.Lists;
-import com.mongodb.DB;
-
-public class JournalTest {
+import static java.util.Collections.synchronizedList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
-    private static final boolean MONGO_DB = false;
-//    private static final boolean MONGO_DB = true;
-    
-    private TestBuilder builder;
+public class JournalTest extends AbstractJournalTest {
 
     private MemoryDocumentStore ds;
     private MemoryBlobStore bs;
 
-    private List<DocumentMK> mks = Lists.newArrayList();
-
     class DiffingObserver implements Observer, Runnable, NodeStateDiff {
 
         final List<DocumentNodeState> incomingRootStates1 = Lists.newArrayList();
@@ -126,7 +111,6 @@ public class JournalTest {
                             incomingRootStates1.wait();
                         } catch (InterruptedException e) {
                             // ignore
-                            continue;
                         }
                     }
                     newRoot = incomingRootStates1.remove(0);
@@ -237,7 +221,7 @@ public class JournalTest {
         observer.clear();
         countingDocStore1.resetCounters();
         countingDocStore2.resetCounters();
-        countingDocStore1.printStacks = true;
+        // countingDocStore1.printStacks = true;
         countingDiffCache1.resetLoadCounter();
         countingDiffCache2.resetLoadCounter();
 
@@ -257,7 +241,7 @@ public class JournalTest {
         assertEquals(0, countingDiffCache1.getLoadCount());
         
         // let node 1 read those changes
-        System.err.println("run background ops");
+        // System.err.println("run background ops");
         ns1.runBackgroundOperations();
         mk2.commit("/", "+\"regular5\": {}", null, null);
         ns2.runBackgroundOperations();
@@ -329,7 +313,7 @@ public class JournalTest {
         doLastRevRecoveryJournalTest(true);
     }
     
-    void doLastRevRecoveryJournalTest(boolean testConcurrency) throws Exception {
+    private void doLastRevRecoveryJournalTest(boolean testConcurrency) throws Exception {
         DocumentMK mk1 = createMK(0 /*clusterId via clusterNodes collection*/, 0);
         DocumentNodeStore ds1 = mk1.getNodeStore();
         int c1Id = ds1.getClusterId();
@@ -405,8 +389,7 @@ public class JournalTest {
             
             // just some no-ops:
             recovery.recover(c2Id);
-            List<NodeDocument> emptyList = new LinkedList<NodeDocument>();
-            recovery.recover(emptyList.iterator(), c2Id);
+            recovery.recover(Iterators.<NodeDocument>emptyIterator(), c2Id);
             assertJournalEntries(ds1, "{}", change1); // unchanged
             assertJournalEntries(ds2, "{}", change2, change2b);
 
@@ -417,8 +400,8 @@ public class JournalTest {
             final CountDownLatch ready = new CountDownLatch(NUM_THREADS);
             final CountDownLatch start = new CountDownLatch(1);
             final CountDownLatch end = new CountDownLatch(NUM_THREADS);
-            for(int i=0; i<NUM_THREADS; i++) {
-                final List<Throwable> throwables = new LinkedList<Throwable>();
+            final List<Exception> exceptions = synchronizedList(new ArrayList<Exception>());
+            for (int i = 0; i < NUM_THREADS; i++) {
                 Thread th = new Thread(new Runnable() {
     
                     @Override
@@ -427,10 +410,8 @@ public class JournalTest {
                             ready.countDown();
                             start.await();
                             recovery.recover(Iterators.forArray(x1,z1), c2Id);
-                        } catch (Throwable e) {
-                            synchronized(throwables) {
-                                throwables.add(e);
-                            }
+                        } catch (Exception e) {
+                            exceptions.add(e);
                         } finally {
                             end.countDown();
                         }
@@ -444,118 +425,19 @@ public class JournalTest {
             assertTrue(end.await(20, TimeUnit.SECONDS));
             assertJournalEntries(ds1, "{}", change1); // unchanged
             assertJournalEntries(ds2, "{}", change2, change2b);
-        }
-    }
-
-    void assertJournalEntries(DocumentNodeStore ds, String... expectedChanges) {
-        List<String> exp = new LinkedList<String>(Arrays.asList(expectedChanges));
-        for(boolean branch : new Boolean[]{false, true}) {
-            String fromKey = JournalEntry.asId(new Revision(0, 0, ds.getClusterId(), branch));
-            String toKey = JournalEntry.asId(new Revision(System.currentTimeMillis()+1000, 0, ds.getClusterId(), branch));
-            List<JournalEntry> entries = ds.getDocumentStore().query(Collection.JOURNAL, fromKey, toKey, expectedChanges.length+5);
-            if (entries.size()>0) {
-                for (Iterator<JournalEntry> it = entries.iterator(); it.hasNext();) {
-                    JournalEntry journalEntry = it.next();
-                    if (!exp.remove(journalEntry.get("_c"))) {
-                        fail("Found an unexpected change: "+journalEntry.get("_c")+", while all I expected was: "+expectedChanges);
-                    }
-                }
+            for (Exception ex : exceptions) {
+                throw ex;
             }
         }
-        if (exp.size()>0) {
-            fail("Did not find all expected changes, left over: "+exp+" (from original list which is: "+expectedChanges+")");
-        }
-    }
-
-    int countJournalEntries(DocumentNodeStore ds, int max) {
-        int total = 0;
-        for(boolean branch : new Boolean[]{false, true}) {
-            String fromKey = JournalEntry.asId(new Revision(0, 0, ds.getClusterId(), branch));
-            String toKey = JournalEntry.asId(new Revision(System.currentTimeMillis()+1000, 0, ds.getClusterId(), branch));
-            List<JournalEntry> entries = ds.getDocumentStore().query(Collection.JOURNAL, fromKey, toKey, max);
-            total+=entries.size();
-        }
-        return total;
-    }
-    
-    private NodeDocument getDocument(DocumentNodeStore nodeStore, String path) {
-        return nodeStore.getDocumentStore().find(Collection.NODES, Utils.getIdFromPath(path));
-    }
-
-    @Before
-    @After
-    public void clear() {
-        for (DocumentMK mk : mks) {
-            mk.dispose();
-        }
-        mks.clear();
-        if (MONGO_DB) {
-            DB db = MongoUtils.getConnection().getDB();
-            MongoUtils.dropCollections(db);
-        }
-    }
-
-    private final class TestBuilder extends DocumentMK.Builder {
-        private CountingDocumentStore actualStore;
-        private CountingTieredDiffCache actualDiffCache;
-
-        @Override
-        public DocumentStore getDocumentStore() {
-            if (actualStore==null) {
-                actualStore = new CountingDocumentStore(super.getDocumentStore());
-            }
-            return actualStore;
-        }
-        
-        @Override
-        public DiffCache getDiffCache() {
-            if (actualDiffCache==null) {
-                actualDiffCache = new CountingTieredDiffCache(this);
-            }
-            return actualDiffCache;
-        }
     }
 
     private DocumentMK createMK(int clusterId, int asyncDelay) {
-        if (MONGO_DB) {
-            DB db = MongoUtils.getConnection(/*"oak-observation"*/).getDB();
-            builder = newDocumentMKBuilder();
-            return register(builder.setMongoDB(db)
-                    .setClusterId(clusterId).setAsyncDelay(asyncDelay).open());
-        } else {
-            if (ds == null) {
-                ds = new MemoryDocumentStore();
-            }
-            if (bs == null) {
-                bs = new MemoryBlobStore();
-            }
-            return createMK(clusterId, asyncDelay, ds, bs);
+        if (ds == null) {
+            ds = new MemoryDocumentStore();
         }
-    }
-    
-    private TestBuilder newDocumentMKBuilder() {
-        return new TestBuilder();
-    }
-
-    private DocumentMK createMK(int clusterId, int asyncDelay,
-                             DocumentStore ds, BlobStore bs) {
-        builder = newDocumentMKBuilder();
-        return register(builder.setDocumentStore(ds)
-                .setBlobStore(bs).setClusterId(clusterId)
-                .setAsyncDelay(asyncDelay).open());
-    }
-
-    private DocumentMK register(DocumentMK mk) {
-        mks.add(mk);
-        return mk;
-    }
-
-    private void disposeMK(DocumentMK mk) {
-        mk.dispose();
-        for (int i = 0; i < mks.size(); i++) {
-            if (mks.get(i) == mk) {
-                mks.remove(i);
-            }
+        if (bs == null) {
+            bs = new MemoryBlobStore();
         }
+        return createMK(clusterId, asyncDelay, ds, bs);
     }
 }

Modified: jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDiffCacheTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDiffCacheTest.java?rev=1688649&r1=1688648&r2=1688649&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDiffCacheTest.java (original)
+++ jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDiffCacheTest.java Wed Jul  1 13:37:35 2015
@@ -88,7 +88,7 @@ public class MongoDiffCacheTest {
         
         MongoDiffCache diffCache = new MongoDiffCache(db, 32, new DocumentMK.Builder());
         DiffCache.Entry entry = diffCache.newEntry(
-                new Revision(1, 0, 1), new Revision(2, 0, 1));
+                new Revision(1, 0, 1), new Revision(2, 0, 1), false);
         for (int i = 0; i < 100; i++) {
             for (int j = 0; j < 100; j++) {
                 for (int k = 0; k < 64; k++) {

Modified: jackrabbit/oak/branches/1.2/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java?rev=1688649&r1=1688648&r2=1688649&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.2/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java (original)
+++ jackrabbit/oak/branches/1.2/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java Wed Jul  1 13:37:35 2015
@@ -60,6 +60,7 @@ import org.apache.jackrabbit.oak.spi.whi
 import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard;
 import org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardExecutor;
 import org.apache.jackrabbit.oak.stats.StatisticManager;
+import org.apache.jackrabbit.oak.util.PerfLogger;
 import org.apache.jackrabbit.stats.TimeSeriesMax;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -73,6 +74,8 @@ import org.slf4j.LoggerFactory;
  */
 class ChangeProcessor implements Observer {
     private static final Logger LOG = LoggerFactory.getLogger(ChangeProcessor.class);
+    private static final PerfLogger PERF_LOGGER = new PerfLogger(
+            LoggerFactory.getLogger(ChangeProcessor.class.getName() + ".perf"));
 
     /**
      * Fill ratio of the revision queue at which commits should be delayed
@@ -289,6 +292,7 @@ class ChangeProcessor implements Observe
     public void contentChanged(@Nonnull NodeState root, @Nullable CommitInfo info) {
         if (previousRoot != null) {
             try {
+                long start = PERF_LOGGER.start();
                 FilterProvider provider = filterProvider.get();
                 // FIXME don't rely on toString for session id
                 if (provider.includeCommit(contentSession.toString(), info)) {
@@ -306,6 +310,9 @@ class ChangeProcessor implements Observe
                         }
                     }
                 }
+                PERF_LOGGER.end(start, 100,
+                        "Generated events (before: {}, after: {})",
+                        previousRoot, root);
             } catch (Exception e) {
                 LOG.warn("Error while dispatching observation events for " + tracker, e);
             }

Modified: jackrabbit/oak/branches/1.2/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/ObservationTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/ObservationTest.java?rev=1688649&r1=1688648&r2=1688649&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.2/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/ObservationTest.java (original)
+++ jackrabbit/oak/branches/1.2/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/ObservationTest.java Wed Jul  1 13:37:35 2015
@@ -26,6 +26,7 @@ import static javax.jcr.observation.Even
 import static javax.jcr.observation.Event.PROPERTY_ADDED;
 import static javax.jcr.observation.Event.PROPERTY_CHANGED;
 import static javax.jcr.observation.Event.PROPERTY_REMOVED;
+import static org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils.getServices;
 
 import java.util.List;
 import java.util.concurrent.Callable;
@@ -51,13 +52,12 @@ import com.google.common.collect.Lists;
 
 import org.apache.jackrabbit.commons.JcrUtils;
 import org.apache.jackrabbit.oak.Oak;
-import org.apache.jackrabbit.oak.api.jmx.RepositoryStatsMBean;
 import org.apache.jackrabbit.oak.fixture.JcrCreator;
 import org.apache.jackrabbit.oak.fixture.OakRepositoryFixture;
 import org.apache.jackrabbit.oak.fixture.RepositoryFixture;
 import org.apache.jackrabbit.oak.jcr.Jcr;
+import org.apache.jackrabbit.oak.spi.commit.BackgroundObserverMBean;
 import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard;
-import org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils;
 
 public class ObservationTest extends Benchmark {
     public static final int EVENT_TYPES = NODE_ADDED | NODE_REMOVED | NODE_MOVED |
@@ -67,7 +67,7 @@ public class ObservationTest extends Ben
     private static final int OUTPUT_RESOLUTION = 100;
     private static final int LISTENER_COUNT = Integer.getInteger("listenerCount", 100);
     private static final int WRITER_COUNT = Integer.getInteger("writerCount", 1);
-    private static final String PATH_FILTER = System.getProperty("pathFilter", "/");
+    private static final String PATH_FILTER = System.getProperty("pathFilter");
 
     @Override
     public void run(Iterable<RepositoryFixture> fixtures) {
@@ -119,13 +119,14 @@ public class ObservationTest extends Ben
         final AtomicInteger eventCount = new AtomicInteger();
         final AtomicInteger nodeCount = new AtomicInteger();
 
-        Session[] sessions = new Session[LISTENER_COUNT];
-        EventListener[] listeners = new Listener[LISTENER_COUNT];
+        List<Session> sessions = Lists.newArrayList();
+        List<EventListener> listeners = Lists.newArrayList();
 
         List<String> testPaths = Lists.newArrayList();
         Session s = createSession(repository);
+        String path = "/path/to/observation/benchmark-" + AbstractTest.TEST_ID;
         try {
-            Node testRoot = s.getRootNode().addNode("path").addNode("to").addNode("observation").addNode("benchmark");
+            Node testRoot = JcrUtils.getOrCreateByPath(path, null, s);
             for (int i = 0; i < WRITER_COUNT; i++) {
                 testPaths.add(testRoot.addNode("session-" + i).getPath());
             }
@@ -134,14 +135,18 @@ public class ObservationTest extends Ben
             s.logout();
         }
 
+        String pathFilter = PATH_FILTER == null ? path : PATH_FILTER;
+        System.out.println("Path filter for event listener: " + pathFilter);
         ExecutorService service = Executors.newFixedThreadPool(WRITER_COUNT);
         try {
             for (int k = 0; k < LISTENER_COUNT; k++) {
-                sessions[k] = createSession(repository);
-                listeners[k] = new Listener(eventCount);
-                ObservationManager obsMgr = sessions[k].getWorkspace().getObservationManager();
-                obsMgr.addEventListener(listeners[k], EVENT_TYPES, PATH_FILTER, true, null, null, false);
+                sessions.add(createSession(repository));
+                listeners.add(new Listener(eventCount));
+                ObservationManager obsMgr = sessions.get(k).getWorkspace().getObservationManager();
+                obsMgr.addEventListener(listeners.get(k), EVENT_TYPES, pathFilter, true, null, null, false);
             }
+            // also add a listener on the root node
+            addRootListener(repository, sessions, listeners);
 
             List<Future<Object>> createNodes = Lists.newArrayList();
             for (final String p : testPaths) {
@@ -155,7 +160,7 @@ public class ObservationTest extends Ben
                             Node testRoot = session.getNode(p);
                             createChildren(testRoot, 100);
                             for (Node m : JcrUtils.getChildNodes(testRoot)) {
-                                createChildren(m, 100);
+                                createChildren(m, 100 / WRITER_COUNT);
                                 for (Node n : JcrUtils.getChildNodes(m)) {
                                     createChildren(n, 5);
                                 }
@@ -180,7 +185,7 @@ public class ObservationTest extends Ben
                 }));
             }
 
-            System.out.println("ms      #node   nodes/s #event  event/s event ratio queue");
+            System.out.println("ms      #node   nodes/s #event  event/s event-ratio queue external");
             while (!isDone(createNodes) || (eventCount.get() / LISTENER_COUNT < nodeCount.get() * EVENTS_PER_NODE)) {
                 long t0 = System.currentTimeMillis();
                 Thread.sleep(OUTPUT_RESOLUTION);
@@ -188,38 +193,51 @@ public class ObservationTest extends Ben
 
                 int nc = nodeCount.get();
                 int ec = eventCount.get() / LISTENER_COUNT;
-                long ql = getObservationQueueMaxLength(whiteboard);
+                int[] ql = getObservationQueueLength(whiteboard);
 
                 double nps = (double) nc / t * 1000;
                 double eps = (double) ec / t * 1000;
                 double epn = (double) ec / nc / EVENTS_PER_NODE;
 
                 System.out.format(
-                        "%7d %7d %7.1f %7d %7.1f %1.2f %7d%n",
-                           t, nc,  nps, ec,  eps,  epn, ql);
+                        "%7d %7d %7.1f %7d %7.1f %7.2f %7d %7d%n",
+                           t, nc,  nps, ec,  eps,  epn, ql[0], ql[1]);
             }
             get(createNodes);
         } finally {
-            for (int k = 0; k < LISTENER_COUNT; k++) {
-                sessions[k].getWorkspace().getObservationManager().removeEventListener(listeners[k]);
-                sessions[k].logout();
+            for (int k = 0; k < sessions.size(); k++) {
+                sessions.get(k).getWorkspace().getObservationManager()
+                        .removeEventListener(listeners.get(k));
+                sessions.get(k).logout();
             }
             service.shutdown();
             service.awaitTermination(1, TimeUnit.MINUTES);
         }
     }
 
-    private static long getObservationQueueMaxLength(@Nullable Whiteboard whiteboard) {
-        if (whiteboard == null) {
-            return -1;
-        }
-        List<RepositoryStatsMBean> stats = WhiteboardUtils.getServices(
-                whiteboard, RepositoryStatsMBean.class);
-        for (RepositoryStatsMBean bean : stats) {
-            long[] values = (long[]) bean.getObservationQueueMaxLength().get("per second");
-            return values[values.length - 1];
+    private void addRootListener(Repository repository,
+                                 List<Session> sessions,
+                                 List<EventListener> listeners)
+            throws RepositoryException {
+        Session s = createSession(repository);
+        sessions.add(s);
+        Listener listener = new Listener(new AtomicInteger());
+        ObservationManager obsMgr = s.getWorkspace().getObservationManager();
+        obsMgr.addEventListener(listener, EVENT_TYPES, "/", true, null, null, false);
+        listeners.add(listener);
+    }
+
+    private static int[] getObservationQueueLength(@Nullable Whiteboard wb) {
+        if (wb == null) {
+            return new int[]{-1, -1};
+        }
+        int len = -1;
+        int ext = -1;
+        for (BackgroundObserverMBean bean : getServices(wb, BackgroundObserverMBean.class)) {
+            len = Math.max(bean.getQueueSize(), len);
+            ext = Math.max(bean.getExternalEventCount(), ext);
         }
-        return -1;
+        return new int[]{len, ext};
     }
 
     private static boolean isDone(Iterable<Future<Object>> futures) {