You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by mr...@apache.org on 2015/06/29 12:59:17 UTC

svn commit: r1688179 - in /jackrabbit/oak/trunk: oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/memo...

Author: mreutegg
Date: Mon Jun 29 10:59:17 2015
New Revision: 1688179

URL: http://svn.apache.org/r1688179
Log:
OAK-3002: Optimize docCache and docChildrenCache invalidation by filtering using journal

Applied slightly modified patch provided by Stefan Egli

Added:
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/AbstractJournalTest.java   (with props)
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/JournalIT.java   (with props)
Modified:
    jackrabbit/oak/trunk/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/StringSort.java
    jackrabbit/oak/trunk/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/package-info.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentStore.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalEntry.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/memory/MemoryDocumentStore.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/CacheInvalidator.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/LoggingDocumentStoreWrapper.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/SynchronizingDocumentStoreWrapper.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/TimingDocumentStoreWrapper.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingDocumentStore.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalEntryTest.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalTest.java

Modified: jackrabbit/oak/trunk/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/StringSort.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/StringSort.java?rev=1688179&r1=1688178&r2=1688179&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/StringSort.java (original)
+++ jackrabbit/oak/trunk/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/StringSort.java Mon Jun 29 10:59:17 2015
@@ -35,6 +35,7 @@ import com.google.common.base.Charsets;
 import com.google.common.collect.Lists;
 import com.google.common.io.Closer;
 import com.google.common.io.Files;
+
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.LineIterator;
 import org.slf4j.Logger;
@@ -45,7 +46,7 @@ import org.slf4j.LoggerFactory;
  * the list would be maintained in memory. If the size crosses the required threshold then
  * the sorting would be performed externally
  */
-public class StringSort implements Closeable {
+public class StringSort implements Iterable<String>, Closeable {
     private final Logger log = LoggerFactory.getLogger(getClass());
     public static final int BATCH_SIZE = 2048;
 
@@ -117,6 +118,17 @@ public class StringSort implements Close
         }
     }
 
