You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by to...@apache.org on 2016/07/05 11:11:13 UTC

svn commit: r1751441 [1/2] - in /jackrabbit/oak/trunk/oak-core/src: main/java/org/apache/jackrabbit/oak/plugins/document/ main/java/org/apache/jackrabbit/oak/plugins/document/mongo/ main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/ ma...

Author: tomekr
Date: Tue Jul  5 11:11:13 2016
New Revision: 1751441

URL: http://svn.apache.org/viewvc?rev=1751441&view=rev
Log:
OAK-3865: New strategy to optimize secondary reads

Added:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/RevisionListener.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/GetRootRevisionsCallable.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/LocalChanges.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/NodeCollectionProvider.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/ReplicaSetInfo.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/ReplicaSetInfoListener.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/Timestamped.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/LocalChangesTest.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/ReplicaSetInfoMock.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/ReplicaSetInfoTest.java
Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/LeaseCheckDocumentStoreWrapper.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/LoggingDocumentStoreWrapper.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/SynchronizingDocumentStoreWrapper.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/TimingDocumentStoreWrapper.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingDocumentStore.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentStoreWrapper.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/ReadPreferenceIT.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java?rev=1751441&r1=1751440&r2=1751441&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java Tue Jul  5 11:11:13 2016
@@ -499,6 +499,7 @@ public class DocumentMK {
         public static final int DEFAULT_CACHE_STACK_MOVE_DISTANCE = 16;
         private DocumentNodeStore nodeStore;
         private DocumentStore documentStore;
+        private String mongoUri;
         private DiffCache diffCache;
         private BlobStore blobStore;
         private int clusterId  = Integer.getInteger("oak.documentMK.clusterId", 0);
@@ -555,6 +556,8 @@ public class DocumentMK {
                                   @Nonnull String name,
                                   int blobCacheSizeMB)
                 throws UnknownHostException {
+            this.mongoUri = uri;
+
             DB db = new MongoConnection(uri).getDB(name);
             if (!MongoConnection.hasWriteConcern(uri)) {
                 db.setWriteConcern(MongoConnection.getDefaultWriteConcern(db));
@@ -602,6 +605,16 @@ public class DocumentMK {
         }
 
         /**
+         * Returns the Mongo URI used in the {@link #setMongoDB(String, String, int)} method.
+         *
+         * @return the Mongo URI or null if the {@link #setMongoDB(String, String, int)} method hasn't
+         * been called.
+         */
+        public String getMongoUri() {
+            return mongoUri;
+        }
+
+        /**
          * Sets a {@link DataSource} to use for the RDB document and blob
          * stores.
          *

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java?rev=1751441&r1=1751440&r2=1751441&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java Tue Jul  5 11:11:13 2016
@@ -1042,6 +1042,11 @@ public final class NodeDocument extends
             }
         }
         n.setLastRevision(lastRevision);
+
+        if (store instanceof RevisionListener) {
+            ((RevisionListener) store).updateAccessedRevision(lastRevision);
+        }
+
         return n;
     }
 
@@ -1326,10 +1331,17 @@ public final class NodeDocument extends
     }
 
     NodeDocument getPreviousDocument(String prevId){
-        //Use the maxAge variant such that in case of Mongo call for
-        //previous doc are directed towards replicas first
         LOG.trace("get previous document {}", prevId);
-        return store.find(Collection.NODES, prevId, Integer.MAX_VALUE);
+        NodeDocument doc = store.find(Collection.NODES, prevId);
+        if (doc == null) {
+            // In case secondary read preference is used and node is not found
+            // then check with primary again as it might happen that node document has not been
+            // replicated. We know that a document with such an id must exist, but possibly due to
+            // replication lag it has not reached the secondary yet. So in that case read again
+            // from primary
+            doc = store.find(Collection.NODES, prevId, 0);
+        }
+        return doc;
     }
 
     @Nonnull

Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/RevisionListener.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/RevisionListener.java?rev=1751441&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/RevisionListener.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/RevisionListener.java Tue Jul  5 11:11:13 2016
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+/**
+ * Document stores implementing this interface will be informed about all revisions
+ * in which the nodes are accessed.
+ */
+public interface RevisionListener {
+
+    void updateAccessedRevision(RevisionVector revision);
+
+}

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java?rev=1751441&r1=1751440&r2=1751441&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java Tue Jul  5 11:11:13 2016
@@ -61,6 +61,8 @@ import org.apache.jackrabbit.oak.plugins
 import org.apache.jackrabbit.oak.plugins.document.JournalEntry;
 import org.apache.jackrabbit.oak.plugins.document.NodeDocument;
 import org.apache.jackrabbit.oak.plugins.document.Revision;
+import org.apache.jackrabbit.oak.plugins.document.RevisionListener;
+import org.apache.jackrabbit.oak.plugins.document.RevisionVector;
 import org.apache.jackrabbit.oak.plugins.document.StableRevisionComparator;
 import org.apache.jackrabbit.oak.plugins.document.UpdateOp;
 import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Condition;
@@ -71,6 +73,8 @@ import org.apache.jackrabbit.oak.plugins
 import org.apache.jackrabbit.oak.plugins.document.cache.CacheInvalidationStats;
 import org.apache.jackrabbit.oak.plugins.document.cache.ModificationStamp;
 import org.apache.jackrabbit.oak.plugins.document.cache.NodeDocumentCache;
+import org.apache.jackrabbit.oak.plugins.document.mongo.replica.LocalChanges;
+import org.apache.jackrabbit.oak.plugins.document.mongo.replica.ReplicaSetInfo;
 import org.apache.jackrabbit.oak.plugins.document.locks.NodeDocumentLocks;
 import org.apache.jackrabbit.oak.plugins.document.locks.StripedNodeDocumentLocks;
 import org.apache.jackrabbit.oak.plugins.document.util.Utils;
@@ -111,7 +115,7 @@ import static org.apache.jackrabbit.oak.
 /**
  * A document store that uses MongoDB as the backend.
  */
-public class MongoDocumentStore implements DocumentStore {
+public class MongoDocumentStore implements DocumentStore, RevisionListener {
 
     private static final Logger LOG = LoggerFactory.getLogger(MongoDocumentStore.class);
     private static final PerfLogger PERFLOG = new PerfLogger(
@@ -142,6 +146,12 @@ public class MongoDocumentStore implemen
 
     private Clock clock = Clock.SIMPLE;
 
+    private ReplicaSetInfo replicaInfo;
+
+    private RevisionVector mostRecentAccessedRevisions;
+
+    final LocalChanges localChanges;
+
     private final long maxReplicationLagMillis;
 
     /**
@@ -173,6 +183,24 @@ public class MongoDocumentStore implemen
             Long.getLong("oak.mongo.maxQueryTimeMS", TimeUnit.MINUTES.toMillis(1));
 
     /**
+     * How often in milliseconds the MongoDocumentStore should estimate the
+     * replication lag.
+     * <p>
+     * Default is 5'000 (five seconds).
+     */
+    private long estimationPullFrequencyMS =
+            Long.getLong("oak.mongo.estimationPullFrequencyMS", TimeUnit.SECONDS.toMillis(5));
+
+    /**
+     * Fallback to the old secondary-routing strategy. Setting this to true
+     * disables the optimisation introduced in OAK-3865.
+     * <p>
+     * Default is false.
+     */
+    private boolean fallbackSecondaryStrategy =
+            Boolean.getBoolean("oak.mongo.fallbackSecondaryStrategy");
+
+    /**
      * The number of documents to put into one bulk update.
      * <p>
      * Default is 30.
@@ -213,6 +241,18 @@ public class MongoDocumentStore implemen
 
         maxReplicationLagMillis = builder.getMaxReplicationLagMillis();
 
+        if (fallbackSecondaryStrategy) {
+            replicaInfo = null;
+            localChanges = null;
+        } else {
+            replicaInfo = new ReplicaSetInfo(clock, db, builder.getMongoUri(), estimationPullFrequencyMS, maxReplicationLagMillis, builder.getExecutor());
+            Thread replicaInfoThread = new Thread(replicaInfo, "MongoDocumentStore replica set info provider (" + builder.getClusterId() + ")");
+            replicaInfoThread.setDaemon(true);
+            replicaInfoThread.start();
+            localChanges = new LocalChanges(builder.getClusterId());
+            replicaInfo.addListener(localChanges);
+        }
+
         // indexes:
         // the _id field is the primary key, so we don't need to define it
 
@@ -473,7 +513,7 @@ public class MongoDocumentStore implemen
         boolean isSlaveOk = false;
         boolean docFound = true;
         try {
-            ReadPreference readPreference = getMongoReadPreference(collection, Utils.getParentId(key), docReadPref);
+            ReadPreference readPreference = getMongoReadPreference(collection, null, key, docReadPref);
 
             if(readPreference.isSlaveOk()){
                 LOG.trace("Routing call to secondary for fetching [{}]", key);
@@ -482,17 +522,6 @@ public class MongoDocumentStore implemen
 
             DBObject obj = dbCollection.findOne(getByKeyQuery(key).get(), null, null, readPreference);
 
-            if (obj == null
-                    && readPreference.isSlaveOk()) {
-                //In case secondary read preference is used and node is not found
-                //then check with primary again as it might happen that node document has not been
-                //replicated. This is required for case like SplitDocument where the SplitDoc is fetched with
-                //maxCacheAge == Integer.MAX_VALUE which results in readPreference of secondary.
-                //In such a case we know that document with such an id must exist
-                //but possibly dut to replication lag it has not reached to secondary. So in that case read again
-                //from primary
-                obj = dbCollection.findOne(getByKeyQuery(key).get(), null, null, ReadPreference.primary());
-            }
             if(obj == null){
                 docFound = false;
                 return null;
@@ -585,7 +614,7 @@ public class MongoDocumentStore implemen
                 cursor.maxTime(maxQueryTime, TimeUnit.MILLISECONDS);
             }
             ReadPreference readPreference =
-                    getMongoReadPreference(collection, parentId, getDefaultReadPreference(collection));
+                    getMongoReadPreference(collection, parentId, null, getDefaultReadPreference(collection));
 
             if(readPreference.isSlaveOk()){
                 isSlaveOk = true;
@@ -772,6 +801,7 @@ public class MongoDocumentStore implemen
                 if (collection == Collection.NODES) {
                     NodeDocument newDoc = (NodeDocument) applyChanges(collection, oldDoc, updateOp);
                     nodesCache.put(newDoc);
+                    updateLocalChanges(newDoc);
                 }
                 oldDoc.seal();
             } else if (upsert) {
@@ -779,6 +809,7 @@ public class MongoDocumentStore implemen
                     NodeDocument doc = (NodeDocument) collection.newDocument(this);
                     UpdateUtils.applyChanges(doc, updateOp);
                     nodesCache.putIfAbsent(doc);
+                    updateLocalChanges(doc);
                 }
             } else {
                 // updateOp without conditions and not an upsert
@@ -941,6 +972,11 @@ public class MongoDocumentStore implemen
                         docsToCache.add(newDoc);
                     }
                 }
+
+                for (NodeDocument doc : docsToCache) {
+                    updateLocalChanges(doc);
+                }
+
                 nodesCache.putNonConflictingDocs(tracker, docsToCache);
             }
             oldDocs.keySet().removeAll(bulkResult.failedUpdates);
@@ -1106,6 +1142,7 @@ public class MongoDocumentStore implemen
                 if (collection == Collection.NODES) {
                     for (T doc : docs) {
                         nodesCache.putIfAbsent((NodeDocument) doc);
+                        updateLocalChanges((NodeDocument) doc);
                     }
                 }
                 insertSuccess = true;
@@ -1220,7 +1257,8 @@ public class MongoDocumentStore implemen
     }
 
     DocumentReadPreference getReadPreference(int maxCacheAge){
-        if(maxCacheAge >= 0 && maxCacheAge < maxReplicationLagMillis) {
+        long lag = fallbackSecondaryStrategy ? maxReplicationLagMillis : replicaInfo.getLag();
+        if(maxCacheAge >= 0 && maxCacheAge < lag) {
             return DocumentReadPreference.PRIMARY;
         } else if(maxCacheAge == Integer.MAX_VALUE){
             return DocumentReadPreference.PREFER_SECONDARY;
@@ -1233,9 +1271,10 @@ public class MongoDocumentStore implemen
         return col == Collection.NODES ? DocumentReadPreference.PREFER_SECONDARY_IF_OLD_ENOUGH : DocumentReadPreference.PRIMARY;
     }
 
-    <T extends Document> ReadPreference getMongoReadPreference(Collection<T> collection,
-                                                               String parentId,
-                                                               DocumentReadPreference preference) {
+    <T extends Document> ReadPreference getMongoReadPreference(@Nonnull Collection<T> collection,
+                                                               @Nullable String parentId,
+                                                               @Nullable String documentId,
+                                                               @Nonnull DocumentReadPreference preference) {
         switch(preference){
             case PRIMARY:
                 return ReadPreference.primary();
@@ -1248,23 +1287,37 @@ public class MongoDocumentStore implemen
                     return ReadPreference.primary();
                 }
 
-                // read from primary unless parent has not been modified
-                // within replication lag period
-                ReadPreference readPreference = ReadPreference.primary();
-                if (parentId != null) {
-                    long replicationSafeLimit = getTime() - maxReplicationLagMillis;
-                    NodeDocument cachedDoc = nodesCache.getIfPresent(parentId);
-                    // FIXME: this is not quite accurate, because ancestors
+                boolean secondarySafe;
+                if (fallbackSecondaryStrategy) {
+                   // This is not quite accurate, because ancestors
                     // are updated in a background thread (_lastRev). We
                     // will need to revise this for low maxReplicationLagMillis
                     // values
-                    if (cachedDoc != null && !cachedDoc.hasBeenModifiedSince(replicationSafeLimit)) {
+                    long replicationSafeLimit = getTime() - maxReplicationLagMillis;
 
+                    if (parentId == null) {
+                        secondarySafe = false;
+                    } else {
                         //If parent has been modified loooong time back then there children
                         //would also have not be modified. In that case we can read from secondary
-                        readPreference = getConfiguredReadPreference(collection);
+                        NodeDocument cachedDoc = nodesCache.getIfPresent(parentId);
+                        secondarySafe = cachedDoc != null && !cachedDoc.hasBeenModifiedSince(replicationSafeLimit);
                     }
+                } else {
+                    secondarySafe = true;
+                    secondarySafe &= collection == Collection.NODES;
+                    secondarySafe &= documentId == null || !localChanges.mayContain(documentId);
+                    secondarySafe &= parentId == null || !localChanges.mayContainChildrenOf(parentId);
+                    secondarySafe &= mostRecentAccessedRevisions == null || replicaInfo.isMoreRecentThan(mostRecentAccessedRevisions);
+                }
+
+                ReadPreference readPreference;
+                if (secondarySafe) {
+                    readPreference = getConfiguredReadPreference(collection);
+                } else {
+                    readPreference = ReadPreference.primary();
                 }
+
                 return readPreference;
             default:
                 throw new IllegalArgumentException("Unsupported usage " + preference);
@@ -1339,6 +1392,9 @@ public class MongoDocumentStore implemen
 
     @Override
     public void dispose() {
+        if (replicaInfo != null) {
+            replicaInfo.stop();
+        }
         nodes.getDB().getMongo().close();
         try {
             nodesCache.close();
@@ -1534,6 +1590,14 @@ public class MongoDocumentStore implemen
         this.stats = stats;
     }
 
+    void setReplicaInfo(ReplicaSetInfo replicaInfo) {
+        if (this.replicaInfo != null) {
+            this.replicaInfo.stop();
+        }
+        this.replicaInfo = replicaInfo;
+        this.replicaInfo.addListener(localChanges);
+    }
+
     @Override
     public long determineServerTimeDifferenceMillis() {
         // the assumption is that the network delay from this instance
@@ -1580,6 +1644,25 @@ public class MongoDocumentStore implemen
         return diff;
     }
 
+    @Override
+    public synchronized void updateAccessedRevision(RevisionVector revisions) {
+        RevisionVector previousValue = mostRecentAccessedRevisions;
+        if (mostRecentAccessedRevisions == null) {
+            mostRecentAccessedRevisions = revisions;
+        } else {
+            mostRecentAccessedRevisions = mostRecentAccessedRevisions.pmax(revisions);
+        }
+        if (LOG.isDebugEnabled() && !mostRecentAccessedRevisions.equals(previousValue)) {
+            LOG.debug("Most recent accessed revisions: {}", mostRecentAccessedRevisions);
+        }
+    }
+
+    private void updateLocalChanges(NodeDocument doc) {
+        if (localChanges != null) {
+            localChanges.add(doc.getId(), Revision.getCurrentTimestamp());
+        }
+    }
+
     private static class BulkUpdateResult {
 
         private final Set<String> failedUpdates;

Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/GetRootRevisionsCallable.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/GetRootRevisionsCallable.java?rev=1751441&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/GetRootRevisionsCallable.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/GetRootRevisionsCallable.java Tue Jul  5 11:11:13 2016
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document.mongo.replica;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+
+import org.apache.jackrabbit.oak.plugins.document.Document;
+import org.apache.jackrabbit.oak.plugins.document.Revision;
+import org.apache.jackrabbit.oak.plugins.document.RevisionVector;
+import org.apache.jackrabbit.oak.stats.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.mongodb.BasicDBObject;
+import com.mongodb.DBCollection;
+import com.mongodb.DBObject;
+
+public class GetRootRevisionsCallable implements Callable<Timestamped<RevisionVector>> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(GetRootRevisionsCallable.class);
+
+    private final String hostName;
+
+    private final NodeCollectionProvider nodeCollections;
+
+    private final Clock clock;
+
+    public GetRootRevisionsCallable(Clock clock, String hostName, NodeCollectionProvider nodeCollections) {
+        this.hostName = hostName;
+        this.nodeCollections = nodeCollections;
+        this.clock = clock;
+    }
+
+    @Override
+    public Timestamped<RevisionVector> call() throws Exception {
+        List<Revision> revisions = new ArrayList<Revision>();
+        DBCollection collection = nodeCollections.get(hostName);
+
+        long start = clock.getTime();
+        DBObject root = collection.findOne(new BasicDBObject(Document.ID, "0:/"));
+        long end = clock.getTime();
+        long mid = (start + end) / 2;
+
+        if (root == null) {
+            LOG.warn("Can't get the root document on {}", hostName);
+            return null;
+        }
+
+        DBObject lastRev = (DBObject) root.get("_lastRev");
+        for (String clusterId : lastRev.keySet()) {
+            String rev = (String) lastRev.get(clusterId);
+            revisions.add(Revision.fromString(rev));
+        }
+        LOG.debug("Got /_lastRev from {}: {}", hostName, lastRev);
+        return new Timestamped<RevisionVector>(new RevisionVector(revisions), mid);
+    }
+}
\ No newline at end of file

Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/LocalChanges.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/LocalChanges.java?rev=1751441&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/LocalChanges.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/LocalChanges.java Tue Jul  5 11:11:13 2016
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document.mongo.replica;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.jackrabbit.oak.plugins.document.Revision;
+import org.apache.jackrabbit.oak.plugins.document.RevisionVector;
+import org.apache.jackrabbit.oak.plugins.document.util.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class maintains a list of local changes (path+local revision timestamp), which
+ * shouldn't be read from the secondary Mongo, as we are not sure if they have
+ * been already replicated from primary. Once we get this confidence, the entry
+ * will be removed from the map.
+ */
+public class LocalChanges implements ReplicaSetInfoListener {
+
+    private static final Logger LOG = LoggerFactory.getLogger(LocalChanges.class);
+
+    /**
+     * How many paths should be stored in the {@link #localChanges} map. If
+     * more paths are added (and not removed in
+     * {@link #gotRootRevisions(RevisionVector)}), the map is cleared and only
+     * the timestamp of the latest change will be remembered.
+     */
+    private static final int SIZE_LIMIT = 100;
+
+    /**
+     * This map contains document paths and timestamps in which they have been
+     * changed. Paths in this collection haven't been replicated to secondary
+     * instances yet.
+     */
+    final Map<String, Long> localChanges = new HashMap<String, Long>();
+
+    /**
+     * If there are {@link #SIZE_LIMIT} or more paths in the
+     * {@link #localChanges}, the class will clear the above map and update this
+     * variable with the timestamp of the last local change. {@code true} will be returned
+     * for all {@link #mayContainChildrenOf(String)} and {@link #mayContain(String)}
+     * invocations until this timestamp is replicated to all secondary instances.
+     * <p>
+     * This is a safety mechanism, so the {@link #localChanges} won't grow too much.
+     */
+    private volatile long latestChange;
+
+    /**
+     * True if the current Mongo installation is a working replica. Otherwise
+     * there's no need to store the local changes.
+     */
+    private volatile boolean replicaActive;
+
+    private volatile long rootTS;
+
+    private final int clusterId;
+
+    public LocalChanges(int clusterId) {
+        this.clusterId = clusterId;
+    }
+
+    public void add(String id, long timestamp) {
+        long localRootTS = rootTS;
+        if (localRootTS != 0 && localRootTS > timestamp) {
+            return;
+        }
+
+        synchronized (this) {
+            if (latestChange != 0 && latestChange > timestamp) {
+                return;
+            }
+
+            if (replicaActive) {
+                localChanges.put(id, timestamp);
+                if (localChanges.size() >= SIZE_LIMIT) {
+                    localChanges.clear();
+                    latestChange = timestamp;
+                    LOG.debug(
+                            "The local changes count == {}. Clearing the list and switching to the 'latest change' mode: {}",
+                            SIZE_LIMIT, latestChange);
+                }
+            } else {
+                latestChange = timestamp;
+            }
+        }
+    }
+
+    /**
+     * Check if it's possible that the given document hasn't been replicated to
+     * the secondary yet.
+     *
+     * @param documentId
+     * @return {@code true} if it's possible that the document is still in the
+     *         Mongo replication queue
+     */
+    public boolean mayContain(String documentId) {
+        if (!replicaActive || latestChange != 0) {
+            return true;
+        }
+
+        synchronized (this) {
+            return localChanges.containsKey(documentId);
+        }
+    }
+
+    /**
+     * Check if it's possible that the children of the given document haven't
+     * been replicated to the secondary yet.
+     *
+     * @param parentId
+     * @return {@code true} if it's possible that the children of given document
+     *         are still in the Mongo replication queue
+     */
+    public boolean mayContainChildrenOf(String parentId) {
+        if (!replicaActive || latestChange != 0) {
+            return true;
+        }
+
+        synchronized (this) {
+            for (String key : localChanges.keySet()) {
+                if (parentId.equals(Utils.getParentId(key))) {
+                    return true;
+                }
+            }
+            return false;
+        }
+    }
+
+    @Override
+    public void gotRootRevisions(RevisionVector rootRevision) {
+        if (rootRevision == null) {
+            return;
+        }
+
+        Revision rootRevForLocalInstance = rootRevision.getRevision(clusterId);
+        if (rootRevForLocalInstance == null) {
+            return;
+        }
+        this.rootTS = rootRevForLocalInstance.getTimestamp();
+
+        if (!replicaActive) {
+            replicaActive = true;
+            LOG.info("Replica set became active");
+        }
+
+        synchronized (this) {
+            if (latestChange != 0 && latestChange <= rootTS) {
+                latestChange = 0;
+            }
+
+            Iterator<Long> it = localChanges.values().iterator();
+            while (it.hasNext()) {
+                if (it.next() <= rootTS) {
+                    it.remove();
+                }
+            }
+        }
+    }
+}

Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/NodeCollectionProvider.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/NodeCollectionProvider.java?rev=1751441&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/NodeCollectionProvider.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/NodeCollectionProvider.java Tue Jul  5 11:11:13 2016
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document.mongo.replica;
+
+import static com.google.common.collect.Sets.difference;
+
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.mongodb.MongoCredential;
+import com.mongodb.ServerAddress;
+import org.apache.jackrabbit.oak.plugins.document.Collection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.mongodb.DB;
+import com.mongodb.DBCollection;
+import com.mongodb.MongoClient;
+import com.mongodb.MongoClientException;
+import com.mongodb.MongoClientURI;
+
+/**
+ * This class connects to Mongo instances and returns the NODES collection.
+ * Connections are cached per host name until they are dropped with
+ * {@link #retain(Set)} or {@link #close()}.
+ */
+public class NodeCollectionProvider {
+
+    private static final Logger LOG = LoggerFactory.getLogger(NodeCollectionProvider.class);
+
+    /** Cached NODES collections, keyed by the host name passed to {@link #get(String)}. */
+    private final Map<String, DBCollection> collections = new ConcurrentHashMap<String, DBCollection>();
+
+    private final String originalMongoUri;
+
+    private final String dbName;
+
+    /**
+     * @param originalMongoUri the Mongo URI whose credentials and options are
+     *            reused for the per-host connections; if {@code null}, a plain
+     *            {@code mongodb://<hostname>} URI is used instead
+     * @param dbName name of the database that contains the NODES collection
+     */
+    public NodeCollectionProvider(String originalMongoUri, String dbName) {
+        this.originalMongoUri = originalMongoUri;
+        this.dbName = dbName;
+    }
+
+    /**
+     * Closes the cached connections to all hosts that are not contained in
+     * the given set.
+     *
+     * @param hostNames the hosts whose connections should be kept
+     */
+    public void retain(Set<String> hostNames) {
+        close(difference(collections.keySet(), hostNames));
+    }
+
+    /**
+     * Closes all cached connections.
+     */
+    public void close() {
+        close(collections.keySet());
+    }
+
+    /**
+     * Closes the Mongo clients of the given hosts and removes them from the
+     * cache. An entry whose client can't be closed is logged and stays in the
+     * cache.
+     */
+    private void close(Set<String> hostNames) {
+        Iterator<Entry<String, DBCollection>> it = collections.entrySet().iterator();
+        while (it.hasNext()) {
+            Entry<String, DBCollection> entry = it.next();
+            if (hostNames.contains(entry.getKey())) {
+                try {
+                    entry.getValue().getDB().getMongo().close();
+                    it.remove();
+                } catch (MongoClientException e) {
+                    LOG.error("Can't close Mongo client", e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Returns the NODES collection of the given Mongo instance. A new client
+     * is created and cached if there is no cached collection for the host.
+     *
+     * @param hostname the host, optionally followed by {@code :port}
+     * @return the NODES collection of the given host
+     * @throws UnknownHostException declared by the client creation
+     */
+    @SuppressWarnings("deprecation")
+    public DBCollection get(String hostname) throws UnknownHostException {
+        // Single lookup: with a containsKey()/get() pair the entry could be
+        // removed by retain() or close() in between and null would be returned.
+        DBCollection cached = collections.get(hostname);
+        if (cached != null) {
+            return cached;
+        }
+
+        MongoClient client;
+        if (originalMongoUri == null) {
+            MongoClientURI uri = new MongoClientURI("mongodb://" + hostname);
+            client = new MongoClient(uri);
+        } else {
+            client = prepareClientForHostname(hostname);
+        }
+
+        DB db = client.getDB(dbName);
+        // deprecated driver call, hence the @SuppressWarnings on this method
+        db.getMongo().slaveOk();
+        DBCollection collection = db.getCollection(Collection.NODES.toString());
+        collections.put(hostname, collection);
+        return collection;
+    }
+
+    /**
+     * Creates a client for the given host which uses the credentials and the
+     * options of the original Mongo URI.
+     *
+     * @param hostname either {@code host} or {@code host:port}
+     * @throws IllegalArgumentException if the name contains more than one colon
+     */
+    private MongoClient prepareClientForHostname(String hostname) throws UnknownHostException {
+        ServerAddress address;
+        if (hostname.contains(":")) {
+            String[] hostSplit = hostname.split(":");
+            if (hostSplit.length != 2) {
+                throw new IllegalArgumentException("Not a valid hostname: " + hostname);
+            }
+            address = new ServerAddress(hostSplit[0], Integer.parseInt(hostSplit[1]));
+        } else {
+            address = new ServerAddress(hostname);
+        }
+
+        MongoClientURI originalUri = new MongoClientURI(originalMongoUri);
+        List<MongoCredential> credentialList = new ArrayList<MongoCredential>(1);
+        if (originalUri.getCredentials() != null) {
+            credentialList.add(originalUri.getCredentials());
+        }
+        return new MongoClient(address, credentialList, originalUri.getOptions());
+    }
+}
\ No newline at end of file

Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/ReplicaSetInfo.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/ReplicaSetInfo.java?rev=1751441&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/ReplicaSetInfo.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/ReplicaSetInfo.java Tue Jul  5 11:11:13 2016
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document.mongo.replica;
+
+import static com.google.common.base.Predicates.in;
+import static com.google.common.collect.ImmutableSet.of;
+import static com.google.common.collect.Iterables.isEmpty;
+import static com.google.common.collect.Iterables.transform;
+import static com.google.common.collect.Maps.filterKeys;
+import static com.google.common.collect.Sets.union;
+import static org.apache.jackrabbit.oak.plugins.document.util.Utils.isGreaterOrEquals;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Future;
+import java.util.Set;
+import java.util.concurrent.FutureTask;
+
+import javax.annotation.Nullable;
+
+import org.apache.jackrabbit.oak.plugins.document.Revision;
+import org.apache.jackrabbit.oak.plugins.document.RevisionVector;
+import org.apache.jackrabbit.oak.stats.Clock;
+import org.bson.BasicBSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.mongodb.BasicDBObject;
+import com.mongodb.DB;
+import com.mongodb.MongoException;
+import com.mongodb.ReadPreference;
+
+/**
+ * This class analyses the replica set info provided by MongoDB to find out
+ * the current synchronization state of the secondary instances in terms of
+ * revision values and timestamp.
+ */
+public class ReplicaSetInfo implements Runnable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ReplicaSetInfo.class);
+
+    /** The "admin" sister database of the database passed to the constructor. */
+    private final DB adminDb;
+
+    /** Time to wait between two iterations of the update loop, in milliseconds. */
+    private final long pullFrequencyMillis;
+
+    /** Value reported by {@link #getLag()} while the safe timestamp is unknown. */
+    private final long maxReplicationLagMillis;
+
+    /** Runs the per-host root revision lookups. */
+    private final Executor executor;
+
+    private final NodeCollectionProvider nodeCollections;
+
+    private final Clock clock;
+
+    /** Monitor used to wake up the update loop when {@link #stop()} is called. */
+    private final Object stopMonitor = new Object();
+
+    protected final List<ReplicaSetInfoListener> listeners = new CopyOnWriteArrayList<ReplicaSetInfoListener>();
+
+    /** Minimum of the root revisions of all secondaries, {@code null} if unknown. */
+    protected volatile RevisionVector rootRevisions;
+
+    /** Safe timestamp of the secondaries, {@code 0} if unknown. */
+    volatile long secondariesSafeTimestamp;
+
+    /** Hosts of the hidden members, {@code null} until the configuration has been read. */
+    List<String> hiddenMembers;
+
+    private volatile boolean stop;
+
+    /**
+     * @param clock the clock used to compute the lag
+     * @param db the database; its name is used for the per-host connections
+     *            and its "admin" sister database for the replica commands
+     * @param originalMongoUri passed on to the {@link NodeCollectionProvider}
+     * @param pullFrequencyMillis time between two updates, in milliseconds
+     * @param maxReplicationLagMillis lag to report while the real lag is unknown
+     * @param executor runs the root revision lookups
+     */
+    public ReplicaSetInfo(Clock clock, DB db, String originalMongoUri, long pullFrequencyMillis, long maxReplicationLagMillis, Executor executor) {
+        this.executor = executor;
+        this.clock = clock;
+        this.adminDb = db.getSisterDB("admin");
+        this.pullFrequencyMillis = pullFrequencyMillis;
+        this.maxReplicationLagMillis = maxReplicationLagMillis;
+        this.nodeCollections = new NodeCollectionProvider(originalMongoUri, db.getName());
+    }
+
+    /**
+     * Registers a listener that is informed about the root revisions after
+     * each status update.
+     */
+    public void addListener(ReplicaSetInfoListener listener) {
+        listeners.add(listener);
+    }
+
+    /**
+     * @return {@code true} if the known minimum root revisions are greater
+     *         than or equal to the given revisions, {@code false} otherwise
+     *         or if the root revisions are unknown
+     */
+    public boolean isMoreRecentThan(RevisionVector revisions) {
+        // read the volatile field only once
+        RevisionVector localRootRevisions = rootRevisions;
+        if (localRootRevisions == null) {
+            return false;
+        } else {
+            return isGreaterOrEquals(localRootRevisions, revisions);
+        }
+    }
+
+    /**
+     * @return the time elapsed since the secondaries safe timestamp, or the
+     *         configured maximum replication lag if that timestamp is unknown
+     */
+    public long getLag() {
+        long localTS = secondariesSafeTimestamp;
+        if (localTS == 0) {
+            return maxReplicationLagMillis;
+        } else {
+            return clock.getTime() - localTS;
+        }
+    }
+
+    /**
+     * @return the minimum of the secondaries' root revisions or {@code null}
+     *         if it is unknown
+     */
+    @Nullable
+    public RevisionVector getMinimumRootRevisions() {
+        return rootRevisions;
+    }
+
+    /**
+     * Asks the update loop to terminate and wakes it up if it is waiting.
+     */
+    public void stop() {
+        synchronized (stopMonitor) {
+            stop = true;
+            stopMonitor.notify();
+        }
+    }
+
+    /**
+     * Runs the update loop until {@link #stop()} is called or the thread is
+     * interrupted. An exception thrown by the loop is logged and ends the loop.
+     */
+    @Override
+    public void run() {
+        try {
+            updateLoop();
+        } catch (Exception e) {
+            LOG.error("Exception in the ReplicaSetInfo thread", e);
+        }
+    }
+
+    /**
+     * Reads the hidden members first; once they are known, every iteration
+     * updates the replica status and notifies the listeners. The cached
+     * connections are closed when the loop ends.
+     */
+    private void updateLoop() {
+        boolean interrupted = false;
+        while (!stop) {
+            if (hiddenMembers == null) {
+                hiddenMembers = getHiddenMembers();
+            } else {
+                updateReplicaStatus();
+
+                for (ReplicaSetInfoListener listener : listeners) {
+                    listener.gotRootRevisions(rootRevisions);
+                }
+            }
+
+            synchronized (stopMonitor) {
+                try {
+                    if (!stop) {
+                        stopMonitor.wait(pullFrequencyMillis);
+                    }
+                } catch (InterruptedException e) {
+                    interrupted = true;
+                    break;
+                }
+            }
+        }
+        LOG.debug("Stopping the replica set info");
+        nodeCollections.close();
+        if (interrupted) {
+            // restore the interrupt status for the owner of the thread, but
+            // only after the connections have been closed
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Fetches the replica set status and updates the root revisions and the
+     * safe timestamp. Both are reset if the status can't be read.
+     */
+    void updateReplicaStatus() {
+        BasicDBObject result;
+        try {
+            result = getReplicaStatus();
+        } catch (MongoException e) {
+            LOG.error("Can't get replica status", e);
+            rootRevisions = null;
+            secondariesSafeTimestamp = 0;
+            return;
+        }
+
+        @SuppressWarnings("unchecked")
+        Iterable<BasicBSONObject> members = (Iterable<BasicBSONObject>) result.get("members");
+        if (members == null) {
+            members = Collections.emptyList();
+        }
+        updateRevisions(members);
+    }
+
+    /**
+     * @return the hosts of all members marked as hidden in the replica set
+     *         configuration, or {@code null} if the configuration can't be read
+     */
+    List<String> getHiddenMembers() {
+        BasicDBObject result;
+        try {
+            result = getReplicaConfig();
+        } catch (MongoException e) {
+            LOG.error("Can't get replica configuration", e);
+            return null;
+        }
+
+        @SuppressWarnings("unchecked")
+        Iterable<BasicBSONObject> members = (Iterable<BasicBSONObject>) result.get("members");
+        if (members == null) {
+            members = Collections.emptyList();
+        }
+
+        List<String> hiddenMembers = new ArrayList<String>();
+        for (BasicBSONObject member : members) {
+            if (member.getBoolean("hidden")) {
+                hiddenMembers.add(member.getString("host"));
+            }
+        }
+        return hiddenMembers;
+    }
+
+    /** Runs the {@code replSetGetConfig} command against the primary. */
+    protected BasicDBObject getReplicaConfig() {
+        return adminDb.command("replSetGetConfig", ReadPreference.primary());
+    }
+
+    /** Runs the {@code replSetGetStatus} command against the primary. */
+    protected BasicDBObject getReplicaStatus() {
+        return adminDb.command("replSetGetStatus", ReadPreference.primary());
+    }
+
+    /**
+     * Classifies the (non-hidden) members, fetches the root revisions of the
+     * primary and the secondaries and derives the minimum root revisions and
+     * the safe timestamp from them. Both values are reset if the state of the
+     * replica set can't be determined reliably.
+     */
+    private void updateRevisions(Iterable<BasicBSONObject> members) {
+        Set<String> secondaries = new HashSet<String>();
+        boolean unknownState = false;
+        String primary = null;
+
+        for (BasicBSONObject member : members) {
+            MemberState state = toMemberState(member.getString("stateStr"));
+            String name = member.getString("name");
+            if (hiddenMembers.contains(name)) {
+                continue;
+            }
+
+            switch (state) {
+            case PRIMARY:
+                primary = name;
+                continue;
+
+            case SECONDARY:
+                secondaries.add(name);
+                break;
+
+            case ARBITER:
+                continue;
+
+            default:
+                LOG.debug("Invalid state {} for instance {}", state, name);
+                unknownState = true;
+                break;
+            }
+        }
+
+        if (secondaries.isEmpty()) {
+            LOG.debug("No secondaries found: {}", members);
+            unknownState = true;
+        }
+
+        if (primary == null) {
+            LOG.debug("No primary found: {}", members);
+            unknownState = true;
+        }
+
+        Map<String, Timestamped<RevisionVector>> vectors = null;
+        if (!unknownState) {
+            vectors = getRootRevisions(union(secondaries, of(primary)));
+            // a null value marks a host whose root revisions are not available
+            if (vectors.containsValue(null)) {
+                unknownState = true;
+            }
+        }
+
+        if (unknownState) {
+            rootRevisions = null;
+            secondariesSafeTimestamp = 0;
+        } else {
+            Timestamped<RevisionVector> primaryRevision = vectors.get(primary);
+            Iterable<Timestamped<RevisionVector>> secondaryRevisions = filterKeys(vectors, in(secondaries)).values();
+
+            rootRevisions = pmin(transform(secondaryRevisions, Timestamped.<RevisionVector>getExtractFunction()));
+            if (rootRevisions == null || primaryRevision == null || isEmpty(secondaryRevisions)) {
+                secondariesSafeTimestamp = 0;
+            } else {
+                secondariesSafeTimestamp = getSecondariesSafeTimestamp(primaryRevision, secondaryRevisions);
+            }
+        }
+
+        LOG.debug("Minimum root revisions: {}. Current lag: {}", rootRevisions, getLag());
+
+        // The primary is queried through getRootRevisions() as well, so keep
+        // its cached connection too; dropping it here would presumably force
+        // a reconnect to the primary on every update.
+        Set<String> activeHosts = new HashSet<String>(secondaries);
+        if (primary != null) {
+            activeHosts.add(primary);
+        }
+        nodeCollections.retain(activeHosts);
+    }
+
+    /**
+     * Maps the {@code stateStr} value of a member to a {@link MemberState}.
+     * A missing or unrecognized value is mapped to {@link MemberState#UNKNOWN}.
+     */
+    private static MemberState toMemberState(String stateStr) {
+        if (stateStr == null) {
+            // valueOf(null) would throw a NullPointerException
+            return MemberState.UNKNOWN;
+        }
+        try {
+            return MemberState.valueOf(stateStr);
+        } catch (IllegalArgumentException e) {
+            return MemberState.UNKNOWN;
+        }
+    }
+
+    /**
+     * Find the oldest revision which hasn't been replicated from primary to
+     * secondary yet and return its timestamp. If all revisions have already
+     * been replicated, return the earliest operation timestamp of the
+     * measurements.
+     *
+     * @return the point in time to which the secondary instances have been synchronized
+     */
+    private long getSecondariesSafeTimestamp(Timestamped<RevisionVector> primary, Iterable<Timestamped<RevisionVector>> secondaries) {
+        final RevisionVector priRev = primary.getValue();
+        Long oldestNotReplicated = null;
+        for (Timestamped<RevisionVector> v : secondaries) {
+            RevisionVector secRev = v.getValue();
+            if (secRev.equals(priRev)) {
+                continue;
+            }
+
+            // collect the oldest primary revision this secondary doesn't have
+            for (Revision pr : priRev) {
+                Revision sr = secRev.getRevision(pr.getClusterId());
+                if (pr.equals(sr)) {
+                    continue;
+                }
+                if (oldestNotReplicated == null || oldestNotReplicated > pr.getTimestamp()) {
+                    oldestNotReplicated = pr.getTimestamp();
+                }
+            }
+        }
+
+        if (oldestNotReplicated == null) {
+            long minOpTimestamp = primary.getOperationTimestamp();
+            for (Timestamped<RevisionVector> v : secondaries) {
+                if (v.getOperationTimestamp() < minOpTimestamp) {
+                    minOpTimestamp = v.getOperationTimestamp();
+                }
+            }
+            return minOpTimestamp;
+        } else {
+            return oldestNotReplicated;
+        }
+    }
+
+    /**
+     * Fetches the root revisions of the given hosts using the executor.
+     *
+     * @param hosts the hosts to query
+     * @return a map containing an entry for every given host; the value is
+     *         {@code null} if the lookup for that host failed
+     */
+    protected Map<String, Timestamped<RevisionVector>> getRootRevisions(Iterable<String> hosts) {
+        Map<String, Future<Timestamped<RevisionVector>>> futures = new HashMap<String, Future<Timestamped<RevisionVector>>>();
+        for (final String hostName : hosts) {
+            Callable<Timestamped<RevisionVector>> callable = new GetRootRevisionsCallable(clock, hostName, nodeCollections);
+            FutureTask<Timestamped<RevisionVector>> futureTask = new FutureTask<Timestamped<RevisionVector>>(callable);
+            futures.put(hostName, futureTask);
+            executor.execute(futureTask);
+        }
+
+        Map<String, Timestamped<RevisionVector>> result = new HashMap<String, Timestamped<RevisionVector>>();
+        for (Entry<String, Future<Timestamped<RevisionVector>>> entry : futures.entrySet()) {
+            try {
+                result.put(entry.getKey(), entry.getValue().get());
+            } catch (Exception e) {
+                LOG.error("Can't connect to the Mongo instance", e);
+                // Record the failure explicitly. A missing entry would not be
+                // noticed by the containsValue(null) check of the caller and
+                // the minimum would be computed from the remaining hosts only.
+                result.put(entry.getKey(), null);
+                if (e instanceof InterruptedException) {
+                    Thread.currentThread().interrupt();
+                }
+            }
+        }
+        return result;
+    }
+
+    /**
+     * @return the pmin() of all given vectors, or {@code null} if there are
+     *         no vectors or one of them is {@code null}
+     */
+    private static RevisionVector pmin(Iterable<RevisionVector> vectors) {
+        RevisionVector minimum = null;
+        for (RevisionVector v : vectors) {
+            if (v == null) {
+                return null;
+            } else if (minimum == null) {
+                minimum = v;
+            } else {
+                minimum = minimum.pmin(v);
+            }
+        }
+        return minimum;
+    }
+
+    /** Member states, matched by name against the {@code stateStr} value of a member. */
+    public enum MemberState {
+        STARTUP, PRIMARY, SECONDARY, RECOVERING, STARTUP2, UNKNOWN, ARBITER, DOWN, ROLLBACK, REMOVED
+    }
+}

Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/ReplicaSetInfoListener.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/ReplicaSetInfoListener.java?rev=1751441&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/ReplicaSetInfoListener.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/ReplicaSetInfoListener.java Tue Jul  5 11:11:13 2016
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document.mongo.replica;
+
+import org.apache.jackrabbit.oak.plugins.document.RevisionVector;
+
+/**
+ * Classes implementing this interface will be informed about the current root
+ * revision states on secondary instances by {@link ReplicaSetInfo}.
+ */
+public interface ReplicaSetInfoListener {
+
+    /**
+     * Called by {@link ReplicaSetInfo} after each update of the replica
+     * status.
+     *
+     * @param rootRevision the root revisions currently held by
+     *            {@link ReplicaSetInfo}; may be {@code null}, since the value
+     *            is reset when the replica state can't be determined
+     */
+    void gotRootRevisions(RevisionVector rootRevision);
+
+}
\ No newline at end of file

Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/Timestamped.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/Timestamped.java?rev=1751441&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/Timestamped.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/Timestamped.java Tue Jul  5 11:11:13 2016
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document.mongo.replica;
+
+import com.google.common.base.Function;
+
+/**
+ * A value with a timestamp.
+ *
+ * @param <T> the value type
+ */
+public class Timestamped<T> {
+
+    /** The wrapped value. */
+    private final T value;
+
+    /** The timestamp associated with the value. */
+    private final long operationTimestamp;
+
+    /**
+     * @param value the value to wrap
+     * @param operationTimestamp the timestamp to associate with the value
+     */
+    public Timestamped(T value, long operationTimestamp) {
+        this.value = value;
+        this.operationTimestamp = operationTimestamp;
+    }
+
+    /**
+     * @return the wrapped value
+     */
+    public T getValue() {
+        return value;
+    }
+
+    /**
+     * @return the timestamp passed to the constructor
+     */
+    public long getOperationTimestamp() {
+        return operationTimestamp;
+    }
+
+    /**
+     * @return a function that unwraps the value of a {@code Timestamped} and
+     *         maps a {@code null} input to {@code null}
+     */
+    public static <T> Function<Timestamped<T>, T> getExtractFunction() {
+        return new Function<Timestamped<T>, T>() {
+            @Override
+            public T apply(Timestamped<T> input) {
+                return input == null ? null : input.value;
+            }
+        };
+    }
+
+    @Override
+    public String toString() {
+        return "Timestamped[" + value + '(' + operationTimestamp + ")]";
+    }
+}
\ No newline at end of file

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/LeaseCheckDocumentStoreWrapper.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/LeaseCheckDocumentStoreWrapper.java?rev=1751441&r1=1751440&r2=1751441&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/LeaseCheckDocumentStoreWrapper.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/LeaseCheckDocumentStoreWrapper.java Tue Jul  5 11:11:13 2016
@@ -26,6 +26,8 @@ import org.apache.jackrabbit.oak.plugins
 import org.apache.jackrabbit.oak.plugins.document.Collection;
 import org.apache.jackrabbit.oak.plugins.document.Document;
 import org.apache.jackrabbit.oak.plugins.document.DocumentStore;
+import org.apache.jackrabbit.oak.plugins.document.RevisionListener;
+import org.apache.jackrabbit.oak.plugins.document.RevisionVector;
 import org.apache.jackrabbit.oak.plugins.document.UpdateOp;
 import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Condition;
 import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Key;
@@ -37,7 +39,7 @@ import org.apache.jackrabbit.oak.plugins
  * <p>
  * @see "https://issues.apache.org/jira/browse/OAK-2739 for more details"
  */
-public final class LeaseCheckDocumentStoreWrapper implements DocumentStore {
+public final class LeaseCheckDocumentStoreWrapper implements DocumentStore, RevisionListener {
 
     private final DocumentStore delegate;
     private final ClusterNodeInfo clusterNodeInfo;
@@ -203,4 +205,11 @@ public final class LeaseCheckDocumentSto
         return delegate.determineServerTimeDifferenceMillis();
     }
 
+    @Override
+    public void updateAccessedRevision(RevisionVector revision) {
+        // only forward the call if the wrapped store is a RevisionListener
+        if (!(delegate instanceof RevisionListener)) {
+            return;
+        }
+        RevisionListener listener = (RevisionListener) delegate;
+        listener.updateAccessedRevision(revision);
+    }
+
 }

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/LoggingDocumentStoreWrapper.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/LoggingDocumentStoreWrapper.java?rev=1751441&r1=1751440&r2=1751441&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/LoggingDocumentStoreWrapper.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/LoggingDocumentStoreWrapper.java Tue Jul  5 11:11:13 2016
@@ -28,6 +28,8 @@ import org.apache.jackrabbit.oak.plugins
 import org.apache.jackrabbit.oak.plugins.document.Document;
 import org.apache.jackrabbit.oak.plugins.document.DocumentStore;
 import org.apache.jackrabbit.oak.plugins.document.DocumentStoreException;
+import org.apache.jackrabbit.oak.plugins.document.RevisionListener;
+import org.apache.jackrabbit.oak.plugins.document.RevisionVector;
 import org.apache.jackrabbit.oak.plugins.document.UpdateOp;
 import org.apache.jackrabbit.oak.plugins.document.cache.CacheInvalidationStats;
 import org.slf4j.Logger;
@@ -36,7 +38,7 @@ import org.slf4j.LoggerFactory;
 /**
  * Implements a <code>DocumentStore</code> wrapper and logs all calls.
  */
-public class LoggingDocumentStoreWrapper implements DocumentStore {
+public class LoggingDocumentStoreWrapper implements DocumentStore, RevisionListener {
 
     private static final Logger LOG = LoggerFactory.getLogger(LoggingDocumentStoreWrapper.class);
 
@@ -408,4 +410,12 @@ public class LoggingDocumentStoreWrapper
         }
         LOG.info(out);
     }
+
+    @Override
+    public void updateAccessedRevision(RevisionVector revision) {
+        logMethod("updateAccessedRevision", revision);
+        // only forward the call if the wrapped store is a RevisionListener
+        if (!(store instanceof RevisionListener)) {
+            return;
+        }
+        RevisionListener listener = (RevisionListener) store;
+        listener.updateAccessedRevision(revision);
+    }
 }

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/SynchronizingDocumentStoreWrapper.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/SynchronizingDocumentStoreWrapper.java?rev=1751441&r1=1751440&r2=1751441&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/SynchronizingDocumentStoreWrapper.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/SynchronizingDocumentStoreWrapper.java Tue Jul  5 11:11:13 2016
@@ -25,6 +25,8 @@ import org.apache.jackrabbit.oak.cache.C
 import org.apache.jackrabbit.oak.plugins.document.Collection;
 import org.apache.jackrabbit.oak.plugins.document.Document;
 import org.apache.jackrabbit.oak.plugins.document.DocumentStore;
+import org.apache.jackrabbit.oak.plugins.document.RevisionListener;
+import org.apache.jackrabbit.oak.plugins.document.RevisionVector;
 import org.apache.jackrabbit.oak.plugins.document.UpdateOp;
 import org.apache.jackrabbit.oak.plugins.document.cache.CacheInvalidationStats;
 
@@ -32,7 +34,7 @@ import org.apache.jackrabbit.oak.plugins
  * Implements a <code>DocumentStore</code> wrapper which synchronizes on all
  * methods.
  */
-public class SynchronizingDocumentStoreWrapper implements DocumentStore {
+public class SynchronizingDocumentStoreWrapper implements DocumentStore, RevisionListener {
 
     final DocumentStore store;
 
@@ -150,4 +152,11 @@ public class SynchronizingDocumentStoreW
     public synchronized Map<String, String> getMetadata() {
         return store.getMetadata();
     }
+
+    @Override
+    public synchronized void updateAccessedRevision(RevisionVector revision) {
+        // only forward the call if the wrapped store is a RevisionListener
+        if (!(store instanceof RevisionListener)) {
+            return;
+        }
+        RevisionListener listener = (RevisionListener) store;
+        listener.updateAccessedRevision(revision);
+    }
 }

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/TimingDocumentStoreWrapper.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/TimingDocumentStoreWrapper.java?rev=1751441&r1=1751440&r2=1751441&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/TimingDocumentStoreWrapper.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/TimingDocumentStoreWrapper.java Tue Jul  5 11:11:13 2016
@@ -34,6 +34,8 @@ import org.apache.jackrabbit.oak.plugins
 import org.apache.jackrabbit.oak.plugins.document.Document;
 import org.apache.jackrabbit.oak.plugins.document.DocumentStore;
 import org.apache.jackrabbit.oak.plugins.document.DocumentStoreException;
+import org.apache.jackrabbit.oak.plugins.document.RevisionListener;
+import org.apache.jackrabbit.oak.plugins.document.RevisionVector;
 import org.apache.jackrabbit.oak.plugins.document.UpdateOp;
 import org.apache.jackrabbit.oak.plugins.document.cache.CacheInvalidationStats;
 
@@ -41,7 +43,7 @@ import org.apache.jackrabbit.oak.plugins
  * A DocumentStore wrapper that can be used to log and also time DocumentStore
  * calls.
  */
-public class TimingDocumentStoreWrapper implements DocumentStore {
+public class TimingDocumentStoreWrapper implements DocumentStore, RevisionListener {
 
     private static final boolean DEBUG = Boolean.parseBoolean(System.getProperty("base.debug", "true"));
     private static final AtomicInteger NEXT_ID = new AtomicInteger();
@@ -390,6 +392,19 @@ public class TimingDocumentStoreWrapper
         }
     }
 
+    @Override
+    public void updateAccessedRevision(RevisionVector revision) {
+        try {
+            final long begin = now();
+            // the call is timed even if the wrapped store is not a listener
+            if (base instanceof RevisionListener) {
+                RevisionListener listener = (RevisionListener) base;
+                listener.updateAccessedRevision(revision);
+            }
+            updateAndLogTimes("updateAccessedRevision", begin, 0, 0);
+        } catch (Exception e) {
+            throw convert(e);
+        }
+    }
+
     private void logCommonCall(long start, String key) {
         int time = (int) (System.currentTimeMillis() - start);
         if (time <= 0) {

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java?rev=1751441&r1=1751440&r2=1751441&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java Tue Jul  5 11:11:13 2016
@@ -759,6 +759,21 @@ public class Utils {
     }
 
     /**
+     * Returns true if all the revisions in {@code a} are greater than or equal
+     * to their counterparts in {@code b}. If {@code b} contains revisions
+     * for cluster nodes that are not present in {@code a}, returns false.
+     *
+     * @param a
+     * @param b
+     * @return true if all the revisions in {@code a} are at least
+     * as recent as their counterparts in {@code b}
+     */
+    public static boolean isGreaterOrEquals(@Nonnull RevisionVector a,
+                                            @Nonnull RevisionVector b) {
+        return a.pmax(b).equals(a);
+    }
+
+    /**
      * Wraps the given iterable and aborts iteration over elements when the
      * predicate on an element evaluates to {@code false}.
      *

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingDocumentStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingDocumentStore.java?rev=1751441&r1=1751440&r2=1751441&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingDocumentStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingDocumentStore.java Tue Jul  5 11:11:13 2016
@@ -27,7 +27,7 @@ import org.apache.jackrabbit.oak.plugins
 import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Key;
 import org.apache.jackrabbit.oak.plugins.document.cache.CacheInvalidationStats;
 
-public class CountingDocumentStore implements DocumentStore {
+public class CountingDocumentStore implements DocumentStore, RevisionListener {
 
     private DocumentStore delegate;
 
@@ -230,4 +230,11 @@ public class CountingDocumentStore imple
     public long determineServerTimeDifferenceMillis() {
         return delegate.determineServerTimeDifferenceMillis();
     }
+
+    @Override
+    public void updateAccessedRevision(RevisionVector revision) {
+        if (delegate instanceof RevisionListener) {
+            ((RevisionListener) delegate).updateAccessedRevision(revision);
+        }
+    }
 }

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentStoreWrapper.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentStoreWrapper.java?rev=1751441&r1=1751440&r2=1751441&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentStoreWrapper.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentStoreWrapper.java Tue Jul  5 11:11:13 2016
@@ -28,7 +28,7 @@ import org.apache.jackrabbit.oak.plugins
  * A DocumentStore implementation which wraps another store and delegates all
  * calls to it.
  */
-public class DocumentStoreWrapper implements DocumentStore {
+public class DocumentStoreWrapper implements DocumentStore, RevisionListener {
 
     protected final DocumentStore store;
 
@@ -164,4 +164,11 @@ public class DocumentStoreWrapper implem
     public long determineServerTimeDifferenceMillis() {
         return store.determineServerTimeDifferenceMillis();
     }
+
+    @Override
+    public void updateAccessedRevision(RevisionVector revision) {
+        if (store instanceof RevisionListener) {
+            ((RevisionListener) store).updateAccessedRevision(revision);
+        }
+    }
 }

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/ReadPreferenceIT.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/ReadPreferenceIT.java?rev=1751441&r1=1751440&r2=1751441&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/ReadPreferenceIT.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/ReadPreferenceIT.java Tue Jul  5 11:11:13 2016
@@ -19,19 +19,17 @@
 
 package org.apache.jackrabbit.oak.plugins.document.mongo;
 
-import java.util.concurrent.TimeUnit;
-
 import com.mongodb.ReadPreference;
 
-import org.apache.jackrabbit.oak.plugins.document.AbstractMongoConnectionTest;
-import org.apache.jackrabbit.oak.plugins.document.DocumentMK;
-import org.apache.jackrabbit.oak.plugins.document.Revision;
-import org.apache.jackrabbit.oak.plugins.document.util.Utils;
+import org.apache.jackrabbit.oak.plugins.document.*;
+import org.apache.jackrabbit.oak.plugins.document.mongo.replica.ReplicaSetInfo;
+import org.apache.jackrabbit.oak.plugins.document.mongo.replica.ReplicaSetInfoMock;
 import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
 import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
 import org.apache.jackrabbit.oak.spi.state.NodeStore;
 import org.apache.jackrabbit.oak.stats.Clock;
+import org.junit.Before;
 import org.junit.Test;
 
 import static org.apache.jackrabbit.oak.plugins.document.Collection.NODES;
@@ -44,30 +42,56 @@ public class ReadPreferenceIT extends Ab
 
     private MongoDocumentStore mongoDS;
 
+    private DocumentMK mk2;
+
     private Clock clock;
 
-    private long replicationLag;
+    private ReplicaSetInfoMock replica;
+
+    private ReplicaSetInfoMock.RevisionBuilder primary;
+
+    private ReplicaSetInfoMock.RevisionBuilder secondary;
 
     @Override
     public void setUpConnection() throws Exception {
         clock = new Clock.Virtual();
-        replicationLag = TimeUnit.SECONDS.toMillis(10);
         mongoConnection = connectionFactory.getConnection();
+        replica = ReplicaSetInfoMock.create(clock);
         mk = new DocumentMK.Builder()
-                .setMaxReplicationLag(replicationLag, TimeUnit.MILLISECONDS)
-                .setMongoDB(mongoConnection.getDB())
                 .setClusterId(1)
+                .setMongoDB(mongoConnection.getDB())
                 .setLeaseCheck(false)
                 .open();
         mongoDS = (MongoDocumentStore) mk.getDocumentStore();
+
+        mk2 = new DocumentMK.Builder()
+                .setClusterId(2)
+                .setMongoDB(mongoConnection.getDB())
+                .setLeaseCheck(false)
+                .open();
+    }
+
+    @Before
+    public void createReplicaSet() {
+        replica = ReplicaSetInfoMock.create(clock);
+
+        primary = replica.addInstance(ReplicaSetInfo.MemberState.PRIMARY, "p1");
+        secondary = replica.addInstance(ReplicaSetInfo.MemberState.SECONDARY, "s1");
+        mongoDS.setReplicaInfo(replica);
     }
 
     @Test
     public void testPreferenceConversion() throws Exception{
+        primary.addRevisions(200);
+        secondary.addRevisions(0);
+        replica.updateRevisions();
+        clock.waitUntil(500);
+        assertEquals(300, replica.getLag());
+
         //For cacheAge < replicationLag result should be primary
         assertEquals(DocumentReadPreference.PRIMARY, mongoDS.getReadPreference(0));
         assertEquals(DocumentReadPreference.PRIMARY,
-                mongoDS.getReadPreference((int) (replicationLag - 100)));
+                mongoDS.getReadPreference((int) (replica.getLag() - 100)));
 
         //For Integer.MAX_VALUE it should be secondary as caller intends that value is stable
         assertEquals(DocumentReadPreference.PREFER_SECONDARY,
@@ -77,38 +101,75 @@ public class ReadPreferenceIT extends Ab
         assertEquals(DocumentReadPreference.PREFER_SECONDARY_IF_OLD_ENOUGH,
                 mongoDS.getReadPreference(-1));
         assertEquals(DocumentReadPreference.PREFER_SECONDARY_IF_OLD_ENOUGH,
-                mongoDS.getReadPreference((int) (replicationLag + 100)));
+                mongoDS.getReadPreference((int) (replica.getLag() + 100)));
     }
 
     @Test
     public void testMongoReadPreferencesDefault() throws Exception{
         assertEquals(ReadPreference.primary(),
-                mongoDS.getMongoReadPreference(NODES,"foo", DocumentReadPreference.PRIMARY));
+                mongoDS.getMongoReadPreference(NODES,"foo", null, DocumentReadPreference.PRIMARY));
 
         assertEquals(ReadPreference.primaryPreferred(),
-                mongoDS.getMongoReadPreference(NODES,"foo", DocumentReadPreference.PREFER_PRIMARY));
+                mongoDS.getMongoReadPreference(NODES,"foo", null, DocumentReadPreference.PREFER_PRIMARY));
 
         //By default Mongo read preference is primary
         assertEquals(ReadPreference.primary(),
-                mongoDS.getMongoReadPreference(NODES,"foo", DocumentReadPreference.PREFER_SECONDARY));
+                mongoDS.getMongoReadPreference(NODES,"foo", null, DocumentReadPreference.PREFER_SECONDARY));
 
         //Change the default and assert again
         mongoDS.getDBCollection(NODES).getDB().setReadPreference(ReadPreference.secondary());
         assertEquals(ReadPreference.secondary(),
-                mongoDS.getMongoReadPreference(NODES,"foo", DocumentReadPreference.PREFER_SECONDARY));
+                mongoDS.getMongoReadPreference(NODES,"foo", null, DocumentReadPreference.PREFER_SECONDARY));
 
         //for case where parent age cannot be determined the preference should be primary
         assertEquals(ReadPreference.primary(),
-                mongoDS.getMongoReadPreference(NODES,"foo", DocumentReadPreference.PREFER_SECONDARY_IF_OLD_ENOUGH));
+                mongoDS.getMongoReadPreference(NODES,"foo", null, DocumentReadPreference.PREFER_SECONDARY_IF_OLD_ENOUGH));
 
         //For collection other than NODES always primary
         assertEquals(ReadPreference.primary(),
-                mongoDS.getMongoReadPreference(SETTINGS,"foo", DocumentReadPreference.PREFER_SECONDARY_IF_OLD_ENOUGH));
+                mongoDS.getMongoReadPreference(SETTINGS,"foo", null, DocumentReadPreference.PREFER_SECONDARY_IF_OLD_ENOUGH));
 
     }
 
     @Test
-    public void testMongoReadPreferencesWithAge() throws Exception{
+    public void testMongoReadPreferences() throws Exception {
+        ReadPreference testPref = ReadPreference.secondary();
+        mongoDS.getDBCollection(NODES).getDB().setReadPreference(testPref);
+
+        NodeStore extNodeStore = mk2.getNodeStore();
+        NodeBuilder b1 = extNodeStore.getRoot().builder();
+        b1.child("x").child("y").setProperty("xyz", "123");
+        extNodeStore.merge(b1, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+        // wait until the change is visible
+        NodeStore nodeStore = mk.getNodeStore();
+        while (true) {
+            if (nodeStore.getRoot().hasChildNode("x")) {
+                break;
+            } else {
+                Thread.sleep(100);
+            }
+        }
+
+        // the change hasn't been replicated yet, primary must be used
+        assertEquals(ReadPreference.primary(),
+                mongoDS.getMongoReadPreference(NODES, null, "/x/y", DocumentReadPreference.PREFER_SECONDARY_IF_OLD_ENOUGH));
+
+        // make the secondary up-to-date
+        DocumentNodeState ns = (DocumentNodeState) nodeStore.getRoot().getChildNode("x").getChildNode("y");
+        RevisionVector lastSeenRev = ns.getLastRevision().update(new Revision(Revision.getCurrentTimestamp(), 0, 1)); // add revision for the local cluster node
+
+        primary.set(lastSeenRev);
+        secondary.set(lastSeenRev);
+        replica.updateRevisions();
+
+        // change has been replicated by now, it's fine to use secondary
+        assertEquals(testPref,
+                mongoDS.getMongoReadPreference(NODES, null, "/x/y", DocumentReadPreference.PREFER_SECONDARY_IF_OLD_ENOUGH));
+    }
+
+    @Test
+    public void testMongoReadPreferencesForLocalChanges() throws Exception {
         //Change the default
         ReadPreference testPref = ReadPreference.secondary();
         mongoDS.getDBCollection(NODES).getDB().setReadPreference(testPref);
@@ -118,21 +179,21 @@ public class ReadPreferenceIT extends Ab
         b1.child("x").child("y");
         nodeStore.merge(b1, EmptyHook.INSTANCE, CommitInfo.EMPTY);
 
-        String id = Utils.getIdFromPath("/x/y");
-        String parentId = Utils.getParentId(id);
-        mongoDS.invalidateCache(NODES,id);
+        mongoDS.invalidateCache();
 
-        //For modifiedTime < replicationLag primary must be used
+        // the local change hasn't been replicated yet, primary must be used
         assertEquals(ReadPreference.primary(),
-                mongoDS.getMongoReadPreference(NODES,parentId, DocumentReadPreference.PREFER_SECONDARY_IF_OLD_ENOUGH));
+                mongoDS.getMongoReadPreference(NODES, null, "/x/y", DocumentReadPreference.PREFER_SECONDARY_IF_OLD_ENOUGH));
 
-        //Going into future to make parent /x old enough
-        clock.waitUntil(Revision.getCurrentTimestamp() + replicationLag);
-        mongoDS.setClock(clock);
+        // make the secondary up-to-date
+        long now = Revision.getCurrentTimestamp();
+        primary.addRevision(now, 0, 1, false);
+        secondary.addRevision(now, 0, 1, false);
+        replica.updateRevisions();
 
-        //For old modified nodes secondaries should be preferred
+        // local change has been replicated by now, it's fine to use secondary
         assertEquals(testPref,
-                mongoDS.getMongoReadPreference(NODES, parentId, DocumentReadPreference.PREFER_SECONDARY_IF_OLD_ENOUGH));
+                mongoDS.getMongoReadPreference(NODES, null, "/x/y", DocumentReadPreference.PREFER_SECONDARY_IF_OLD_ENOUGH));
     }
 
     @Test

Added: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/LocalChangesTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/LocalChangesTest.java?rev=1751441&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/LocalChangesTest.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/LocalChangesTest.java Tue Jul  5 11:11:13 2016
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document.mongo.replica;
+
+import static java.util.Collections.singleton;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.jackrabbit.oak.plugins.document.Revision;
+import org.apache.jackrabbit.oak.plugins.document.RevisionVector;
+import org.junit.Test;
+
+public class LocalChangesTest {
+
+    @Test
+    public void testReplicaInactive() {
+        LocalChanges l = new LocalChanges(0);
+        assertTrue(l.mayContain("2:/xyz/123"));
+        assertTrue(l.mayContainChildrenOf("2:/xyz/123"));
+
+        l.add("2:/xyz/123", 2); // don't remember the path, only the timestamp 2
+        l.gotRootRevisions(revsV(1, 1, 1));
+        assertTrue(l.mayContain("2:/xyz/123"));
+        assertTrue(l.mayContain("2:/abc/567")); // we only remembered the timestamp, not the path
+
+        l.gotRootRevisions(revsV(3, 3, 3)); // the new revision >= the remembered 2
+        assertFalse(l.mayContain("2:/xyz/123"));
+        assertFalse(l.mayContain("2:/abc/567"));
+    }
+
+    @Test
+    public void testMayContain() {
+        LocalChanges l = new LocalChanges(0);
+        l.add("2:/xyz/123", 2);
+
+        l.gotRootRevisions(revsV(1, 1, 1));
+        assertTrue(l.mayContain("2:/xyz/123"));
+        assertTrue(l.mayContainChildrenOf("1:/xyz"));
+
+        l.gotRootRevisions(revsV(2, 2, 2));
+        assertFalse(l.mayContain("2:/xyz/123"));
+        assertFalse(l.mayContainChildrenOf("1:/xyz"));
+    }
+
+    @Test
+    public void testGotRootRevisions() {
+        LocalChanges l = new LocalChanges(2); // only consider the last timestamp in the revision vector
+        l.add("2:/xyz/123", 4);
+
+        l.gotRootRevisions(revsV(1, 1, 1));
+        assertTrue(l.mayContain("2:/xyz/123"));
+
+        l.gotRootRevisions(revsV(2, 2, 2));
+        assertTrue(l.mayContain("2:/xyz/123"));
+
+        l.gotRootRevisions(revsV(2, 3, 3));
+        assertTrue(l.mayContain("2:/xyz/123"));
+
+        l.gotRootRevisions(revsV(2, 3, 4));
+        assertFalse(l.mayContain("2:/xyz/123"));
+    }
+
+    @Test
+    public void testLimit() {
+        LocalChanges l = new LocalChanges(0);
+        l.gotRootRevisions(revsV(1)); // make the class active
+
+        for (int i = 1; i <= 99; i++) {
+            l.add("2:/xyz/" + i, i + 100);
+            assertTrue(l.mayContain("2:/xyz/" + i));
+            assertFalse(l.mayContain("2:/abc/" + i));
+        }
+        l.add("2:/xyz/100", 200); // the list should be cleared right now
+        l.add("2:/abc/123", 300); // this is added to the new list
+        l.add("2:/abc/456", 100); // this shouldn't be added to the new list (as it's old)
+
+        // now the list should be cleared and we should get true for all documents
+        assertTrue(l.mayContain("2:/abc/999"));
+        assertEquals(singleton("2:/abc/123"), l.localChanges.keySet());
+
+        l.gotRootRevisions(revsV(200)); // invalidate
+        assertFalse(l.mayContain("2:/xyz/0"));
+        assertFalse(l.mayContain("2:/xyz/99"));
+
+        assertTrue(l.mayContain("2:/abc/123"));
+        assertFalse(l.mayContain("2:/abc/456"));
+    }
+
+    @Test
+    public void dontAddOldRevisions() {
+        LocalChanges l = new LocalChanges(0);
+        l.gotRootRevisions(revsV(10));
+        l.add("2:/xyz/1", 5);
+        assertFalse(l.mayContain("2:/xyz/1"));
+    }
+
+    private Collection<Revision> revs(int... timestamps) {
+        List<Revision> revs = new ArrayList<Revision>();
+        for (int i = 0; i < timestamps.length; i++) {
+            revs.add(new Revision(timestamps[i], 0, i, false));
+        }
+        return revs;
+    }
+
+    private RevisionVector revsV(int... timestamps) {
+        return new RevisionVector(revs(timestamps));
+    }
+}