Posted to oak-commits@jackrabbit.apache.org by ch...@apache.org on 2013/11/15 07:49:29 UTC

svn commit: r1542182 - in /jackrabbit/oak/trunk/oak-core/src: main/java/org/apache/jackrabbit/oak/plugins/mongomk/ test/java/org/apache/jackrabbit/oak/plugins/mongomk/

Author: chetanm
Date: Fri Nov 15 06:49:28 2013
New Revision: 1542182

URL: http://svn.apache.org/r1542182
Log:
OAK-1156 - Improve the document cache invalidation logic to selectively invalidate docs

Initial implementation of the cache invalidation logic. Details of the various
invalidation strategies are provided in the issue notes.

Changes done

1. CacheInvalidator - Added three strategies for invalidation (a usage sketch follows this list)
2. Commit - Changed the operations map to a LinkedHashMap so as to maintain the order of operations.
   This ensures that a parent node gets created before its child nodes, and also that the NodeDocument
   creation time of a child node is greater than that of its parent
3. NodeDocument - Added a lastCheckTime field which is used to track when this document was last
   checked for consistency with the backend
4. pom.xml - Updated the Guava dependency to 15 so as to make use of the TreeTraverser support
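
Below is a minimal usage sketch (not part of the commit) of how a caller in the
same package might pick one of the three strategies; the example class and method
names are illustrative only, while the CacheInvalidator factory methods and the
InvalidationResult fields are the ones added in this commit:

    package org.apache.jackrabbit.oak.plugins.mongomk;

    // Illustrative sketch only; CacheInvalidator, MongoDocumentStore and
    // InvalidationResult come from the code added/modified below.
    class CacheInvalidationSketch {

        static void invalidate(MongoDocumentStore store) {
            // The hierarchical strategy is the one wired into
            // MongoDocumentStore.invalidateCache() by this commit.
            CacheInvalidator invalidator =
                    CacheInvalidator.createHierarchicalInvalidator(store);
            // Alternatives: createLinearInvalidator(store), createSimpleInvalidator(store)

            CacheInvalidator.InvalidationResult result = invalidator.invalidateCache();
            System.out.println("Invalidated " + result.invalidationCount
                    + " of " + result.cacheSize + " cached documents using "
                    + result.queryCount + " queries");
        }
    }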

Added:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/CacheInvalidator.java   (with props)
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/CacheInvalidationIT.java   (with props)
Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Commit.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Document.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoDocumentStore.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/NodeDocument.java

Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/CacheInvalidator.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/CacheInvalidator.java?rev=1542182&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/CacheInvalidator.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/CacheInvalidator.java Fri Nov 15 06:49:28 2013
@@ -0,0 +1,408 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.mongomk;
+
+import com.google.common.base.Objects;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Maps;
+import com.google.common.collect.PeekingIterator;
+import com.google.common.collect.TreeTraverser;
+import com.mongodb.BasicDBObject;
+import com.mongodb.DBCollection;
+import com.mongodb.DBCursor;
+import com.mongodb.DBObject;
+import com.mongodb.QueryBuilder;
+import org.apache.jackrabbit.oak.commons.PathUtils;
+import org.apache.jackrabbit.oak.plugins.mongomk.util.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+abstract class CacheInvalidator {
+    static final Logger LOG = LoggerFactory.getLogger(HierarchicalInvalidator.class);
+
+    public abstract InvalidationResult invalidateCache();
+
+    public static CacheInvalidator createHierarchicalInvalidator(MongoDocumentStore documentStore) {
+        return new HierarchicalInvalidator(documentStore);
+    }
+
+    public static CacheInvalidator createLinearInvalidator(MongoDocumentStore documentStore) {
+        return new LinearInvalidator(documentStore);
+    }
+
+    public static CacheInvalidator createSimpleInvalidator(MongoDocumentStore documentStore) {
+        return new SimpleInvalidator(documentStore);
+    }
+
+    static class InvalidationResult {
+        int invalidationCount;
+        int uptodateCount;
+        int cacheSize;
+        long timeTaken;
+        int queryCount;
+        int cacheEntriesProcessedCount;
+
+        @Override
+        public String toString() {
+            return "InvalidationResult{" +
+                    "invalidationCount=" + invalidationCount +
+                    ", uptodateCount=" + uptodateCount +
+                    ", cacheSize=" + cacheSize +
+                    ", timeTaken=" + timeTaken +
+                    ", queryCount=" + queryCount +
+                    ", cacheEntriesProcessedCount=" + cacheEntriesProcessedCount +
+                    '}';
+        }
+    }
+
+    private static class SimpleInvalidator extends CacheInvalidator {
+        private final MongoDocumentStore documentStore;
+
+        private SimpleInvalidator(MongoDocumentStore documentStore) {
+            this.documentStore = documentStore;
+        }
+
+        @Override
+        public InvalidationResult invalidateCache() {
+            InvalidationResult result = new InvalidationResult();
+            Map<String, NodeDocument> cacheMap = documentStore.getCache();
+            result.cacheSize = cacheMap.size();
+            for (String key : cacheMap.keySet()) {
+                documentStore.invalidateCache(Collection.NODES, key);
+            }
+            return result;
+        }
+    }
+
+    private static class LinearInvalidator extends CacheInvalidator {
+        private final DBCollection nodes;
+        private final MongoDocumentStore documentStore;
+
+        public LinearInvalidator(MongoDocumentStore documentStore) {
+            this.documentStore = documentStore;
+            this.nodes = documentStore.getDBCollection(Collection.NODES);
+        }
+
+        @Override
+        public InvalidationResult invalidateCache() {
+            final Map<String, NodeDocument> cacheMap = documentStore.getCache();
+            final InvalidationResult result = new InvalidationResult();
+            result.cacheSize = cacheMap.size();
+
+            QueryBuilder query = QueryBuilder.start(Document.ID)
+                    .in(cacheMap.keySet());
+
+            //Fetch only the id and the modCount
+            final BasicDBObject keys = new BasicDBObject(NodeDocument.ID, 1);
+            keys.put(NodeDocument.MOD_COUNT, 1);
+
+            //Fetch the modCount for each such node
+            DBCursor cursor = nodes.find(query.get(), keys);
+            result.queryCount++;
+            for (DBObject obj : cursor) {
+                result.cacheEntriesProcessedCount++;
+                String id = (String) obj.get(NodeDocument.ID);
+                Number modCount = (Number) obj.get(NodeDocument.MOD_COUNT);
+
+                NodeDocument cachedDoc = documentStore.getIfCached(Collection.NODES, id);
+                if (cachedDoc != null
+                        && !Objects.equal(cachedDoc.getModCount(), modCount)) {
+                    documentStore.invalidateCache(Collection.NODES, id);
+                    result.invalidationCount++;
+                } else {
+                    result.uptodateCount++;
+                }
+            }
+            return result;
+        }
+    }
+
+
+    private static class HierarchicalInvalidator extends CacheInvalidator {
+        private final DBCollection nodes;
+        private final MongoDocumentStore documentStore;
+
+        public HierarchicalInvalidator(MongoDocumentStore documentStore) {
+            this.documentStore = documentStore;
+            this.nodes = documentStore.getDBCollection(Collection.NODES);
+        }
+
+        @Override
+        public InvalidationResult invalidateCache() {
+            final InvalidationResult result = new InvalidationResult();
+            Map<String, NodeDocument> cacheMap = documentStore.getCache();
+            TreeNode root = constructTreeFromPaths(cacheMap.keySet());
+
+            //Invalidation stats
+            result.cacheSize = cacheMap.size();
+
+            //Time at which the check is started. All NodeDocuments which
+            //are found to be up to date will be marked as checked at this time
+            final long startTime = System.currentTimeMillis();
+
+            Iterator<TreeNode> treeItr = TRAVERSER.breadthFirstTraversal(root).iterator();
+            PeekingIterator<TreeNode> pitr = Iterators.peekingIterator(treeItr);
+            Map<String, TreeNode> sameLevelNodes = Maps.newHashMap();
+
+            //Fetch only the id, the lastRev map and the modCount
+            final BasicDBObject keys = new BasicDBObject(NodeDocument.ID, 1);
+            keys.put(NodeDocument.LAST_REV, 1);
+            keys.put(NodeDocument.MOD_COUNT, 1);
+
+            while (pitr.hasNext()) {
+                final TreeNode tn = pitr.next();
+
+                //The root node would already have been processed; mark it up to date and skip it
+                if (tn.isRoot()) {
+                    tn.markUptodate(startTime);
+                    continue;
+                }
+
+                //Collect nodes at the same tree level if
+                //they are not up to date.
+                if (tn.isUptodate(startTime)) {
+                    result.uptodateCount++;
+                } else {
+                    sameLevelNodes.put(tn.getId(), tn);
+                }
+
+                final boolean hasMore = pitr.hasNext();
+
+                //Change in level or last element
+                if ((hasMore && tn.level() != pitr.peek().level())
+                        ||
+                        (!hasMore && !sameLevelNodes.isEmpty())) {
+
+                    QueryBuilder query = QueryBuilder.start(Document.ID)
+                            .in(sameLevelNodes.keySet());
+
+                    //Fetch lastRev and modCount for each such node
+                    DBCursor cursor = nodes.find(query.get(), keys);
+                    result.queryCount++;
+                    for (DBObject obj : cursor) {
+
+                        result.cacheEntriesProcessedCount++;
+
+                        //Note that this is a partial document
+                        NodeDocument latestDoc = documentStore.convertFromDBObject(Collection.NODES, obj);
+
+                        final TreeNode tn2 = sameLevelNodes.get(latestDoc.getId());
+                        NodeDocument cachedDoc = tn2.getDocument();
+                        if (cachedDoc != null) {
+                            if (noChangeInRevision(latestDoc, cachedDoc)) {
+                                result.uptodateCount++;
+                                tn2.markUptodate(startTime);
+                            } else {
+                                result.invalidationCount++;
+                                tn2.invalidate();
+                            }
+                        }
+
+                        //Remove the processed node
+                        sameLevelNodes.remove(tn2.getId());
+                    }
+
+                    //NodeDocuments present in the cache but not in the database.
+                    //Remove such nodes from the cache
+                    if (!sameLevelNodes.isEmpty()) {
+                        for (TreeNode leftOverNode : sameLevelNodes.values()) {
+                            leftOverNode.invalidate();
+                        }
+                    }
+
+                    sameLevelNodes.clear();
+                }
+            }
+
+            result.timeTaken = System.currentTimeMillis() - startTime;
+            LOG.info("Cache invalidation details - {}", result);
+
+            //TODO collect the list of ids which are invalidated such that entries for only those
+            //ids are removed from the Document Children Cache
+
+            return result;
+        }
+
+        private boolean noChangeInRevision(NodeDocument latestDoc, NodeDocument cachedDoc) {
+            if (Objects.equal(latestDoc.getModCount(), cachedDoc.getModCount())) {
+                return true;
+            }
+            //TODO This is a brute force check. Check whether we need to compare only revisions
+            // that are visible to the current cluster node, etc.
+            return Objects.equal(latestDoc.getLastRev(), cachedDoc.getLastRev());
+        }
+
+        private TreeNode constructTreeFromPaths(Set<String> ids) {
+            TreeNode root = new TreeNode("");
+            for (String id : ids) {
+                TreeNode current = root;
+                String path = Utils.getPathFromId(id);
+                for (String name : PathUtils.elements(path)) {
+                    current = current.child(name);
+                }
+            }
+            return root;
+        }
+
+        private static final TreeTraverser<TreeNode> TRAVERSER = new TreeTraverser<TreeNode>() {
+            @Override
+            public Iterable<TreeNode> children(TreeNode root) {
+                return root.children();
+            }
+        };
+
+
+        private class TreeNode {
+            private final String name;
+            private final TreeNode parent;
+            private final String id;
+
+            private final Map<String, TreeNode> children = new HashMap<String, TreeNode>();
+
+            public TreeNode(String name) {
+                this(null, name);
+            }
+
+            public TreeNode(TreeNode parent, String name) {
+                this.name = name;
+                this.parent = parent;
+                this.id = Utils.getIdFromPath(getPath());
+            }
+
+            public TreeNode child(String name) {
+                TreeNode child = children.get(name);
+                if (child == null) {
+                    child = new TreeNode(this, name);
+                    children.put(name, child);
+                }
+                return child;
+            }
+
+            public Iterable<TreeNode> children() {
+                return children.values();
+            }
+
+            public String getId() {
+                return id;
+            }
+
+            public int level() {
+                return Utils.pathDepth(getPath());
+            }
+
+            public TreeNode getParent() {
+                return parent;
+            }
+
+            public boolean isRoot() {
+                return name.isEmpty();
+            }
+
+            public String getPath() {
+                if (isRoot()) {
+                    return "/";
+                } else {
+                    StringBuilder sb = new StringBuilder();
+                    buildPath(sb);
+                    return sb.toString();
+                }
+            }
+
+            public void invalidate() {
+                LOG.debug("Change detected for {}. Invalidating the cached entry", getId());
+                documentStore.invalidateCache(Collection.NODES, getId());
+            }
+
+            public NodeDocument getDocument() {
+                return documentStore.getIfCached(Collection.NODES, id);
+            }
+
+            public boolean isUptodate(long time) {
+                NodeDocument doc = documentStore.getIfCached(Collection.NODES, id);
+                if (doc != null) {
+                    return doc.isUptodate(time);
+                } else {
+                    //If the doc is not present in the cache then it is treated as
+                    //up to date, i.e. no further consistency check is required
+                    //for this document
+                    return true;
+                }
+            }
+
+            public void markUptodate(long cacheCheckTime) {
+                NodeDocument doc = getDocument();
+                if (doc == null) {
+                    return;
+                }
+                markUptodate(cacheCheckTime, doc);
+            }
+
+            @Override
+            public String toString() {
+                return id;
+            }
+
+            private void markUptodate(long cacheCheckTime, NodeDocument uptodateRoot) {
+                for (TreeNode tn : children.values()) {
+                    tn.markUptodate(cacheCheckTime, uptodateRoot);
+                }
+                //Update the parent after the children
+                markUptodate(getId(), cacheCheckTime, uptodateRoot);
+            }
+
+            private void markUptodate(String key, long time, NodeDocument uptodateRoot) {
+                //TODO Should this be done under lock
+//            Lock lock = getAndLock(key);
+//            try {
+                NodeDocument doc = documentStore.getIfCached(Collection.NODES, key);
+
+                if (doc == null) {
+                    return;
+                }
+                //Only mark the cached doc up to date if
+                // 1. its creation time is greater than or equal to the creation time of the
+                //    root node on which markUptodate was invoked. In a typical cache population
+                //    a child node is added later than its parent.
+                //    If the creation time is lower, then the parent was replaced/updated later
+                //    and hence its _lastRev property would not truly reflect the state of the
+                //    child nodes present in the cache
+                // 2. OR both documents were marked up to date in the last cycle, as in that
+                //    case the previous cycle would have done the required checks
+
+                if (doc.getCreated() >= uptodateRoot.getCreated()
+                        || doc.getLastCheckTime() == uptodateRoot.getLastCheckTime()) {
+                    doc.markUptodate(time);
+                }
+//            } finally {
+//                lock.unlock();
+//            }
+            }
+
+            private void buildPath(StringBuilder sb) {
+                if (!isRoot()) {
+                    getParent().buildPath(sb);
+                    sb.append('/').append(name);
+                }
+            }
+        }
+    }
+}
\ No newline at end of file

Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/CacheInvalidator.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Commit.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Commit.java?rev=1542182&r1=1542181&r2=1542182&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Commit.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Commit.java Fri Nov 15 06:49:28 2013
@@ -19,6 +19,7 @@ package org.apache.jackrabbit.oak.plugin
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Set;
 
@@ -47,7 +48,7 @@ public class Commit {
     private final MongoNodeStore nodeStore;
     private final Revision baseRevision;
     private final Revision revision;
-    private HashMap<String, UpdateOp> operations = new HashMap<String, UpdateOp>();
+    private HashMap<String, UpdateOp> operations = new LinkedHashMap<String, UpdateOp>();
     private JsopWriter diff = new JsopStream();
     private List<Revision> collisions = new ArrayList<Revision>();
 

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Document.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Document.java?rev=1542182&r1=1542181&r2=1542182&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Document.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Document.java Fri Nov 15 06:49:28 2013
@@ -179,4 +179,9 @@ public class Document implements CacheVa
             return Collections.unmodifiableMap(map);
         }
     }
+
+    @Override
+    public String toString() {
+        return getId();
+    }
 }

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoDocumentStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoDocumentStore.java?rev=1542182&r1=1542181&r2=1542182&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoDocumentStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoDocumentStore.java Fri Nov 15 06:49:28 2013
@@ -130,16 +130,9 @@ public class MongoDocumentStore implemen
     
     @Override
     public void invalidateCache() {
-        for (String key : nodesCache.asMap().keySet()) {
-            Lock lock = getAndLock(key);
-            try {
-                nodesCache.invalidate(key);
-            } finally {
-                lock.unlock();
-            }
-        }
+        CacheInvalidator.createHierarchicalInvalidator(this).invalidateCache();
     }
-    
+
     @Override
     public <T extends Document> void invalidateCache(Collection<T> collection, String key) {
         if (collection == Collection.NODES) {
@@ -514,7 +507,7 @@ public class MongoDocumentStore implemen
     }
 
     @CheckForNull
-    private <T extends Document> T convertFromDBObject(@Nonnull Collection<T> collection,
+    <T extends Document> T convertFromDBObject(@Nonnull Collection<T> collection,
                                                        @Nullable DBObject n) {
         T copy = null;
         if (n != null) {
@@ -544,7 +537,7 @@ public class MongoDocumentStore implemen
         return map;
     }
 
-    private <T extends Document> DBCollection getDBCollection(Collection<T> collection) {
+    <T extends Document> DBCollection getDBCollection(Collection<T> collection) {
         if (collection == Collection.NODES) {
             return nodes;
         } else if (collection == Collection.CLUSTER_NODES) {
@@ -571,6 +564,10 @@ public class MongoDocumentStore implemen
         return cacheStats;
     }
 
+    Map<String,NodeDocument> getCache(){
+        return Collections.unmodifiableMap(nodesCache.asMap());
+    }
+
     private static void log(String message, Object... args) {
         if (LOG.isDebugEnabled()) {
             String argList = Arrays.toString(args);

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/NodeDocument.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/NodeDocument.java?rev=1542182&r1=1542181&r2=1542182&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/NodeDocument.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/NodeDocument.java Fri Nov 15 06:49:28 2013
@@ -129,7 +129,7 @@ public class NodeDocument extends Docume
     /**
      * The last revision. Key: machine id, value: revision.
      */
-    private static final String LAST_REV = "_lastRev";
+    static final String LAST_REV = "_lastRev";
 
     /**
      * Flag indicating that there are child nodes present. Its just used as a hint.
@@ -152,6 +152,11 @@ public class NodeDocument extends Docume
      */
     private SortedMap<Revision, Range> previous;
 
+    /**
+     * Time at which this object was last checked for cache consistency
+     */
+    private long lastCheckTime = System.currentTimeMillis();
+
     private final long time = System.currentTimeMillis();
 
     NodeDocument(@Nonnull DocumentStore store) {
@@ -193,6 +198,30 @@ public class NodeDocument extends Docume
     }
 
     /**
+     * Mark this instance as up-to-date with respect to the state in the persistence store
+     * @param checkTime time at which the check was performed
+     */
+    public void markUptodate(long checkTime){
+        this.lastCheckTime = checkTime;
+    }
+
+    /**
+     * Returns true if the document has already been checked for consistency
+     * in the current cycle
+     * @param lastCheckTime time at which the current cycle started
+     */
+    public boolean isUptodate(long lastCheckTime){
+        return lastCheckTime <= this.lastCheckTime;
+    }
+
+    /**
+     * Returns the last time when this object was checked for consistency
+     */
+    public long getLastCheckTime() {
+        return lastCheckTime;
+    }
+
+    /**
      * @return a map of the last known revision for each clusterId.
      */
     @Nonnull
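
A short sketch (an assumption, not part of the commit) of how the new
lastCheckTime accessors on NodeDocument are meant to be used within a single
invalidation cycle; the helper class and method below are hypothetical:

    package org.apache.jackrabbit.oak.plugins.mongomk;

    // Hypothetical helper illustrating the isUptodate/markUptodate contract.
    class ConsistencyCheckSketch {

        // Returns true if the document actually had to be compared with the backend.
        static boolean checkOnce(NodeDocument doc, long cycleStart) {
            if (doc.isUptodate(cycleStart)) {
                // lastCheckTime >= cycleStart: already verified in this cycle, skip it
                return false;
            }
            // ... compare _modCount / _lastRev against the backend here ...
            doc.markUptodate(cycleStart); // record that this cycle verified the document
            return true;
        }
    }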

Added: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/CacheInvalidationIT.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/CacheInvalidationIT.java?rev=1542182&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/CacheInvalidationIT.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/CacheInvalidationIT.java Fri Nov 15 06:49:28 2013
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.mongomk;
+
+import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.commons.PathUtils;
+import org.apache.jackrabbit.oak.plugins.mongomk.util.MongoConnection;
+import org.apache.jackrabbit.oak.plugins.mongomk.util.Utils;
+import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.jackrabbit.oak.plugins.mongomk.CacheInvalidator.InvalidationResult;
+import static org.junit.Assert.assertEquals;
+
+public class CacheInvalidationIT extends AbstractMongoConnectionTest {
+
+    private MongoNodeStore c1;
+    private MongoNodeStore c2;
+
+    @Before
+    public void prepareStores() throws Exception {
+        c1 = createNS(1);
+        c2 = createNS(2);
+    }
+
+    private int createScenario() throws CommitFailedException {
+        //          a
+        //        / | \
+        //       /  c  \
+        //      b       d
+        //     /|\      |
+        //    / | \     h
+        //   e  f  g
+        String[] paths = {
+                "/a",
+                "/a/c",
+                "/a/b",
+                "/a/b/e",
+                "/a/b/f",
+                "/a/b/g",
+                "/a/d",
+                "/a/d/h",
+        };
+        final int totalPaths = paths.length + 1; //1 extra for root
+        NodeBuilder root = c1.getRoot().builder();
+        createTree(root,paths);
+        c1.merge(root, EmptyHook.INSTANCE, null);
+
+        assertEquals(totalPaths,ds(c1).getCache().size());
+
+        runBgOps(c1,c2);
+        return totalPaths;
+    }
+
+    @Test
+    public void testCacheInvalidation() throws CommitFailedException {
+        final int totalPaths = createScenario();
+
+        NodeBuilder b2 = c2.getRoot().builder();
+        builder(b2,"/a/d").setProperty("foo", "bar");
+        c2.merge(b2, EmptyHook.INSTANCE, null);
+
+        //Push pending changes at /a
+        c2.runBackgroundOperations();
+
+        //Refresh the head for c1
+        c1.runBackgroundOperations();
+
+        //Only 2 entries, /a and /a/d, would be invalidated.
+        //'/' would have been added to the cache at the start of
+        //backgroundRead itself
+        assertEquals(totalPaths - 2,ds(c1).getCache().size());
+    }
+
+    @Test
+    public void testCacheInvalidation_Hierarchical() throws CommitFailedException {
+        final int totalPaths = createScenario();
+
+        NodeBuilder b2 = c2.getRoot().builder();
+        builder(b2,"/a/c").setProperty("foo", "bar");
+        c2.merge(b2, EmptyHook.INSTANCE, null);
+
+        //Push pending changes at /a
+        c2.runBackgroundOperations();
+
+        //Refresh the head for c1
+        refreshHead(c1);
+
+        InvalidationResult result = CacheInvalidator.createHierarchicalInvalidator(ds(c1)).invalidateCache();
+
+        //Only 2 entries, /a and /a/c, would be invalidated.
+        //'/' would already have been refreshed in the cache by the
+        //refreshHead() call above
+        assertEquals(2, result.invalidationCount);
+
+        //All entries excluding /a and /a/c would be up to date
+        assertEquals(totalPaths - 2, result.uptodateCount);
+
+        //2 queries would be fired: one for [/a] and one for [/a/b, /a/c, /a/d]
+        assertEquals(2, result.queryCount);
+
+        //Queries would only have been done for the first two levels: /a and /a/b, /a/c, /a/d
+        assertEquals(4, result.cacheEntriesProcessedCount);
+    }
+
+    @Test
+    public void testCacheInvalidation_Linear() throws CommitFailedException {
+        final int totalPaths = createScenario();
+
+        NodeBuilder b2 = c2.getRoot().builder();
+        builder(b2,"/a/c").setProperty("foo", "bar");
+        c2.merge(b2, EmptyHook.INSTANCE, null);
+
+        //Push pending changes at /a
+        c2.runBackgroundOperations();
+
+        //Refresh the head for c1
+        refreshHead(c1);
+
+        InvalidationResult result = CacheInvalidator.createLinearInvalidator(ds(c1)).invalidateCache();
+
+        //Only 2 entries, /a and /a/c, would be invalidated.
+        //'/' would already have been refreshed in the cache by the
+        //refreshHead() call above
+        assertEquals(2, result.invalidationCount);
+
+        //All entries excluding /a and /a/c would be up to date
+        assertEquals(totalPaths - 2, result.uptodateCount);
+
+        //Only one query would be fired
+        assertEquals(1, result.queryCount);
+
+        //Query would be done for all the cache entries
+        assertEquals(totalPaths, result.cacheEntriesProcessedCount);
+
+    }
+
+    private void refreshHead(MongoNodeStore store){
+        ds(store).find(Collection.NODES, Utils.getIdFromPath("/"), 0);
+    }
+
+
+    private static MongoDocumentStore ds(MongoNodeStore ns){
+        return (MongoDocumentStore) ns.getDocumentStore();
+    }
+
+    private void createTree(NodeBuilder node, String[] paths){
+        for(String path : paths){
+            createPath(node,path);
+        }
+    }
+
+    private static NodeBuilder builder(NodeBuilder builder,String path) {
+        for (String name : PathUtils.elements(path)) {
+            builder = builder.getChildNode(name);
+        }
+        return builder;
+    }
+
+    private void createPath(NodeBuilder node, String path){
+        for(String element : PathUtils.elements(path)){
+            node = node.child(element);
+        }
+    }
+
+    @After
+    public void closeStores(){
+        c1.dispose();
+        c2.dispose();
+    }
+
+    private void runBgOps(MongoNodeStore ... stores){
+        for(MongoNodeStore ns : stores){
+            ns.runBackgroundOperations();
+        }
+    }
+
+    private MongoNodeStore createNS(int clusterId) throws Exception {
+        MongoConnection mc = new MongoConnection(HOST, PORT, DB);
+        return new MongoMK.Builder()
+                          .setMongoDB(mc.getDB())
+                          .setClusterId(clusterId)
+                          .setAsyncDelay(0) //Set the async delay to 0 so that the effect of changes is reflected immediately
+                          .getNodeStore();
+    }
+
+}

Propchange: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/CacheInvalidationIT.java
------------------------------------------------------------------------------
    svn:eol-style = native