+    @Override
+    public Iterator<String> iterator() {
+        try {
+            return getIds();
+        } catch (IOException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    //--------------------------< internal >------------------------------------
+
     private void addToBatch(String id) throws IOException {
         inMemBatch.add(id);
         if (inMemBatch.size() >= BATCH_SIZE) {

Modified: jackrabbit/oak/trunk/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/package-info.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/package-info.java?rev=1688179&r1=1688178&r2=1688179&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/package-info.java (original)
+++ jackrabbit/oak/trunk/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/package-info.java Mon Jun 29 10:59:17 2015
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-@Version("1.0")
+@Version("1.1")
 @Export(optional = "provide:=true")
 package org.apache.jackrabbit.oak.commons.sort;
 

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java?rev=1688179&r1=1688178&r2=1688179&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java Mon Jun 29 10:59:17 2015
@@ -31,7 +31,9 @@ import static org.apache.jackrabbit.oak.
 import static org.apache.jackrabbit.oak.plugins.document.JournalEntry.fillExternalChanges;
 import static org.apache.jackrabbit.oak.plugins.document.UpdateOp.Key;
 import static org.apache.jackrabbit.oak.plugins.document.UpdateOp.Operation;
+import static org.apache.jackrabbit.oak.plugins.document.util.Utils.asStringValueIterable;
 import static org.apache.jackrabbit.oak.plugins.document.util.Utils.getIdFromPath;
+import static org.apache.jackrabbit.oak.plugins.document.util.Utils.pathToId;
 import static org.apache.jackrabbit.oak.plugins.document.util.Utils.unshareString;
 
 import java.io.Closeable;
@@ -1799,9 +1801,38 @@ public final class DocumentNodeStore
 
         if (!externalChanges.isEmpty()) {
             // invalidate caches
-            stats.cacheStats = store.invalidateCache();
-            // TODO only invalidate affected items
-            docChildrenCache.invalidateAll();
+            if (externalSort == null) {
+                // if no externalSort available, then invalidate the classic way: everything
+                stats.cacheStats = store.invalidateCache();
+                docChildrenCache.invalidateAll();
+            } else {
+                try {
+                    externalSort.sort();
+                    stats.cacheStats = store.invalidateCache(pathToId(externalSort));
+                    // OAK-3002: only invalidate affected items (using journal)
+                    long origSize = docChildrenCache.size();
+                    if (origSize == 0) {
+                        // if docChildrenCache is empty, don't bother
+                        // calling invalidateAll either way 
+                        // (esp calling invalidateAll(Iterable) will
+                        // potentially iterate over all keys even though
+                        // there's nothing to be deleted)
+                        LOG.trace("backgroundRead: docChildrenCache nothing to invalidate");
+                    } else {
+                        // however, if the docChildrenCache is not empty,
+                        // use the invalidateAll(Iterable) variant,
+                        // passing it a Iterable<StringValue>, as that's
+                        // what is contained in the cache
+                        docChildrenCache.invalidateAll(asStringValueIterable(externalSort));
+                        long newSize = docChildrenCache.size();
+                        LOG.trace("backgroundRead: docChildrenCache invalidation result: orig: {}, new: {} ", origSize, newSize);
+                    }
+                } catch (Exception ioe) {
+                    LOG.error("backgroundRead: got IOException during external sorting/cache invalidation (as a result, invalidating entire cache): "+ioe, ioe);
+                    stats.cacheStats = store.invalidateCache();
+                    docChildrenCache.invalidateAll();
+                }
+            }
             stats.cacheInvalidationTime = clock.getTime() - time;
             time = clock.getTime();
 

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentStore.java?rev=1688179&r1=1688178&r2=1688179&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentStore.java Mon Jun 29 10:59:17 2015
@@ -233,6 +233,13 @@ public interface DocumentStore {
     CacheInvalidationStats invalidateCache();
 
     /**
+     * Invalidate the document cache but only with entries that match one
+     * of the keys provided.
+     */
+    @CheckForNull
+    CacheInvalidationStats invalidateCache(Iterable<String> keys);
+
+    /**
      * Invalidate the document cache for the given key.
      *
      * @param <T> the document type

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalEntry.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalEntry.java?rev=1688179&r1=1688178&r2=1688179&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalEntry.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalEntry.java Mon Jun 29 10:59:17 2015
@@ -94,7 +94,6 @@ public final class JournalEntry extends
                         @Nonnull Revision from,
                         @Nonnull Revision to) throws IOException {
         LOG.debug("applyTo: starting for {} to {}", from, to);
-        externalSort.sort();
         // note that it is not de-duplicated yet
         LOG.debug("applyTo: sorting done.");
 

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/memory/MemoryDocumentStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/memory/MemoryDocumentStore.java?rev=1688179&r1=1688178&r2=1688179&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/memory/MemoryDocumentStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/memory/MemoryDocumentStore.java Mon Jun 29 10:59:17 2015
@@ -338,6 +338,11 @@ public class MemoryDocumentStore impleme
     }
 
     @Override
+    public CacheInvalidationStats invalidateCache(Iterable<String> keys) {
+        return null;
+    }
+    
+    @Override
     public void dispose() {
         // ignore
     }

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/CacheInvalidator.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/CacheInvalidator.java?rev=1688179&r1=1688178&r2=1688179&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/CacheInvalidator.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/CacheInvalidator.java Mon Jun 29 10:59:17 2015
@@ -195,7 +195,7 @@ abstract class CacheInvalidator {
             PeekingIterator<TreeNode> pitr = Iterators.peekingIterator(treeItr);
             Map<String, TreeNode> sameLevelNodes = Maps.newHashMap();
 
-            // Fetch only the lastRev map and id
+            // Fetch only the modCount and id
             final BasicDBObject keys = new BasicDBObject(Document.ID, 1);
             keys.put(Document.MOD_COUNT, 1);
 
@@ -228,7 +228,7 @@ abstract class CacheInvalidator {
                         QueryBuilder query = QueryBuilder.start(Document.ID)
                                 .in(idBatch);
 
-                        // Fetch lastRev and modCount for each such nodes
+                        // Fetch modCount for each such nodes
                         DBCursor cursor = nodes.find(query.get(), keys);
                         cursor.setReadPreference(ReadPreference.primary());
                         LOG.debug(

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java?rev=1688179&r1=1688178&r2=1688179&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java Mon Jun 29 10:59:17 2015
@@ -67,6 +67,7 @@ import org.apache.jackrabbit.oak.plugins
 import org.apache.jackrabbit.oak.plugins.document.cache.ForwardingListener;
 import org.apache.jackrabbit.oak.plugins.document.cache.NodeDocOffHeapCache;
 import org.apache.jackrabbit.oak.plugins.document.cache.OffHeapCache;
+import org.apache.jackrabbit.oak.plugins.document.mongo.CacheInvalidator.InvalidationResult;
 import org.apache.jackrabbit.oak.plugins.document.util.StringValue;
 import org.apache.jackrabbit.oak.plugins.document.util.Utils;
 import org.apache.jackrabbit.oak.stats.Clock;
@@ -287,6 +288,59 @@ public class MongoDocumentStore implemen
         //that would lead to lesser number of queries
         return CacheInvalidator.createHierarchicalInvalidator(this).invalidateCache();
     }
+    
+    @Override
+    public CacheInvalidationStats invalidateCache(Iterable<String> keys) {
+        LOG.debug("invalidateCache: start");
+        final InvalidationResult result = new InvalidationResult();
+        int size  = 0;
+
+        final Iterator<String> it = keys.iterator();
+        while(it.hasNext()) {
+            // read chunks of documents only
+            final List<String> ids = new ArrayList<String>(IN_CLAUSE_BATCH_SIZE);
+            while(it.hasNext() && ids.size() < IN_CLAUSE_BATCH_SIZE) {
+                final String id = it.next();
+                if (getCachedNodeDoc(id) != null) {
+                    // only add those that we actually do have cached
+                    ids.add(id);
+                }
+            }
+            size += ids.size();
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("invalidateCache: batch size: {} of total so far {}",
+                        ids.size(), size);
+            }
+            
+            QueryBuilder query = QueryBuilder.start(Document.ID).in(ids);
+            // Fetch only the modCount and id
+            final BasicDBObject fields = new BasicDBObject(Document.ID, 1);
+            fields.put(Document.MOD_COUNT, 1);
+            
+            DBCursor cursor = nodes.find(query.get(), fields);
+            cursor.setReadPreference(ReadPreference.primary());
+            result.queryCount++;
+            
+            for (DBObject obj : cursor) {
+                result.cacheEntriesProcessedCount++;
+                String id = (String) obj.get(Document.ID);
+                Number modCount = (Number) obj.get(Document.MOD_COUNT);
+                
+                CachedNodeDocument cachedDoc = getCachedNodeDoc(id);
+                if (cachedDoc != null
+                        && !Objects.equal(cachedDoc.getModCount(), modCount)) {
+                    invalidateCache(Collection.NODES, id);
+                    result.invalidationCount++;
+                } else {
+                    result.upToDateCount++;
+                }
+            }
+        }
+
+        result.cacheSize = size;
+        LOG.trace("invalidateCache: end. total: {}", size);
+        return result;
+    }
 
     @Override
     public <T extends Document> void invalidateCache(Collection<T> collection, String key) {

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java?rev=1688179&r1=1688178&r2=1688179&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java Mon Jun 29 10:59:17 2015
@@ -282,6 +282,12 @@ public class RDBDocumentStore implements
         }
         return null;
     }
+    
+    @Override
+    public CacheInvalidationStats invalidateCache(Iterable<String> keys) {
+        //TODO: optimize me
+        return invalidateCache();
+    }
 
     @Override
     public <T extends Document> void invalidateCache(Collection<T> collection, String id) {

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/LoggingDocumentStoreWrapper.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/LoggingDocumentStoreWrapper.java?rev=1688179&r1=1688178&r2=1688179&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/LoggingDocumentStoreWrapper.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/LoggingDocumentStoreWrapper.java Mon Jun 29 10:59:17 2015
@@ -251,6 +251,17 @@ public class LoggingDocumentStoreWrapper
             throw convert(e);
         }
     }
+    
+    @Override
+    public CacheInvalidationStats invalidateCache(Iterable<String> keys) {
+        try {
+            logMethod("invalidateCache", keys);
+            return store.invalidateCache(keys);
+        } catch (Exception e) {
+            logException(e);
+            throw convert(e);
+        }
+    }
 
     @Override
     public <T extends Document> void invalidateCache(Collection<T> collection, String key) {

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/SynchronizingDocumentStoreWrapper.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/SynchronizingDocumentStoreWrapper.java?rev=1688179&r1=1688178&r2=1688179&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/SynchronizingDocumentStoreWrapper.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/SynchronizingDocumentStoreWrapper.java Mon Jun 29 10:59:17 2015
@@ -107,6 +107,11 @@ public class SynchronizingDocumentStoreW
     }
 
     @Override
+    public synchronized CacheInvalidationStats invalidateCache(Iterable<String> keys) {
+        return store.invalidateCache(keys);
+    }
+    
+    @Override
     public synchronized <T extends Document> void invalidateCache(Collection<T> collection, String key) {
         store.invalidateCache(collection, key);
     }

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/TimingDocumentStoreWrapper.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/TimingDocumentStoreWrapper.java?rev=1688179&r1=1688178&r2=1688179&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/TimingDocumentStoreWrapper.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/TimingDocumentStoreWrapper.java Mon Jun 29 10:59:17 2015
@@ -282,6 +282,18 @@ public class TimingDocumentStoreWrapper
             throw convert(e);
         }
     }
+    
+    @Override
+    public CacheInvalidationStats invalidateCache(Iterable<String> keys) {
+        try {
+            long start = now();
+            CacheInvalidationStats result = base.invalidateCache(keys);
+            updateAndLogTimes("invalidateCache3", start, 0, 0);
+            return result;
+        } catch (Exception e) {
+            throw convert(e);
+        }
+    }
 
     @Override
     public <T extends Document> void invalidateCache(Collection<T> collection, String key) {

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java?rev=1688179&r1=1688178&r2=1688179&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java Mon Jun 29 10:59:17 2015
@@ -34,6 +34,7 @@ import javax.annotation.CheckForNull;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
+import com.google.common.base.Function;
 import com.google.common.base.Predicate;
 import com.google.common.collect.AbstractIterator;
 import com.mongodb.BasicDBObject;
@@ -51,6 +52,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.collect.Iterables.transform;
 import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.isDeletedEntry;
 
 /**
@@ -589,4 +591,31 @@ public class Utils {
     public static boolean isHiddenPath(@Nonnull String path) {
         return path.contains("/:");
     }
+
+    /**
+     * Transforms the given {@link Iterable} from {@link String} to
+     * {@link StringValue} elements. The {@link Iterable} must no have
+     * {@code null} values.
+     */
+    public static Iterable<StringValue> asStringValueIterable(
+            @Nonnull Iterable<String> values) {
+        return transform(values, new Function<String, StringValue>() {
+            @Override
+            public StringValue apply(String input) {
+                return new StringValue(input);
+            }
+        });
+    }
+
+    /**
+     * Transforms the given paths into ids using {@link #getIdFromPath(String)}.
+     */
+    public static Iterable<String> pathToId(@Nonnull Iterable<String> paths) {
+        return transform(paths, new Function<String, String>() {
+            @Override
+            public String apply(String input) {
+                return getIdFromPath(input);
+            }
+        });
+    }
 }

Added: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/AbstractJournalTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/AbstractJournalTest.java?rev=1688179&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/AbstractJournalTest.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/AbstractJournalTest.java Mon Jun 29 10:59:17 2015
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+
+import com.google.common.collect.Lists;
+
+import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.plugins.document.util.Utils;
+import org.apache.jackrabbit.oak.spi.blob.BlobStore;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.junit.After;
+import org.junit.Before;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.fail;
+
+/**
+ * Base class for journal related tests.
+ */
+public abstract class AbstractJournalTest {
+
+    protected TestBuilder builder;
+    protected List<DocumentMK> mks = Lists.newArrayList();
+    protected Random random;
+
+    @Before
+    public void setup() {
+        random = new Random();
+    }
+
+    @Before
+    @After
+    public void clear() {
+        for (DocumentMK mk : mks) {
+            mk.dispose();
+        }
+        mks.clear();
+    }
+
+    protected static void invalidateDocChildrenCache(DocumentNodeStore store) {
+        store.invalidateDocChildrenCache();
+    }
+
+    protected static void renewClusterIdLease(DocumentNodeStore store) {
+        store.renewClusterIdLease();
+    }
+
+    protected Set<String> choose(List<String> paths, int howMany) {
+        final Set<String> result = new HashSet<String>();
+        while(result.size()<howMany) {
+            result.add(paths.get(random.nextInt(paths.size())));
+        }
+        return result;
+    }
+
+    protected List<String> createRandomPaths(int depth, int avgChildrenPerLevel, int num) {
+        final Set<String> result = new HashSet<String>();
+        while(result.size()<num) {
+            result.add(createRandomPath(depth, avgChildrenPerLevel));
+        }
+        return new ArrayList<String>(result);
+    }
+
+    protected String createRandomPath(int depth, int avgChildrenPerLevel) {
+        StringBuilder sb = new StringBuilder();
+        for(int i=0; i<depth; i++) {
+            sb.append("/");
+            sb.append("r").append(random.nextInt(avgChildrenPerLevel));
+        }
+        return sb.toString();
+    }
+
+    protected void assertDocCache(DocumentNodeStore ns, boolean expected, String path) {
+        String id = Utils.getIdFromPath(path);
+        boolean exists = ns.getDocumentStore().getIfCached(Collection.NODES, id)!=null;
+        if (exists!=expected) {
+            if (expected) {
+                fail("assertDocCache: did not find in cache even though expected: "+path);
+            } else {
+                fail("assertDocCache: found in cache even though not expected: "+path);
+            }
+        }
+    }
+
+    protected void setProperty(DocumentNodeStore ns, String path, String key, String value, boolean runBgOpsAfterCreation) throws
+            CommitFailedException {
+        NodeBuilder rootBuilder = ns.getRoot().builder();
+        doGetOrCreate(rootBuilder, path).setProperty(key, value);
+        ns.merge(rootBuilder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+        if (runBgOpsAfterCreation) {
+            ns.runBackgroundOperations();
+        }
+    }
+
+    protected void getOrCreate(DocumentNodeStore ns, List<String> paths, boolean runBgOpsAfterCreation) throws CommitFailedException {
+        NodeBuilder rootBuilder = ns.getRoot().builder();
+        for(String path:paths) {
+            doGetOrCreate(rootBuilder, path);
+        }
+        ns.merge(rootBuilder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+        if (runBgOpsAfterCreation) {
+            ns.runBackgroundOperations();
+        }
+    }
+
+    protected void getOrCreate(DocumentNodeStore ns, String path, boolean runBgOpsAfterCreation) throws CommitFailedException {
+        NodeBuilder rootBuilder = ns.getRoot().builder();
+        doGetOrCreate(rootBuilder, path);
+        ns.merge(rootBuilder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+        if (runBgOpsAfterCreation) {
+            ns.runBackgroundOperations();
+        }
+    }
+
+    protected NodeBuilder doGetOrCreate(NodeBuilder builder, String path) {
+        String[] parts = path.split("/");
+        for(int i=1; i<parts.length; i++) {
+            builder = builder.child(parts[i]);
+        }
+        return builder;
+    }
+
+    protected void assertJournalEntries(DocumentNodeStore ds, String... expectedChanges) {
+        List<String> exp = new LinkedList<String>(asList(expectedChanges));
+        for(boolean branch : new Boolean[]{false, true}) {
+            String fromKey = JournalEntry.asId(new Revision(0, 0, ds.getClusterId(), branch));
+            String toKey = JournalEntry.asId(new Revision(System.currentTimeMillis()+1000, 0, ds.getClusterId(), branch));
+            List<JournalEntry> entries = ds.getDocumentStore().query(Collection.JOURNAL, fromKey, toKey, expectedChanges.length+5);
+            if (entries.size()>0) {
+                for (JournalEntry journalEntry : entries) {
+                    if (!exp.remove(journalEntry.get("_c"))) {
+                        fail("Found an unexpected change: " + journalEntry.get("_c") + ", while all I expected was: " + asList(expectedChanges));
+                    }
+                }
+            }
+        }
+        if (exp.size()>0) {
+            fail("Did not find all expected changes, left over: "+exp+" (from original list which is: "+asList(expectedChanges)+")");
+        }
+    }
+
+    protected int countJournalEntries(DocumentNodeStore ds, int max) {
+        int total = 0;
+        for(boolean branch : new Boolean[]{false, true}) {
+            String fromKey = JournalEntry.asId(new Revision(0, 0, ds.getClusterId(), branch));
+            String toKey = JournalEntry.asId(new Revision(System.currentTimeMillis()+1000, 0, ds.getClusterId(), branch));
+            List<JournalEntry> entries = ds.getDocumentStore().query(Collection.JOURNAL, fromKey, toKey, max);
+            total+=entries.size();
+        }
+        return total;
+    }
+
+    protected NodeDocument getDocument(DocumentNodeStore nodeStore, String path) {
+        return nodeStore.getDocumentStore().find(Collection.NODES, Utils.getIdFromPath(path));
+    }
+
+    protected TestBuilder newDocumentMKBuilder() {
+        return new TestBuilder();
+    }
+
+    protected DocumentMK createMK(int clusterId, int asyncDelay,
+                                  DocumentStore ds, BlobStore bs) {
+        builder = newDocumentMKBuilder();
+        return register(builder.setDocumentStore(ds)
+                .setBlobStore(bs).setClusterId(clusterId)
+                .setAsyncDelay(asyncDelay).open());
+    }
+
+    protected DocumentMK register(DocumentMK mk) {
+        mks.add(mk);
+        return mk;
+    }
+
+    protected final class TestBuilder extends DocumentMK.Builder {
+        CountingDocumentStore actualStore;
+        CountingTieredDiffCache actualDiffCache;
+
+        @Override
+        public DocumentStore getDocumentStore() {
+            if (actualStore==null) {
+                actualStore = new CountingDocumentStore(super.getDocumentStore());
+            }
+            return actualStore;
+        }
+
+        @Override
+        public DiffCache getDiffCache() {
+            if (actualDiffCache==null) {
+                actualDiffCache = new CountingTieredDiffCache(this);
+            }
+            return actualDiffCache;
+        }
+    }
+}

Propchange: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/AbstractJournalTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingDocumentStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingDocumentStore.java?rev=1688179&r1=1688178&r2=1688179&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingDocumentStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingDocumentStore.java Mon Jun 29 10:59:17 2015
@@ -183,6 +183,11 @@ public class CountingDocumentStore imple
     }
 
     @Override
+    public CacheInvalidationStats invalidateCache(Iterable<String> keys) {
+        return delegate.invalidateCache(keys);
+    }
+
+    @Override
     public <T extends Document> void invalidateCache(Collection<T> collection,
                                                      String key) {
         delegate.invalidateCache(collection, key);

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java?rev=1688179&r1=1688178&r2=1688179&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java Mon Jun 29 10:59:17 2015
@@ -60,6 +60,7 @@ import com.google.common.base.Throwables
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+
 import org.apache.jackrabbit.oak.api.CommitFailedException;
 import org.apache.jackrabbit.oak.api.PropertyState;
 import org.apache.jackrabbit.oak.api.Type;
@@ -103,8 +104,8 @@ public class DocumentNodeStoreTest {
         DocumentStore docStore = new MemoryDocumentStore();
         DocumentStore testStore = new TimingDocumentStoreWrapper(docStore) {
             @Override
-            public CacheInvalidationStats invalidateCache() {
-                super.invalidateCache();
+            public CacheInvalidationStats invalidateCache(Iterable<String> keys) {
+                super.invalidateCache(keys);
                 semaphore.acquireUninterruptibly();
                 semaphore.release();
                 return null;

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalEntryTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalEntryTest.java?rev=1688179&r1=1688178&r2=1688179&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalEntryTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalEntryTest.java Mon Jun 29 10:59:17 2015
@@ -52,6 +52,7 @@ public class JournalEntryTest {
         add(sort, paths);
         Revision from = new Revision(1, 0, 1);
         Revision to = new Revision(2, 0, 1);
+        sort.sort();
         JournalEntry.applyTo(sort, cache, from, to);
 
         for (String p : paths) {

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalTest.java?rev=1688179&r1=1688178&r2=1688179&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalTest.java Mon Jun 29 10:59:17 2015
@@ -16,24 +16,16 @@
  */
 package org.apache.jackrabbit.oak.plugins.document;
 
-import static java.util.Arrays.asList;
-import static java.util.Collections.synchronizedList;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
 import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+
 import org.apache.jackrabbit.oak.api.PropertyState;
 import org.apache.jackrabbit.oak.plugins.document.memory.MemoryDocumentStore;
-import org.apache.jackrabbit.oak.plugins.document.util.Utils;
-import org.apache.jackrabbit.oak.spi.blob.BlobStore;
 import org.apache.jackrabbit.oak.spi.blob.MemoryBlobStore;
 import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
 import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
@@ -41,27 +33,18 @@ import org.apache.jackrabbit.oak.spi.com
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
 import org.apache.jackrabbit.oak.spi.state.NodeStateDiff;
-import org.apache.jackrabbit.oak.stats.Clock;
-import org.junit.After;
-import org.junit.Before;
 import org.junit.Test;
 
-import com.google.common.collect.Iterators;
-import com.google.common.collect.Lists;
-import com.mongodb.DB;
-
-public class JournalTest {
+import static java.util.Collections.synchronizedList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
-    private static final boolean MONGO_DB = false;
-//    private static final boolean MONGO_DB = true;
-    
-    private TestBuilder builder;
+public class JournalTest extends AbstractJournalTest {
 
     private MemoryDocumentStore ds;
     private MemoryBlobStore bs;
 
-    private List<DocumentMK> mks = Lists.newArrayList();
-
     class DiffingObserver implements Observer, Runnable, NodeStateDiff {
 
         final List<DocumentNodeState> incomingRootStates1 = Lists.newArrayList();
@@ -182,42 +165,6 @@ public class JournalTest {
     }
     
     @Test
-    public void largeCleanupTest() throws Exception {
-        // create more than DELETE_BATCH_SIZE of entries and clean them up
-        // should make sure to loop in JournalGarbageCollector.gc such
-        // that it would find issue described here:
-        // https://issues.apache.org/jira/browse/OAK-2829?focusedCommentId=14585733&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14585733
-        
-        doLargeCleanupTest(0, 100);
-        doLargeCleanupTest(200, 1000);// using offset as to not make sure to always create new entries
-        doLargeCleanupTest(2000, 10000);
-        doLargeCleanupTest(20000, 30000); // using 'size' much larger than 30k will be tremendously slow due to ordered node
-    }
-    
-    private void doLargeCleanupTest(int offset, int size) throws Exception {
-        Clock clock = new Clock.Virtual();
-        DocumentMK mk1 = createMK(0 /* clusterId: 0 => uses clusterNodes collection */, 0);
-        DocumentNodeStore ns1 = mk1.getNodeStore();
-        // make sure we're visible and marked as active
-        ns1.renewClusterIdLease();
-        JournalGarbageCollector gc = new JournalGarbageCollector(ns1);
-        clock.getTimeIncreasing();
-        clock.getTimeIncreasing();
-        gc.gc(0, TimeUnit.MILLISECONDS); // cleanup everything that might still be there 
-        
-        // create entries as parametrized:
-        for(int i=offset; i<size+offset; i++) {
-            mk1.commit("/", "+\"regular"+i+"\": {}", null, null);
-            // always run background ops to 'flush' the change
-            // into the journal:
-            ns1.runBackgroundOperations();
-        }
-        Thread.sleep(100); // sleep 100millis
-        assertEquals(size, gc.gc(0, TimeUnit.MILLISECONDS)); // should now be able to clean up everything
-        
-    }
-    
-    @Test
     public void cleanupTest() throws Exception {
         DocumentMK mk1 = createMK(0 /* clusterId: 0 => uses clusterNodes collection */, 0);
         DocumentNodeStore ns1 = mk1.getNodeStore();
@@ -367,7 +314,7 @@ public class JournalTest {
         doLastRevRecoveryJournalTest(true);
     }
     
-    void doLastRevRecoveryJournalTest(boolean testConcurrency) throws Exception {
+    private void doLastRevRecoveryJournalTest(boolean testConcurrency) throws Exception {
         DocumentMK mk1 = createMK(0 /*clusterId via clusterNodes collection*/, 0);
         DocumentNodeStore ds1 = mk1.getNodeStore();
         int c1Id = ds1.getClusterId();
@@ -485,115 +432,13 @@ public class JournalTest {
         }
     }
 
-    void assertJournalEntries(DocumentNodeStore ds, String... expectedChanges) {
-        List<String> exp = new LinkedList<String>(asList(expectedChanges));
-        for(boolean branch : new Boolean[]{false, true}) {
-            String fromKey = JournalEntry.asId(new Revision(0, 0, ds.getClusterId(), branch));
-            String toKey = JournalEntry.asId(new Revision(System.currentTimeMillis()+1000, 0, ds.getClusterId(), branch));
-            List<JournalEntry> entries = ds.getDocumentStore().query(Collection.JOURNAL, fromKey, toKey, expectedChanges.length+5);
-            if (entries.size()>0) {
-                for (Iterator<JournalEntry> it = entries.iterator(); it.hasNext();) {
-                    JournalEntry journalEntry = it.next();
-                    if (!exp.remove(journalEntry.get("_c"))) {
-                        fail("Found an unexpected change: "+journalEntry.get("_c")+", while all I expected was: "+asList(expectedChanges));
-                    }
-                }
-            }
-        }
-        if (exp.size()>0) {
-            fail("Did not find all expected changes, left over: "+exp+" (from original list which is: "+asList(expectedChanges)+")");
-        }
-    }
-
-    int countJournalEntries(DocumentNodeStore ds, int max) {
-        int total = 0;
-        for(boolean branch : new Boolean[]{false, true}) {
-            String fromKey = JournalEntry.asId(new Revision(0, 0, ds.getClusterId(), branch));
-            String toKey = JournalEntry.asId(new Revision(System.currentTimeMillis()+1000, 0, ds.getClusterId(), branch));
-            List<JournalEntry> entries = ds.getDocumentStore().query(Collection.JOURNAL, fromKey, toKey, max);
-            total+=entries.size();
-        }
-        return total;
-    }
-    
-    private NodeDocument getDocument(DocumentNodeStore nodeStore, String path) {
-        return nodeStore.getDocumentStore().find(Collection.NODES, Utils.getIdFromPath(path));
-    }
-
-    @Before
-    @After
-    public void clear() {
-        for (DocumentMK mk : mks) {
-            mk.dispose();
-        }
-        mks.clear();
-        if (MONGO_DB) {
-            DB db = MongoUtils.getConnection().getDB();
-            MongoUtils.dropCollections(db);
-        }
-    }
-
-    private final class TestBuilder extends DocumentMK.Builder {
-        private CountingDocumentStore actualStore;
-        private CountingTieredDiffCache actualDiffCache;
-
-        @Override
-        public DocumentStore getDocumentStore() {
-            if (actualStore==null) {
-                actualStore = new CountingDocumentStore(super.getDocumentStore());
-            }
-            return actualStore;
-        }
-        
-        @Override
-        public DiffCache getDiffCache() {
-            if (actualDiffCache==null) {
-                actualDiffCache = new CountingTieredDiffCache(this);
-            }
-            return actualDiffCache;
-        }
-    }
-
     private DocumentMK createMK(int clusterId, int asyncDelay) {
-        if (MONGO_DB) {
-            DB db = MongoUtils.getConnection(/*"oak-observation"*/).getDB();
-            builder = newDocumentMKBuilder();
-            return register(builder.setMongoDB(db)
-                    .setClusterId(clusterId).setAsyncDelay(asyncDelay).open());
-        } else {
-            if (ds == null) {
-                ds = new MemoryDocumentStore();
-            }
-            if (bs == null) {
-                bs = new MemoryBlobStore();
-            }
-            return createMK(clusterId, asyncDelay, ds, bs);
+        if (ds == null) {
+            ds = new MemoryDocumentStore();
         }
-    }
-    
-    private TestBuilder newDocumentMKBuilder() {
-        return new TestBuilder();
-    }
-
-    private DocumentMK createMK(int clusterId, int asyncDelay,
-                             DocumentStore ds, BlobStore bs) {
-        builder = newDocumentMKBuilder();
-        return register(builder.setDocumentStore(ds)
-                .setBlobStore(bs).setClusterId(clusterId)
-                .setAsyncDelay(asyncDelay).open());
-    }
-
-    private DocumentMK register(DocumentMK mk) {
-        mks.add(mk);
-        return mk;
-    }
-
-    private void disposeMK(DocumentMK mk) {
-        mk.dispose();
-        for (int i = 0; i < mks.size(); i++) {
-            if (mks.get(i) == mk) {
-                mks.remove(i);
-            }
+        if (bs == null) {
+            bs = new MemoryBlobStore();
         }
+        return createMK(clusterId, asyncDelay, ds, bs);
     }
 }

Added: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/JournalIT.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/JournalIT.java?rev=1688179&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/JournalIT.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/JournalIT.java Mon Jun 29 10:59:17 2015
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document.mongo;
+
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import com.mongodb.DB;
+
+import org.apache.jackrabbit.oak.cache.CacheStats;
+import org.apache.jackrabbit.oak.plugins.document.AbstractJournalTest;
+import org.apache.jackrabbit.oak.plugins.document.DocumentMK;
+import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStore;
+import org.apache.jackrabbit.oak.plugins.document.DocumentStore;
+import org.apache.jackrabbit.oak.plugins.document.JournalGarbageCollector;
+import org.apache.jackrabbit.oak.plugins.document.MongoUtils;
+import org.apache.jackrabbit.oak.plugins.document.memory.MemoryDocumentStore;
+import org.apache.jackrabbit.oak.plugins.document.util.MongoConnection;
+import org.apache.jackrabbit.oak.spi.blob.MemoryBlobStore;
+import org.apache.jackrabbit.oak.stats.Clock;
+import org.junit.After;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.assertEquals;
+
+public class JournalIT extends AbstractJournalTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JournalIT.class);
+
+    @BeforeClass
+    public static void checkMongoDbAvailable() {
+        Assume.assumeNotNull(MongoUtils.getConnection());
+    }
+
+    @Before
+    @After
+    public void dropCollections() throws Exception {
+        MongoConnection mongoConnection = MongoUtils.getConnection();
+        MongoUtils.dropCollections(mongoConnection.getDB());
+        mongoConnection.close();
+    }
+
+    @Test
+    public void cacheInvalidationTest() throws Exception {
+        final DocumentNodeStore ns1 = createMK(1, 0).getNodeStore();
+        final DocumentNodeStore ns2 = createMK(2, 0).getNodeStore();
+        LOG.info("cache size 1: "+(ns1.getDocumentStore().getCacheStats()==null ? "null" : ns1.getDocumentStore().getCacheStats().getElementCount()));
+
+        // invalidate both caches under test first
+        invalidateDocChildrenCache(ns1);
+        ns1.getDocumentStore().invalidateCache();
+
+        {
+            DocumentStore s = ns1.getDocumentStore();
+            CacheStats cacheStats = s.getCacheStats();
+            LOG.info("m.size="+(cacheStats==null ? "null" : cacheStats.getElementCount()));
+        }
+        LOG.info("cache size 2: "+(ns1.getDocumentStore().getCacheStats()==null ? "null" : ns1.getDocumentStore().getCacheStats().getElementCount()));
+
+        // first create child node in instance 1
+        final List<String> paths = createRandomPaths(1, 5000000, 1000);
+        int i=0;
+        for(String path : paths) {
+            if (i++%100==0) {
+                LOG.info("at "+i);
+            }
+            getOrCreate(ns1, path, false);
+        }
+        final List<String> paths2 = createRandomPaths(20, 2345, 100);
+        getOrCreate(ns1, paths2, false);
+        ns1.runBackgroundOperations();
+        for(String path : paths) {
+            assertDocCache(ns1, true, path);
+        }
+
+        {
+            DocumentStore s = ns1.getDocumentStore();
+            CacheStats cacheStats = s.getCacheStats();
+            LOG.info("m.size="+(cacheStats==null ? "null" : cacheStats.getElementCount()));
+        }
+
+        LOG.info("cache size 2: "+(ns1.getDocumentStore().getCacheStats()==null ? "null" : ns1.getDocumentStore().getCacheStats().getElementCount()));
+        long time = System.currentTimeMillis();
+        for(int j=0; j<100; j++) {
+            long now = System.currentTimeMillis();
+            LOG.info("loop "+j+", "+(now-time)+"ms");
+            time = now;
+            final Set<String> electedPaths = choose(paths2, random.nextInt(30));
+            {
+                // choose a random few from above created paths and modify them
+                final long t1 = System.currentTimeMillis();
+                ns2.runBackgroundOperations(); // make sure ns2 has the latest from ns1
+                final long t2 = System.currentTimeMillis();
+                LOG.info("ns2 background took "+(t2-t1)+"ms");
+
+                for(String electedPath : electedPaths) {
+                    // modify /child in another instance 2
+                    setProperty(ns2, electedPath, "p", "ns2"+System.currentTimeMillis(), false);
+                }
+                final long t3 = System.currentTimeMillis();
+                LOG.info("setting props "+(t3-t2)+"ms");
+
+                ns2.runBackgroundOperations();
+                final long t4 = System.currentTimeMillis();
+                LOG.info("ns2 background took2 "+(t4-t3)+"ms");
+            }
+
+            // that should not have changed the fact that we have it cached in 'ns1'
+            for(String electedPath : electedPaths) {
+                assertDocCache(ns1, true, electedPath);
+            }
+
+            // doing a backgroundOp now should trigger invalidation
+            // which thx to the external modification will remove the entry from the cache:
+            ns1.runBackgroundOperations();
+            for(String electedPath : electedPaths) {
+                assertDocCache(ns1, false, electedPath);
+            }
+
+            // when I access it again with 'ns1', then it gets cached again:
+            for(String electedPath : electedPaths) {
+                getOrCreate(ns1, electedPath, false);
+                assertDocCache(ns1, true, electedPath);
+            }
+        }
+    }
+
+    @Test
+    public void largeCleanupTest() throws Exception {
+        // create more than DELETE_BATCH_SIZE of entries and clean them up
+        // should make sure to loop in JournalGarbageCollector.gc such
+        // that it would find issue described here:
+        // https://issues.apache.org/jira/browse/OAK-2829?focusedCommentId=14585733&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14585733
+
+        doLargeCleanupTest(0, 100);
+        doLargeCleanupTest(200, 1000);// using offset as to not make sure to always create new entries
+        doLargeCleanupTest(2000, 10000);
+        doLargeCleanupTest(20000, 30000); // using 'size' much larger than 30k will be tremendously slow due to ordered node
+    }
+
+    @Test
+    public void simpleCacheInvalidationTest() throws Exception {
+        final DocumentNodeStore ns1 = createMK(1, 0).getNodeStore();
+        final DocumentNodeStore ns2 = createMK(2, 0).getNodeStore();
+
+        // invalidate both caches under test first
+        invalidateDocChildrenCache(ns1);
+        ns1.getDocumentStore().invalidateCache();
+
+        // first create child node in instance 1
+        getOrCreate(ns1, "/child", true);
+        assertDocCache(ns1, true, "/child");
+
+        {
+            // modify /child in another instance 2
+            ns2.runBackgroundOperations(); // read latest changes from ns1
+            setProperty(ns2, "/child", "p", "ns2"+System.currentTimeMillis(), true);
+        }
+        // that should not have changed the fact that we have it cached in 'ns'
+        assertDocCache(ns1, true, "/child");
+
+        // doing a backgroundOp now should trigger invalidation
+        // which thx to the external modification will remove the entry from the cache:
+        ns1.runBackgroundOperations();
+        assertDocCache(ns1, false, "/child");
+
+        // when I access it again with 'ns', then it gets cached again:
+        getOrCreate(ns1, "/child", false);
+        assertDocCache(ns1, true, "/child");
+    }
+
+    private void doLargeCleanupTest(int offset, int size) throws Exception {
+        Clock clock = new Clock.Virtual();
+        DocumentMK mk1 = createMK(0 /* clusterId: 0 => uses clusterNodes collection */, 0,
+                new MemoryDocumentStore(), new MemoryBlobStore());
+        DocumentNodeStore ns1 = mk1.getNodeStore();
+        // make sure we're visible and marked as active
+        renewClusterIdLease(ns1);
+        JournalGarbageCollector gc = new JournalGarbageCollector(ns1);
+        clock.getTimeIncreasing();
+        clock.getTimeIncreasing();
+        gc.gc(0, TimeUnit.MILLISECONDS); // cleanup everything that might still be there
+
+        // create entries as parametrized:
+        for(int i=offset; i<size+offset; i++) {
+            mk1.commit("/", "+\"regular"+i+"\": {}", null, null);
+            // always run background ops to 'flush' the change
+            // into the journal:
+            ns1.runBackgroundOperations();
+        }
+        Thread.sleep(100); // sleep 100millis
+        assertEquals(size, gc.gc(0, TimeUnit.MILLISECONDS)); // should now be able to clean up everything
+    }
+
+    protected DocumentMK createMK(int clusterId, int asyncDelay) {
+        DB db = MongoUtils.getConnection().getDB();
+        builder = newDocumentMKBuilder();
+        return register(builder.setMongoDB(db)
+                .setClusterId(clusterId).setAsyncDelay(asyncDelay).open());
+    }
+
+}

Propchange: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/JournalIT.java
------------------------------------------------------------------------------
    svn:eol-style = native