You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by mr...@apache.org on 2018/05/23 15:56:21 UTC

svn commit: r1832110 [1/2] - in /jackrabbit/oak/trunk: ./ oak-store-document/ oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/ oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/ ...

Author: mreutegg
Date: Wed May 23 15:56:20 2018
New Revision: 1832110

URL: http://svn.apache.org/viewvc?rev=1832110&view=rev
Log:
OAK-6087: Avoid reads from MongoDB primary

Automatically use client sessions when running on MongoDB 3.6, otherwise fall back to the previous behaviour.
The feature can be disabled with a system property: -Doak.mongo.clientSession=false
Travis runs an additional build for oak-store-document on a MongoDB replica-set and a readPreference of secondaryPreferred

Added:
    jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/CompositeServerMonitorListener.java   (with props)
    jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoSessionFactory.java   (with props)
    jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/ReplicaSetStatus.java   (with props)
    jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoTestUtils.java   (with props)
    jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/ReplicaSetStatusTest.java   (with props)
Removed:
    jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/ClusterDescriptionProvider.java
    jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoClusterListener.java
Modified:
    jackrabbit/oak/trunk/.travis.yml
    jackrabbit/oak/trunk/oak-store-document/pom.xml
    jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoBlobReferenceIterator.java
    jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoBlobStore.java
    jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentNodeStoreBuilderBase.java
    jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java
    jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoMissingLastRevSeeker.java
    jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoStatus.java
    jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoVersionGCSupport.java
    jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/MongoConnection.java
    jackrabbit/oak/trunk/oak-store-document/src/test/java/com/mongodb/OakFongo.java
    jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/AbstractMultiDocumentStoreTest.java
    jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/AbstractTwoNodeTest.java
    jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/BlobReferenceIteratorTest.java
    jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentStoreStatsIT.java
    jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java
    jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/VersionGCSupportTest.java
    jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/VersionGarbageCollectorIT.java
    jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/blob/MongoBlobStoreTest.java
    jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/blob/cloud/MongoCloudBlobGCTest.java
    jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/blob/ds/MongoDataStoreBlobGCTest.java
    jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/CacheInvalidationIT.java
    jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/JournalIT.java
    jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentNodeStoreBuilderTest.java
    jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/ReadPreferenceIT.java

Modified: jackrabbit/oak/trunk/.travis.yml
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/.travis.yml?rev=1832110&r1=1832109&r2=1832110&view=diff
==============================================================================
--- jackrabbit/oak/trunk/.travis.yml (original)
+++ jackrabbit/oak/trunk/.travis.yml Wed May 23 15:56:20 2018
@@ -21,12 +21,13 @@ env:
     - MODULE=oak-jcr PROFILE="-PintegrationTesting" UT="-Dsurefire.skip.ut=true" MONGODB_MODE="--single"
     - MODULE=oak-jcr PROFILE="" UT="" MONGODB_MODE="--single"
     - MODULE=oak-store-document PROFILE="-PintegrationTesting" UT="" MONGODB_MODE="--single"
-    - MODULE=oak-it PROFILE="-PintegrationTesting" UT="" MONGODB_MODE="--single"
+    - MODULE=oak-store-document PROFILE="-PintegrationTesting,replicaset" UT="" MONGODB_MODE="--replicaset"
     - MODULE=oak-lucene PROFILE="-PintegrationTesting" UT="" MONGODB_MODE="--single"
+    - MODULE=oak-it PROFILE="-PintegrationTesting" UT="" MONGODB_MODE="--single"
     - MODULE=oak-run PROFILE="-PintegrationTesting" UT="" MONGODB_MODE="--single"
-    - MODULE=oak-it-osgi PROFILE="-PintegrationTesting" UT="" MONGODB_MODE="--single"
-    - MODULE=oak-pojosr PROFILE="-PintegrationTesting" UT="" MONGODB_MODE="--single"
     - MODULE=oak-upgrade PROFILE="-PintegrationTesting" UT="" MONGODB_MODE="--single"
+    - MODULE=oak-pojosr PROFILE="-PintegrationTesting" UT="" MONGODB_MODE="--single"
+    - MODULE=oak-it-osgi PROFILE="-PintegrationTesting" UT="" MONGODB_MODE="--single"
 install:
   - wget -N http://fastdl.mongodb.org/linux/mongodb-linux-x86_64-${MONGODB}.tgz -P $HOME/.mongodb
   - tar --skip-old-files -C $HOME/.mongodb -xf $HOME/.mongodb/mongodb-linux-x86_64-${MONGODB}.tgz

Modified: jackrabbit/oak/trunk/oak-store-document/pom.xml
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/pom.xml?rev=1832110&r1=1832109&r2=1832110&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/pom.xml (original)
+++ jackrabbit/oak/trunk/oak-store-document/pom.xml Wed May 23 15:56:20 2018
@@ -63,6 +63,15 @@
     </plugins>
   </build>
 
+  <profiles>
+    <profile>
+      <id>replicaset</id>
+      <properties>
+        <mongo.url>mongodb://localhost:27017,localhost:27018,localhost:27019/MongoMKDB?readPreference=secondaryPreferred</mongo.url>
+      </properties>
+    </profile>
+  </profiles>
+
   <dependencies>
     <!-- Optional OSGi dependencies, used only when running within OSGi -->
     <dependency>

Added: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/CompositeServerMonitorListener.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/CompositeServerMonitorListener.java?rev=1832110&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/CompositeServerMonitorListener.java (added)
+++ jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/CompositeServerMonitorListener.java Wed May 23 15:56:20 2018
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document.mongo;
+
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import com.mongodb.event.ServerHeartbeatFailedEvent;
+import com.mongodb.event.ServerHeartbeatStartedEvent;
+import com.mongodb.event.ServerHeartbeatSucceededEvent;
+import com.mongodb.event.ServerMonitorListener;
+
+/**
+ * A composite {@link ServerMonitorListener} forwarding each event to all added listeners.
+ */
+class CompositeServerMonitorListener implements ServerMonitorListener {
+
+    private final List<ServerMonitorListener> listeners = new CopyOnWriteArrayList<>();
+
+    void addListener(ServerMonitorListener listener) {
+        listeners.add(listener);
+    }
+
+    void removeListener(ServerMonitorListener listener) {
+        listeners.remove(listener);
+    }
+
+    @Override
+    public void serverHearbeatStarted(ServerHeartbeatStartedEvent event) {
+        listeners.forEach(l -> l.serverHearbeatStarted(event));
+    }
+
+    @Override
+    public void serverHeartbeatSucceeded(ServerHeartbeatSucceededEvent event) {
+        listeners.forEach(l -> l.serverHeartbeatSucceeded(event));
+    }
+
+    @Override
+    public void serverHeartbeatFailed(ServerHeartbeatFailedEvent event) {
+        listeners.forEach(l -> l.serverHeartbeatFailed(event));
+    }
+}

Propchange: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/CompositeServerMonitorListener.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoBlobReferenceIterator.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoBlobReferenceIterator.java?rev=1832110&r1=1832109&r2=1832110&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoBlobReferenceIterator.java (original)
+++ jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoBlobReferenceIterator.java Wed May 23 15:56:20 2018
@@ -31,6 +31,7 @@ import org.apache.jackrabbit.oak.plugins
 import org.bson.conversions.Bson;
 
 import com.mongodb.BasicDBObject;
+import com.mongodb.client.MongoCollection;
 import com.mongodb.client.MongoCursor;
 import com.mongodb.client.model.Filters;
 
@@ -38,7 +39,8 @@ public class MongoBlobReferenceIterator
 
     private final MongoDocumentStore documentStore;
 
-    public MongoBlobReferenceIterator(DocumentNodeStore nodeStore, MongoDocumentStore documentStore) {
+    public MongoBlobReferenceIterator(DocumentNodeStore nodeStore,
+                                      MongoDocumentStore documentStore) {
         super(nodeStore);
         this.documentStore = documentStore;
     }
@@ -46,9 +48,8 @@ public class MongoBlobReferenceIterator
     @Override
     public Iterator<NodeDocument> getIteratorOverDocsWithBinaries() {
         Bson query = Filters.eq(NodeDocument.HAS_BINARY_FLAG, NodeDocument.HAS_BINARY_VAL);
-        // TODO It currently prefers secondary. Would that be Ok?
+        // TODO It currently uses the configured read preference. Would that be Ok?
         MongoCursor<BasicDBObject> cursor = documentStore.getDBCollection(NODES)
-                .withReadPreference(documentStore.getConfiguredReadPreference(NODES))
                 .find(query).iterator();
 
         return CloseableIterator.wrap(transform(cursor,

Modified: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoBlobStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoBlobStore.java?rev=1832110&r1=1832109&r2=1832110&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoBlobStore.java (original)
+++ jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoBlobStore.java Wed May 23 15:56:20 2018
@@ -22,6 +22,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
+import com.mongodb.ReadPreference;
 import com.mongodb.client.model.UpdateOptions;
 import com.mongodb.client.result.UpdateResult;
 import org.apache.jackrabbit.oak.commons.StringUtils;
@@ -42,7 +43,6 @@ import com.mongodb.client.MongoDatabase;
 import com.mongodb.client.model.Filters;
 
 import static com.mongodb.ReadPreference.primary;
-import static com.mongodb.ReadPreference.secondaryPreferred;
 import static java.util.stream.StreamSupport.stream;
 import static org.bson.codecs.configuration.CodecRegistries.fromCodecs;
 import static org.bson.codecs.configuration.CodecRegistries.fromRegistries;
@@ -68,6 +68,7 @@ public class MongoBlobStore extends Cach
             fromCodecs(new MongoBlobCodec())
     );
 
+    private final ReadPreference defaultReadPreference;
     private final MongoCollection<MongoBlob> blobCollection;
     private long minLastModified;
 
@@ -86,6 +87,7 @@ public class MongoBlobStore extends Cach
         // space allocated for a record to the next power of two
         // (there is an overhead per record, let's assume it is 1 KB at most)
         setBlockSize(2 * 1024 * 1024 - 1024);
+        defaultReadPreference = db.getReadPreference();
         blobCollection = initBlobCollection(db);
     }
 
@@ -184,8 +186,12 @@ public class MongoBlobStore extends Cach
                 .noneMatch(COLLECTION_BLOBS::equals)) {
             db.createCollection(COLLECTION_BLOBS);
         }
+        // override the read preference configured with the MongoDB URI
+        // and use the primary as default. Reading a blob will still
+        // try a secondary first and then fall back to the primary.
         return db.getCollection(COLLECTION_BLOBS, MongoBlob.class)
-                .withCodecRegistry(CODEC_REGISTRY);
+                .withCodecRegistry(CODEC_REGISTRY)
+                .withReadPreference(primary());
     }
 
     private MongoCollection<MongoBlob> getBlobCollection() {
@@ -196,10 +202,9 @@ public class MongoBlobStore extends Cach
         Bson query = getBlobQuery(id, lastMod);
         Bson fields = new BasicDBObject(MongoBlob.KEY_DATA, 1);
 
-        // try the secondary first
-        // TODO add a configuration option for whether to try reading from secondary
+        // try with default read preference first, may be from secondary
         List<MongoBlob> result = new ArrayList<>(1);
-        getBlobCollection().withReadPreference(secondaryPreferred()).find(query)
+        getBlobCollection().withReadPreference(defaultReadPreference).find(query)
                 .projection(fields).into(result);
         if (result.isEmpty()) {
             // not found in the secondary: try the primary
@@ -244,7 +249,6 @@ public class MongoBlobStore extends Cach
     @Override
     public Iterator<String> getAllChunkIds(long maxLastModifiedTime) throws Exception {
         Bson fields = new BasicDBObject(MongoBlob.KEY_ID, 1);
-        Bson hint = new BasicDBObject("$hint", fields);
 
         Bson query = new Document();
         if (maxLastModifiedTime != 0 && maxLastModifiedTime != -1) {
@@ -252,7 +256,7 @@ public class MongoBlobStore extends Cach
         }
 
         final MongoCursor<MongoBlob> cur = getBlobCollection().find(query)
-                .projection(fields).modifiers(hint).iterator();
+                .projection(fields).hint(fields).iterator();
 
         //TODO The cursor needs to be closed
         return new AbstractIterator<String>() {

Modified: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentNodeStoreBuilderBase.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentNodeStoreBuilderBase.java?rev=1832110&r1=1832109&r2=1832110&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentNodeStoreBuilderBase.java (original)
+++ jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentNodeStoreBuilderBase.java Wed May 23 15:56:20 2018
@@ -16,7 +16,6 @@
  */
 package org.apache.jackrabbit.oak.plugins.document.mongo;
 
-import java.net.UnknownHostException;
 import java.util.concurrent.TimeUnit;
 
 import javax.annotation.Nonnull;
@@ -56,6 +55,7 @@ public abstract class MongoDocumentNodeS
     private boolean socketKeepAlive = true;
     private MongoStatus mongoStatus;
     private long maxReplicationLagMillis = TimeUnit.HOURS.toMillis(6);
+    private boolean clientSessionDisabled = false;
 
     /**
      * Uses the given information to connect to to MongoDB as backend
@@ -70,21 +70,19 @@ public abstract class MongoDocumentNodeS
      *             any database name given in the {@code uri}.
      * @param blobCacheSizeMB the blob cache size in MB.
      * @return this
-     * @throws UnknownHostException if one of the hosts given in the URI
-     *          is unknown.
      */
     public T setMongoDB(@Nonnull String uri,
                         @Nonnull String name,
-                        int blobCacheSizeMB)
-            throws UnknownHostException {
+                        int blobCacheSizeMB) {
         this.mongoUri = uri;
 
-        MongoClusterListener listener = new MongoClusterListener();
+        CompositeServerMonitorListener serverMonitorListener = new CompositeServerMonitorListener();
         MongoClientOptions.Builder options = MongoConnection.getDefaultBuilder();
-        options.addClusterListener(listener);
+        options.addServerMonitorListener(serverMonitorListener);
         options.socketKeepAlive(socketKeepAlive);
         MongoClient client = new MongoClient(new MongoClientURI(uri, options));
-        MongoStatus status = new MongoStatus(client, name, listener);
+        MongoStatus status = new MongoStatus(client, name);
+        serverMonitorListener.addListener(status);
         MongoDatabase db = client.getDatabase(name);
         if (!MongoConnection.hasWriteConcern(uri)) {
             db = db.withWriteConcern(MongoConnection.getDefaultWriteConcern(client));
@@ -167,6 +165,26 @@ public abstract class MongoDocumentNodeS
         return socketKeepAlive;
     }
 
+    /**
+     * Disables the use of a client session available with MongoDB 3.6 and
+     * newer. By default the MongoDocumentStore will use a client session if
+     * available. That is, when connected to MongoDB 3.6 and newer.
+     *
+     * @param b whether to disable the use of a client session.
+     * @return this
+     */
+    public T setClientSessionDisabled(boolean b) {
+        this.clientSessionDisabled = b;
+        return thisBuilder();
+    }
+
+    /**
+     * @return whether the use of a client session is disabled.
+     */
+    boolean isClientSessionDisabled() {
+        return clientSessionDisabled;
+    }
+
     public T setMaxReplicationLag(long duration, TimeUnit unit){
         maxReplicationLagMillis = unit.toMillis(duration);
         return thisBuilder();

Modified: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java?rev=1832110&r1=1832109&r2=1832110&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java (original)
+++ jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java Wed May 23 15:56:20 2018
@@ -105,7 +105,9 @@ import com.mongodb.client.model.ReturnDo
 import com.mongodb.client.model.UpdateOneModel;
 import com.mongodb.client.model.UpdateOptions;
 import com.mongodb.client.model.WriteModel;
+import com.mongodb.client.result.DeleteResult;
 import com.mongodb.client.result.UpdateResult;
+import com.mongodb.session.ClientSession;
 
 import static com.google.common.base.Predicates.in;
 import static com.google.common.base.Predicates.not;
@@ -150,6 +152,8 @@ public class MongoDocumentStore implemen
     private final MongoCollection<BasicDBObject> journal;
 
     private final MongoClient client;
+    private final MongoStatus status;
+    private final MongoSessionFactory sessionFactory;
     private final MongoDatabase db;
 
     private final NodeDocumentCache nodesCache;
@@ -236,6 +240,19 @@ public class MongoDocumentStore implemen
     private final int queryRetries =
             Integer.getInteger("oak.mongo.queryRetries", 2);
 
+    /**
+     * Acceptable replication lag of secondaries in milliseconds. Reads are
+     * directed to the primary if the estimated replication lag is higher than
+     * this value.
+     */
+    private final int acceptableLagMillis =
+            Integer.getInteger("oak.mongo.acceptableLagMillis", 5000);
+
+    /**
+     * Feature flag for use of MongoDB client sessions.
+     */
+    private final boolean useClientSession;
+
     private String lastReadWriteMode;
 
     private final Map<String, String> metadata;
@@ -262,6 +279,8 @@ public class MongoDocumentStore implemen
                 .build();
 
         this.client = client;
+        this.status = mongoStatus;
+        this.sessionFactory = new MongoSessionFactory(client);
         this.db = client.getDatabase(dbName);
         stats = builder.getDocumentStoreStatsCollector();
         nodes = db.getCollection(Collection.NODES.toString(), BasicDBObject.class);
@@ -280,18 +299,22 @@ public class MongoDocumentStore implemen
             replicaInfoThread.setDaemon(true);
             replicaInfoThread.start();
         }
+        useClientSession = !builder.isClientSessionDisabled()
+                && Boolean.parseBoolean(System.getProperty("oak.mongo.clientSession", "true"));
 
-        // indexes:
-        // the _id field is the primary key, so we don't need to define it
+        // counting the number of documents in the nodes collection and
+        // checking existing indexes is performed against the MongoDB primary
+        // this ensures the information is up-to-date and accurate
+        long initialDocsCount = getNodesCount();
 
-        long initialDocsCount = nodes.count();
         // compound index on _modified and _id
         if (initialDocsCount == 0) {
             // this is an empty store, create a compound index
             // on _modified and _id (OAK-3071)
             createIndex(nodes, new String[]{NodeDocument.MODIFIED_IN_SECS, Document.ID},
                     new boolean[]{true, true}, false, false);
-        } else if (!hasIndex(nodes, NodeDocument.MODIFIED_IN_SECS, Document.ID)) {
+        } else if (!hasIndex(nodes.withReadPreference(ReadPreference.primary()),
+                NodeDocument.MODIFIED_IN_SECS, Document.ID)) {
             hasModifiedIdCompoundIndex = false;
             if (!builder.getReadOnlyMode()) {
                 LOG.warn("Detected an upgrade from Oak version <= 1.2. For optimal " +
@@ -312,7 +335,8 @@ public class MongoDocumentStore implemen
             } else {
                 createIndex(nodes, NodeDocument.DELETED_ONCE, true, false, true);
             }
-        } else if (!hasIndex(nodes, DELETED_ONCE, MODIFIED_IN_SECS)) {
+        } else if (!hasIndex(nodes.withReadPreference(ReadPreference.primary()),
+                DELETED_ONCE, MODIFIED_IN_SECS)) {
             if (!builder.getReadOnlyMode()) {
                 LOG.warn("Detected an upgrade from Oak version <= 1.6. For optimal " +
                         "Revision GC performance it is recommended to create a " +
@@ -329,7 +353,8 @@ public class MongoDocumentStore implemen
             // on _sdType and _sdMaxRevTime (OAK-6129)
             createIndex(nodes, new String[]{SD_TYPE, SD_MAX_REV_TIME_IN_SECS},
                     new boolean[]{true, true}, false, true);
-        } else if (!hasIndex(nodes, SD_TYPE, SD_MAX_REV_TIME_IN_SECS)) {
+        } else if (!hasIndex(nodes.withReadPreference(ReadPreference.primary()),
+                SD_TYPE, SD_MAX_REV_TIME_IN_SECS)) {
             if (!builder.getReadOnlyMode()) {
                 LOG.warn("Detected an upgrade from Oak version <= 1.6. For optimal " +
                         "Revision GC performance it is recommended to create a " +
@@ -346,10 +371,11 @@ public class MongoDocumentStore implemen
 
         LOG.info("Connected to MongoDB {} with maxReplicationLagMillis {}, " +
                 "maxDeltaForModTimeIdxSecs {}, disableIndexHint {}, " +
-                "{}, serverStatus {}",
-                mongoStatus.getVersion(), maxReplicationLagMillis, maxDeltaForModTimeIdxSecs,
-                disableIndexHint, db.getWriteConcern(),
-                mongoStatus.getServerDetails());
+                "clientSessionSupported {}, clientSessionInUse {}, serverStatus {}",
+                mongoStatus.getVersion(), maxReplicationLagMillis,
+                maxDeltaForModTimeIdxSecs, disableIndexHint,
+                status.isClientSessionSupported(), useClientSession,
+                db.getWriteConcern(), mongoStatus.getServerDetails());
     }
 
     public boolean isReadOnly() {
@@ -446,8 +472,11 @@ public class MongoDocumentStore implemen
                                        boolean preferCached,
                                        final int maxCacheAge) {
         if (collection != Collection.NODES) {
-            return findUncachedWithRetry(collection, key,
-                    DocumentReadPreference.PRIMARY);
+            DocumentReadPreference readPref = DocumentReadPreference.PRIMARY;
+            if (withClientSession()) {
+                readPref = getDefaultReadPreference(collection);
+            }
+            return findUncachedWithRetry(collection, key, readPref);
         }
         NodeDocument doc;
         if (maxCacheAge > 0 || preferCached) {
@@ -551,12 +580,12 @@ public class MongoDocumentStore implemen
     @CheckForNull
     protected <T extends Document> T findUncached(Collection<T> collection, String key, DocumentReadPreference docReadPref) {
         log("findUncached", key, docReadPref);
-        MongoCollection<BasicDBObject> dbCollection = getDBCollection(collection);
         final Stopwatch watch = startWatch();
         boolean isSlaveOk = false;
         boolean docFound = true;
         try {
             ReadPreference readPreference = getMongoReadPreference(collection, null, key, docReadPref);
+            MongoCollection<BasicDBObject> dbCollection = getDBCollection(collection, readPreference);
 
             if(readPreference.isSlaveOk()){
                 LOG.trace("Routing call to secondary for fetching [{}]", key);
@@ -564,7 +593,14 @@ public class MongoDocumentStore implemen
             }
 
             List<BasicDBObject> result = new ArrayList<>(1);
-            dbCollection.withReadPreference(readPreference).find(getByKeyQuery(key)).into(result);
+            execute(session -> {
+                if (session != null) {
+                    dbCollection.find(session, getByKeyQuery(key)).into(result);
+                } else {
+                    dbCollection.find(getByKeyQuery(key)).into(result);
+                }
+                return null;
+            });
 
             if(result.isEmpty()) {
                 docFound = false;
@@ -644,13 +680,18 @@ public class MongoDocumentStore implemen
                                                          int limit,
                                                          long maxQueryTime) {
         log("query", fromKey, toKey, indexedProperty, startValue, limit);
-        MongoCollection<BasicDBObject> dbCollection = getDBCollection(collection);
 
         List<Bson> clauses = new ArrayList<>();
         clauses.add(Filters.gt(Document.ID, fromKey));
         clauses.add(Filters.lt(Document.ID, toKey));
 
-        Bson hint = new BasicDBObject(NodeDocument.ID, 1);
+        Bson hint;
+        if (NodeDocument.MODIFIED_IN_SECS.equals(indexedProperty)
+                && canUseModifiedTimeIdx(startValue)) {
+            hint = new BasicDBObject(NodeDocument.MODIFIED_IN_SECS, 1);
+        } else {
+            hint = new BasicDBObject(NodeDocument.ID, 1);
+        }
 
         if (indexedProperty != null) {
             if (NodeDocument.DELETED_ONCE.equals(indexedProperty)) {
@@ -662,17 +703,12 @@ public class MongoDocumentStore implemen
                 clauses.add(Filters.eq(indexedProperty, true));
             } else {
                 clauses.add(Filters.gte(indexedProperty, startValue));
-
-                if (NodeDocument.MODIFIED_IN_SECS.equals(indexedProperty)
-                        && canUseModifiedTimeIdx(startValue)) {
-                    hint = new BasicDBObject(NodeDocument.MODIFIED_IN_SECS, -1);
-                }
             }
         }
         Bson query = Filters.and(clauses);
         String parentId = Utils.getParentIdFromLowerLimit(fromKey);
         long lockTime = -1;
-        final Stopwatch watch  = startWatch();
+        final Stopwatch watch = startWatch();
 
         boolean isSlaveOk = false;
         int resultSize = 0;
@@ -688,29 +724,38 @@ public class MongoDocumentStore implemen
                 isSlaveOk = true;
                 LOG.trace("Routing call to secondary for fetching children from [{}] to [{}]", fromKey, toKey);
             }
-            FindIterable<BasicDBObject> result = dbCollection
-                    .withReadPreference(readPreference).find(query).sort(BY_ID_ASC);
-            if (limit >= 0) {
-                result.limit(limit);
-            }
-            if (!disableIndexHint && !hasModifiedIdCompoundIndex) {
-                result.modifiers(new BasicDBObject("$hint", hint));
-            }
-            if (maxQueryTime > 0) {
-                // OAK-2614: set maxTime if maxQueryTimeMS > 0
-                result.maxTime(maxQueryTime, TimeUnit.MILLISECONDS);
-            }
-
-            List<T> list;
-            try (MongoCursor<BasicDBObject> cursor = result.iterator()) {
-                list = new ArrayList<T>();
-                for (int i = 0; i < limit && cursor.hasNext(); i++) {
-                    BasicDBObject o = cursor.next();
-                    T doc = convertFromDBObject(collection, o);
-                    list.add(doc);
+
+            List<T> list = new ArrayList<T>();
+            MongoCollection<BasicDBObject> dbCollection = getDBCollection(collection, readPreference);
+            execute(session -> {
+                FindIterable<BasicDBObject> result;
+                if (session != null) {
+                    result = dbCollection.find(session, query);
+                } else {
+                    result = dbCollection.find(query);
                 }
-                resultSize = list.size();
-            }
+                result.sort(BY_ID_ASC);
+                if (limit >= 0) {
+                    result.limit(limit);
+                }
+                if (!disableIndexHint && !hasModifiedIdCompoundIndex) {
+                    result.hint(hint);
+                }
+                if (maxQueryTime > 0) {
+                    // OAK-2614: set maxTime if maxQueryTimeMS > 0
+                    result.maxTime(maxQueryTime, TimeUnit.MILLISECONDS);
+                }
+
+                try (MongoCursor<BasicDBObject> cursor = result.iterator()) {
+                    for (int i = 0; i < limit && cursor.hasNext(); i++) {
+                        BasicDBObject o = cursor.next();
+                        T doc = convertFromDBObject(collection, o);
+                        list.add(doc);
+                    }
+                }
+                return null;
+            });
+            resultSize = list.size();
 
             if (cacheChangesTracker != null) {
                 nodesCache.putNonConflictingDocs(cacheChangesTracker, (List<NodeDocument>) list);
@@ -739,7 +784,15 @@ public class MongoDocumentStore implemen
         MongoCollection<BasicDBObject> dbCollection = getDBCollection(collection);
         Stopwatch watch = startWatch();
         try {
-            dbCollection.deleteOne(getByKeyQuery(key));
+            execute(session -> {
+                Bson filter = getByKeyQuery(key);
+                if (session != null) {
+                    dbCollection.deleteOne(session, filter);
+                } else {
+                    dbCollection.deleteOne(filter);
+                }
+                return null;
+            });
         } catch (Exception e) {
             throw DocumentStoreException.convert(e, "Remove failed for " + key);
         } finally {
@@ -757,7 +810,14 @@ public class MongoDocumentStore implemen
             for(List<String> keyBatch : Lists.partition(keys, IN_CLAUSE_BATCH_SIZE)){
                 Bson query = Filters.in(Document.ID, keyBatch);
                 try {
-                    dbCollection.deleteMany(query);
+                    execute(session -> {
+                        if (session != null) {
+                            dbCollection.deleteMany(session, query);
+                        } else {
+                            dbCollection.deleteMany(query);
+                        }
+                        return null;
+                    });
                 } catch (Exception e) {
                     throw DocumentStoreException.convert(e, "Remove failed for " + keyBatch);
                 } finally {
@@ -793,7 +853,15 @@ public class MongoDocumentStore implemen
                 if (!it.hasNext() || batch.size() == IN_CLAUSE_BATCH_SIZE) {
                     Bson query = Filters.or(batch);
                     try {
-                        num += dbCollection.deleteMany(query).getDeletedCount();
+                        num += execute(session -> {
+                            DeleteResult result;
+                            if (session != null) {
+                                result = dbCollection.deleteMany(session, query);
+                            } else {
+                                result = dbCollection.deleteMany(query);
+                            }
+                            return result.getDeletedCount();
+                        });
                     } catch (Exception e) {
                         throw DocumentStoreException.convert(e, "Remove failed for " + batch);
                     } finally {
@@ -825,7 +893,15 @@ public class MongoDocumentStore implemen
                     Filters.lt(indexedProperty, endValue)
             );
             try {
-                num = (int) Math.min(dbCollection.deleteMany(query).getDeletedCount(), Integer.MAX_VALUE);
+                num = (int) Math.min(execute((DocumentStoreCallable<Long>) session -> {
+                    DeleteResult result;
+                    if (session != null) {
+                        result = dbCollection.deleteMany(session, query);
+                    } else {
+                        result = dbCollection.deleteMany(query);
+                    }
+                    return result.getDeletedCount();
+                }), Integer.MAX_VALUE);
             } catch (Exception e) {
                 throw DocumentStoreException.convert(e, "Remove failed for " + collection + ": " +
                     indexedProperty + " in (" + startValue + ", " + endValue + ")");
@@ -880,16 +956,23 @@ public class MongoDocumentStore implemen
                 // no conditions and the check is OK. this avoid an
                 // unnecessary call when the conditions do not match
                 if (!checkConditions || UpdateUtils.checkConditions(cachedDoc, updateOp.getConditions())) {
-                    Bson query = createQueryForUpdate(updateOp.getId(),
-                            updateOp.getConditions());
                     // below condition may overwrite a user supplied condition
                     // on _modCount. This is fine, because the conditions were
                     // already checked against the cached document with the
                     // matching _modCount value. There is no need to check the
                     // user supplied condition on _modCount again on the server
-                    query = Filters.and(query, Filters.eq(Document.MOD_COUNT, modCount));
-
-                    UpdateResult result = dbCollection.updateOne(query, update);
+                    Bson query = Filters.and(
+                            createQueryForUpdate(updateOp.getId(), updateOp.getConditions()),
+                            Filters.eq(Document.MOD_COUNT, modCount)
+                    );
+
+                    UpdateResult result = execute(session -> {
+                        if (session != null) {
+                            return dbCollection.updateOne(session, query, update);
+                        } else {
+                            return dbCollection.updateOne(query, update);
+                        }
+                    });
                     if (result.getModifiedCount() > 0) {
                         // success, update cached document
                         if (collection == Collection.NODES) {
@@ -907,7 +990,13 @@ public class MongoDocumentStore implemen
             Bson query = createQueryForUpdate(updateOp.getId(), updateOp.getConditions());
             FindOneAndUpdateOptions options = new FindOneAndUpdateOptions()
                     .returnDocument(ReturnDocument.BEFORE).upsert(upsert);
-            BasicDBObject oldNode = dbCollection.findOneAndUpdate(query, update, options);
+            BasicDBObject oldNode = execute(session -> {
+                if (session != null) {
+                    return dbCollection.findOneAndUpdate(session, query, update, options);
+                } else {
+                    return dbCollection.findOneAndUpdate(query, update, options);
+                }
+            });
 
             if (oldNode == null){
                 newEntry = true;
@@ -1143,13 +1232,26 @@ public class MongoDocumentStore implemen
             for (String key : keys) {
                 conditions.add(getByKeyQuery(key));
             }
-
-            FindIterable<BasicDBObject> cursor = getDBCollection(collection)
-                    .find(Filters.or(conditions));
-            for (BasicDBObject doc : cursor) {
-                T foundDoc = convertFromDBObject(collection, doc);
-                docs.put(foundDoc.getId(), foundDoc);
+            MongoCollection<BasicDBObject> dbCollection;
+            if (secondariesWithinAcceptableLag()) {
+                dbCollection = getDBCollection(collection);
+            } else {
+                lagTooHigh();
+                dbCollection = getDBCollection(collection).withReadPreference(ReadPreference.primary());
             }
+            execute(session -> {
+                FindIterable<BasicDBObject> cursor;
+                if (session != null) {
+                    cursor = dbCollection.find(session, Filters.or(conditions));
+                } else {
+                    cursor = dbCollection.find(Filters.or(conditions));
+                }
+                for (BasicDBObject doc : cursor) {
+                    T foundDoc = convertFromDBObject(collection, doc);
+                    docs.put(foundDoc.getId(), foundDoc);
+                }
+                return null;
+            });
         }
         return docs;
     }
@@ -1177,9 +1279,15 @@ public class MongoDocumentStore implemen
         BulkWriteResult bulkResult;
         Set<String> failedUpdates = new HashSet<String>();
         Set<String> upserts = new HashSet<String>();
+        BulkWriteOptions options = new BulkWriteOptions().ordered(false);
         try {
-            bulkResult = dbCollection.bulkWrite(writes,
-                    new BulkWriteOptions().ordered(false));
+            bulkResult = execute(session -> {
+                if (session != null) {
+                    return dbCollection.bulkWrite(session, writes, options);
+                } else {
+                    return dbCollection.bulkWrite(writes, options);
+                }
+            });
         } catch (MongoBulkWriteException e) {
             bulkResult = e.getWriteResult();
             for (BulkWriteError err : e.getWriteErrors()) {
@@ -1257,7 +1365,14 @@ public class MongoDocumentStore implemen
         boolean insertSuccess = false;
         try {
             try {
-                dbCollection.insertMany(inserts);
+                execute(session -> {
+                    if (session != null) {
+                        dbCollection.insertMany(session, inserts);
+                    } else {
+                        dbCollection.insertMany(inserts);
+                    }
+                    return null;
+                });
                 if (collection == Collection.NODES) {
                     for (T doc : docs) {
                         nodesCache.putIfAbsent((NodeDocument) doc);
@@ -1314,7 +1429,9 @@ public class MongoDocumentStore implemen
 
     DocumentReadPreference getReadPreference(int maxCacheAge){
         long lag = fallbackSecondaryStrategy ? maxReplicationLagMillis : replicaInfo.getLag();
-        if(maxCacheAge >= 0 && maxCacheAge < lag) {
+        if (withClientSession()) {
+            return DocumentReadPreference.PREFER_SECONDARY;
+        } else if(maxCacheAge >= 0 && maxCacheAge < lag) {
             return DocumentReadPreference.PRIMARY;
         } else if(maxCacheAge == Integer.MAX_VALUE){
             return DocumentReadPreference.PREFER_SECONDARY;
@@ -1323,8 +1440,14 @@ public class MongoDocumentStore implemen
         }
     }
 
-    DocumentReadPreference getDefaultReadPreference(Collection col){
-        return col == Collection.NODES ? DocumentReadPreference.PREFER_SECONDARY_IF_OLD_ENOUGH : DocumentReadPreference.PRIMARY;
+    DocumentReadPreference getDefaultReadPreference(Collection col) {
+        DocumentReadPreference preference = DocumentReadPreference.PRIMARY;
+        if (withClientSession()) {
+            preference = DocumentReadPreference.PREFER_SECONDARY;
+        } else if (col == Collection.NODES) {
+            preference = DocumentReadPreference.PREFER_SECONDARY_IF_OLD_ENOUGH;
+        }
+        return preference;
     }
 
     <T extends Document> ReadPreference getMongoReadPreference(@Nonnull Collection<T> collection,
@@ -1337,14 +1460,21 @@ public class MongoDocumentStore implemen
             case PREFER_PRIMARY :
                 return ReadPreference.primaryPreferred();
             case PREFER_SECONDARY :
-                return getConfiguredReadPreference(collection);
+                if (!withClientSession() || secondariesWithinAcceptableLag()) {
+                    return getConfiguredReadPreference(collection);
+                } else {
+                    lagTooHigh();
+                    return ReadPreference.primary();
+                }
             case PREFER_SECONDARY_IF_OLD_ENOUGH:
                 if(collection != Collection.NODES){
                     return ReadPreference.primary();
                 }
 
                 boolean secondarySafe;
-                if (fallbackSecondaryStrategy) {
+                if (withClientSession() && secondariesWithinAcceptableLag()) {
+                    secondarySafe = true;
+                } else if (fallbackSecondaryStrategy) {
                    // This is not quite accurate, because ancestors
                     // are updated in a background thread (_lastRev). We
                     // will need to revise this for low maxReplicationLagMillis
@@ -1444,6 +1574,11 @@ public class MongoDocumentStore implemen
         }
     }
 
+    <T extends Document> MongoCollection<BasicDBObject> getDBCollection(Collection<T> collection,
+                                                                        ReadPreference readPreference) {
+        return getDBCollection(collection).withReadPreference(readPreference);
+    }
+
     MongoDatabase getDatabase() {
         return db;
     }
@@ -1790,6 +1925,68 @@ public class MongoDocumentStore implemen
         });
     }
 
+    /**
+     * Counts the documents in the {@link #nodes} collection. The count is
+     * always read from the MongoDB primary.
+     *
+     * @return the number of documents in the {@link #nodes} collection.
+     */
+    private long getNodesCount() {
+        return execute(session -> {
+            // override the read preference of the collection for this call
+            MongoCollection<?> primaryNodes =
+                    nodes.withReadPreference(ReadPreference.primary());
+            if (session == null) {
+                // no client session in use
+                return primaryNodes.count();
+            }
+            return primaryNodes.count(session);
+        });
+    }
+
+    private boolean withClientSession() {
+        return status.isClientSessionSupported() && useClientSession;
+    }
+
+    private boolean secondariesWithinAcceptableLag() {
+        return client.getReplicaSetStatus() == null
+                || status.getReplicaSetLagEstimate() < acceptableLagMillis;
+    }
+
+    private void lagTooHigh() {
+        LOG.debug("Read from secondary is preferred but replication lag is too high. Directing read to primary.");
+    }
+
+    /**
+     * Runs the given callable, handing it a {@link ClientSession} when client
+     * sessions are in use (see {@link #withClientSession()}) and {@code null}
+     * otherwise. A session created here is closed again before this method
+     * returns, hence the callable must not keep a reference to the session
+     * beyond {@link DocumentStoreCallable#call(ClientSession)}.
+     *
+     * @param callable the callable.
+     * @param <T> the return type of the callable.
+     * @return the result of the callable.
+     * @throws DocumentStoreException if the callable throws an exception.
+     */
+    private <T> T execute(DocumentStoreCallable<T> callable)
+            throws DocumentStoreException {
+        if (!withClientSession()) {
+            // sessions unsupported or disabled: the callable sees null
+            return callable.call(null);
+        }
+        // try-with-resources closes the session once the callable is done,
+        // whether it returned normally or threw
+        try (ClientSession session = sessionFactory.createClientSession()) {
+            return callable.call(session);
+        }
+    }
+
+    interface DocumentStoreCallable<T> {
+
+        T call(@Nullable ClientSession session) throws DocumentStoreException;
+    }
+
     private static class BulkUpdateResult {
 
         private final Set<String> failedUpdates;

Modified: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoMissingLastRevSeeker.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoMissingLastRevSeeker.java?rev=1832110&r1=1832109&r2=1832110&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoMissingLastRevSeeker.java (original)
+++ jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoMissingLastRevSeeker.java Wed May 23 15:56:20 2018
@@ -59,7 +59,6 @@ public class MongoMissingLastRevSeeker e
         Bson sortFields = new BasicDBObject(NodeDocument.MODIFIED_IN_SECS, 1);
 
         FindIterable<BasicDBObject> cursor = getNodeCollection()
-                .withReadPreference(ReadPreference.primary())
                 .find(query).sort(sortFields);
         return CloseableIterable.wrap(transform(cursor,
                 input -> store.convertFromDBObject(NODES, input)));
@@ -76,7 +75,7 @@ public class MongoMissingLastRevSeeker e
     }
 
     private MongoCollection<BasicDBObject> getNodeCollection() {
-        return store.getDBCollection(NODES);
+        return store.getDBCollection(NODES, ReadPreference.primary());
     }
 
     private MongoCollection<BasicDBObject> getClusterNodeCollection() {

Added: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoSessionFactory.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoSessionFactory.java?rev=1832110&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoSessionFactory.java (added)
+++ jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoSessionFactory.java Wed May 23 15:56:20 2018
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document.mongo;
+
+import com.mongodb.ClientSessionOptions;
+import com.mongodb.MongoClient;
+import com.mongodb.session.ClientSession;
+import com.mongodb.session.ServerSession;
+
+import org.bson.BsonDocument;
+import org.bson.BsonTimestamp;
+
+/**
+ * Factory for {@link ClientSession}s.
+ */
+class MongoSessionFactory {
+
+    private final MongoClient client;
+
+    private final ClientSessionOptions options;
+
+    private BsonDocument clusterTime;
+
+    private BsonTimestamp operationTime;
+
+    MongoSessionFactory(MongoClient client) {
+        this.client = client;
+        this.options = ClientSessionOptions.builder()
+                .causallyConsistent(true).build();
+    }
+
+    ClientSession createClientSession() {
+        ClientSession s = client.startSession(options);
+        synchronized (this) {
+            s.advanceClusterTime(clusterTime);
+            s.advanceOperationTime(operationTime);
+        }
+        return new TrackingClientSession(s);
+    }
+
+    private class TrackingClientSession implements ClientSession {
+
+        private final ClientSession session;
+
+        TrackingClientSession(ClientSession session) {
+            this.session = session;
+        }
+
+        @Override
+        public ClientSessionOptions getOptions() {
+            return session.getOptions();
+        }
+
+        @Override
+        public boolean isCausallyConsistent() {
+            return session.isCausallyConsistent();
+        }
+
+        @Override
+        public Object getOriginator() {
+            return session.getOriginator();
+        }
+
+        @Override
+        public ServerSession getServerSession() {
+            return session.getServerSession();
+        }
+
+        @Override
+        public BsonTimestamp getOperationTime() {
+            return session.getOperationTime();
+        }
+
+        @Override
+        public void advanceOperationTime(BsonTimestamp operationTime) {
+            session.advanceOperationTime(operationTime);
+        }
+
+        @Override
+        public void advanceClusterTime(BsonDocument clusterTime) {
+            session.advanceClusterTime(clusterTime);
+        }
+
+        @Override
+        public BsonDocument getClusterTime() {
+            return session.getClusterTime();
+        }
+
+        @Override
+        public void close() {
+            synchronized (MongoSessionFactory.this) { // same monitor as createClientSession()
+                session.advanceClusterTime(clusterTime);
+                clusterTime = session.getClusterTime(); // shared factory state
+                session.advanceOperationTime(operationTime);
+                operationTime = session.getOperationTime(); // shared factory state
+            }
+            session.close();
+        }
+    }
+}

Propchange: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoSessionFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoStatus.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoStatus.java?rev=1832110&r1=1832109&r2=1832110&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoStatus.java (original)
+++ jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoStatus.java Wed May 23 15:56:20 2018
@@ -19,21 +19,30 @@ package org.apache.jackrabbit.oak.plugin
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Maps;
 import com.mongodb.BasicDBObject;
+import com.mongodb.ClientSessionOptions;
 import com.mongodb.MongoClient;
+import com.mongodb.MongoClientException;
 import com.mongodb.MongoQueryException;
 import com.mongodb.ReadConcern;
 import com.mongodb.client.MongoCollection;
 import com.mongodb.client.MongoCursor;
+import com.mongodb.event.ServerHeartbeatFailedEvent;
+import com.mongodb.event.ServerHeartbeatStartedEvent;
+import com.mongodb.event.ServerHeartbeatSucceededEvent;
+import com.mongodb.event.ServerMonitorListener;
+import com.mongodb.session.ClientSession;
 
+import org.apache.jackrabbit.oak.plugins.document.mongo.replica.ReplicaSetStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnull;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-public class MongoStatus {
+public class MongoStatus implements ServerMonitorListener {
 
     private static final Logger LOG = LoggerFactory.getLogger(MongoStatus.class);
 
@@ -46,8 +55,6 @@ public class MongoStatus {
 
     private final String dbName;
 
-    private final ClusterDescriptionProvider descriptionProvider;
-
     private BasicDBObject serverStatus;
 
     private BasicDBObject buildInfo;
@@ -58,17 +65,14 @@ public class MongoStatus {
 
     private Boolean majorityReadConcernEnabled;
 
-    public MongoStatus(@Nonnull MongoClient client,
-                       @Nonnull String dbName) {
-        this(client, dbName, () -> null);
-    }
+    private Boolean clientSessionSupported;
+
+    private final ReplicaSetStatus replicaSetStatus = new ReplicaSetStatus();
 
     public MongoStatus(@Nonnull MongoClient client,
-                       @Nonnull String dbName,
-                       @Nonnull ClusterDescriptionProvider descriptionProvider) {
+                       @Nonnull String dbName) {
         this.client = client;
         this.dbName = dbName;
-        this.descriptionProvider = descriptionProvider;
     }
 
     public void checkVersion() {
@@ -172,6 +176,66 @@ public class MongoStatus {
         }
     }
 
+    /**
+     * @return {@code true} if client sessions are supported. The check is
+     *          performed on the first call (requires at least 3.6) and cached.
+     */
+    boolean isClientSessionSupported() {
+        if (clientSessionSupported != null) {
+            return clientSessionSupported;
+        }
+        if (!isVersion(3, 6)) {
+            return clientSessionSupported = false;
+        }
+        ClientSessionOptions options = ClientSessionOptions.builder()
+                .causallyConsistent(true).build();
+        try (ClientSession ignored = client.startSession(options)) {
+            clientSessionSupported = true;
+        } catch (MongoClientException e) {
+            clientSessionSupported = false;
+        }
+        return clientSessionSupported;
+    }
+
+    /**
+     * Returns an estimate of the replica-set lag in milliseconds. The returned
+     * value is not an accurate measurement of the replication lag and should
+     * only be used as a rough estimate to decide whether secondaries can be
+     * used for queries in general.
+     * <p>
+     * This method may return {@link ReplicaSetStatus#UNKNOWN_LAG} if the value
+     * is currently unknown.
+     *
+     * @return an estimate of the replica-set lag in milliseconds.
+     */
+    long getReplicaSetLagEstimate() {
+        return replicaSetStatus.getLagEstimate();
+    }
+
+    //------------------------< ServerMonitorListener >-------------------------
+
+    @Override
+    public void serverHearbeatStarted(ServerHeartbeatStartedEvent event) {
+        LOG.debug("serverHeartbeatStarted {}", event.getConnectionId());
+        replicaSetStatus.serverHearbeatStarted(event);
+    }
+
+    @Override
+    public void serverHeartbeatSucceeded(ServerHeartbeatSucceededEvent event) {
+        LOG.debug("serverHeartbeatSucceeded {}, {}", event.getConnectionId(), event.getReply());
+        replicaSetStatus.serverHeartbeatSucceeded(event);
+    }
+
+    @Override
+    public void serverHeartbeatFailed(ServerHeartbeatFailedEvent event) {
+        LOG.debug("serverHeartbeatFailed {} ({} ms)", event.getConnectionId(),
+                event.getElapsedTime(TimeUnit.MILLISECONDS),
+                event.getThrowable());
+        replicaSetStatus.serverHeartbeatFailed(event);
+    }
+
+    //-------------------------------< internal >-------------------------------
+
     private BasicDBObject getServerStatus() {
         if (serverStatus == null) {
             serverStatus = client.getDatabase(dbName).runCommand(

Modified: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoVersionGCSupport.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoVersionGCSupport.java?rev=1832110&r1=1832109&r2=1832110&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoVersionGCSupport.java (original)
+++ jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoVersionGCSupport.java Wed May 23 15:56:20 2018
@@ -34,7 +34,6 @@ import com.google.common.base.StandardSy
 import com.google.common.collect.Lists;
 import com.mongodb.BasicDBObject;
 import com.mongodb.Block;
-import com.mongodb.ReadPreference;
 import com.mongodb.client.FindIterable;
 import com.mongodb.client.MongoCollection;
 import com.mongodb.client.model.Filters;
@@ -100,7 +99,6 @@ public class MongoVersionGCSupport exten
                 Filters.lt(MODIFIED_IN_SECS, getModifiedInSecs(toModified))
         );
         FindIterable<BasicDBObject> cursor = getNodeCollection()
-                .withReadPreference(ReadPreference.secondaryPreferred())
                 .find(query).batchSize(batchSize);
 
         return CloseableIterable.wrap(transform(cursor,
@@ -110,9 +108,7 @@ public class MongoVersionGCSupport exten
     @Override
     public long getDeletedOnceCount() {
         Bson query = Filters.eq(DELETED_ONCE, Boolean.TRUE);
-        return getNodeCollection()
-                .withReadPreference(ReadPreference.secondaryPreferred())
-                .count(query);
+        return getNodeCollection().count(query);
     }
 
     @Override

Added: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/ReplicaSetStatus.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/ReplicaSetStatus.java?rev=1832110&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/ReplicaSetStatus.java (added)
+++ jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/ReplicaSetStatus.java Wed May 23 15:56:20 2018
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document.mongo.replica;
+
+import java.util.Collection;
+import java.util.Date;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import javax.annotation.CheckForNull;
+
+import com.mongodb.ServerAddress;
+import com.mongodb.event.ServerHeartbeatSucceededEvent;
+import com.mongodb.event.ServerMonitorListenerAdapter;
+
+import org.bson.BsonArray;
+import org.bson.BsonDocument;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Keeps track of the status of a replica set based on information provided
+ * by heartbeat events. This status provides a replica set lag estimate, which
+ * can be used to decide whether secondaries are sufficiently up-to-date and
+ * read operations can be sent to a secondary. This is particularly useful when
+ * causal consistent client sessions are used with the MongoDB Java driver. Read
+ * operations shouldn't be sent to a secondary when it lags too much behind,
+ * otherwise the read operation will block until the secondary has caught up.
+ * <p>
+ * Heartbeat events are processed while holding an internal lock. The current
+ * estimate is published through a volatile field and can be read from any
+ * thread without locking.
+ */
+public class ReplicaSetStatus extends ServerMonitorListenerAdapter {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ReplicaSetStatus.class);
+
+    /**
+     * Value returned by {@link #getLagEstimate()} while no estimate is
+     * available.
+     */
+    public static final long UNKNOWN_LAG = Long.MAX_VALUE;
+
+    /**
+     * Most recent heartbeats from connected servers. Also serves as the
+     * monitor guarding the mutable state of this class.
+     */
+    private final Map<ServerAddress, Heartbeat> heartbeats = new HashMap<>();
+
+    /**
+     * Addresses of all hosts listed in any heartbeat received so far.
+     */
+    private final Set<ServerAddress> members = new HashSet<>();
+
+    /**
+     * Most recent raw lag estimates in milliseconds, newest first. Trimmed to
+     * at most one entry per known member on every update.
+     */
+    private final Deque<Long> estimatesPerMember = new LinkedList<>();
+
+    /**
+     * Current lag estimate in milliseconds. Written while holding the lock on
+     * {@link #heartbeats}, but read without it, hence volatile.
+     */
+    private volatile long lagEstimate = UNKNOWN_LAG;
+
+    /**
+     * Records the heartbeat of the server that sent it and re-computes the
+     * lag estimate once at least one replica set member is known.
+     *
+     * @param event the successful heartbeat reported by the driver.
+     */
+    @Override
+    public void serverHeartbeatSucceeded(ServerHeartbeatSucceededEvent event) {
+        synchronized (heartbeats) {
+            ServerAddress address = event.getConnectionId().getServerId().getAddress();
+            Heartbeat beat = new Heartbeat(event);
+            heartbeats.put(address, beat);
+            members.addAll(beat.getHosts());
+            if (!members.isEmpty()) {
+                updateLag();
+            }
+        }
+    }
+
+    /**
+     * @return the estimated replication lag in milliseconds, or
+     *      {@link #UNKNOWN_LAG} while no estimate is available, e.g. because
+     *      heartbeats from some known members are still missing.
+     */
+    public long getLagEstimate() {
+        return lagEstimate;
+    }
+
+    /**
+     * Re-computes the lag estimate from the most recent heartbeats. Must be
+     * called while holding the lock on {@link #heartbeats} and only when
+     * {@link #members} is not empty.
+     */
+    private void updateLag() {
+        if (!heartbeats.keySet().containsAll(members)) {
+            // no estimate possible until every known member has reported
+            lagEstimate = UNKNOWN_LAG;
+            return;
+        }
+
+        long oldestUpdate = Long.MAX_VALUE;
+        long newestUpdate = Long.MIN_VALUE;
+        long oldestWrite = Long.MAX_VALUE;
+        long newestWrite = Long.MIN_VALUE;
+        for (Map.Entry<ServerAddress, Heartbeat> entry : heartbeats.entrySet()) {
+            if (!members.contains(entry.getKey())) {
+                continue;
+            }
+            Heartbeat beat = entry.getValue();
+            Date lastWrite = beat.getLastWrite();
+            if (lastWrite == null) {
+                // member did not report a last write, assume the worst case
+                oldestWrite = 0;
+                newestWrite = Long.MAX_VALUE;
+            } else {
+                oldestWrite = Math.min(oldestWrite, lastWrite.getTime());
+                newestWrite = Math.max(newestWrite, lastWrite.getTime());
+            }
+            long updateTime = beat.getTime();
+            oldestUpdate = Math.min(oldestUpdate, updateTime);
+            newestUpdate = Math.max(newestUpdate, updateTime);
+        }
+        // heartbeats happen concurrently for all servers. It may happen we
+        // have some fresh and some stale heartbeats with update times up to
+        // heartbeatFreqMillis apart
+        long uncertaintyMillis = newestUpdate - oldestUpdate;
+        estimatesPerMember.addFirst(Math.max(0, newestWrite - oldestWrite - uncertaintyMillis));
+
+        // average estimates over up to number of members and remove old value
+        long estimate = 0;
+        int i = 0;
+        for (Iterator<Long> it = estimatesPerMember.iterator(); it.hasNext(); ) {
+            long v = it.next();
+            if (i++ < members.size()) {
+                // Estimates are never negative, but are close to
+                // Long.MAX_VALUE when a last write is unknown. Saturate the
+                // sum instead of letting it wrap around to a bogus value.
+                estimate = saturatedAdd(estimate, v);
+            } else {
+                it.remove();
+            }
+        }
+        lagEstimate = estimate / members.size();
+        LOG.debug("lagEstimate: {} ms ({})", lagEstimate, estimatesPerMember);
+    }
+
+    /**
+     * Adds two non-negative values and returns {@link Long#MAX_VALUE} instead
+     * of overflowing.
+     */
+    private static long saturatedAdd(long a, long b) {
+        return b > Long.MAX_VALUE - a ? Long.MAX_VALUE : a + b;
+    }
+
+    /**
+     * Information extracted from the reply of a single heartbeat.
+     */
+    private static class Heartbeat {
+
+        private final List<ServerAddress> hosts;
+
+        private final Date lastWrite;
+
+        private final long localTime;
+
+        Heartbeat(ServerHeartbeatSucceededEvent event) {
+            this.hosts = hostsFrom(event);
+            this.lastWrite = lastWriteFrom(event);
+            this.localTime = localTimeFrom(event).getTime();
+        }
+
+        /**
+         * @return the addresses listed in the {@code hosts} array of the
+         *      reply, empty if the reply has no such array.
+         */
+        Collection<ServerAddress> getHosts() {
+            return hosts;
+        }
+
+        /**
+         * @return the {@code localTime} of the reply in milliseconds since
+         *      the epoch.
+         */
+        long getTime() {
+            return localTime;
+        }
+
+        /**
+         * @return the {@code lastWrite.lastWriteDate} of the reply, or
+         *      {@code null} if the reply has no {@code lastWrite} document.
+         */
+        @CheckForNull
+        Date getLastWrite() {
+            return lastWrite;
+        }
+
+    }
+
+    private static List<ServerAddress> hostsFrom(ServerHeartbeatSucceededEvent event) {
+        return event.getReply().getArray("hosts", new BsonArray()).stream()
+                .map(bsonValue -> new ServerAddress(bsonValue.asString().getValue()))
+                .collect(Collectors.toList());
+    }
+
+    private static Date localTimeFrom(ServerHeartbeatSucceededEvent event) {
+        BsonDocument reply = event.getReply();
+        return new Date(reply.getDateTime("localTime").getValue());
+    }
+
+    @CheckForNull
+    private static Date lastWriteFrom(ServerHeartbeatSucceededEvent event) {
+        BsonDocument reply = event.getReply();
+        if (!reply.containsKey("lastWrite")) {
+            return null;
+        }
+        return new Date(reply.getDocument("lastWrite")
+                .getDateTime("lastWriteDate").getValue());
+    }
+}

Propchange: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/ReplicaSetStatus.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/MongoConnection.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/MongoConnection.java?rev=1832110&r1=1832109&r2=1832110&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/MongoConnection.java (original)
+++ jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/MongoConnection.java Wed May 23 15:56:20 2018
@@ -42,6 +42,7 @@ import static com.google.common.base.Pre
 public class MongoConnection {
 
     private static final int DEFAULT_MAX_WAIT_TIME = (int) TimeUnit.MINUTES.toMillis(1);
+    private static final int DEFAULT_HEARTBEAT_FREQUENCY_MS = (int) TimeUnit.SECONDS.toMillis(5);
     private static final WriteConcern WC_UNKNOWN = new WriteConcern("unknown");
     private static final Set<ReadConcernLevel> REPLICA_RC = ImmutableSet.of(ReadConcernLevel.MAJORITY, ReadConcernLevel.LINEARIZABLE);
     private final MongoClientURI mongoURI;
@@ -154,6 +155,7 @@ public class MongoConnection {
         return new MongoClientOptions.Builder()
                 .description("MongoConnection for Oak DocumentMK")
                 .maxWaitTime(DEFAULT_MAX_WAIT_TIME)
+                .heartbeatFrequency(DEFAULT_HEARTBEAT_FREQUENCY_MS)
                 .threadsAllowedToBlockForConnectionMultiplier(100);
     }
 
@@ -164,6 +166,7 @@ public class MongoConnection {
                 .add("socketTimeout", opts.getSocketTimeout())
                 .add("socketKeepAlive", opts.isSocketKeepAlive())
                 .add("maxWaitTime", opts.getMaxWaitTime())
+                .add("heartbeatFrequency", opts.getHeartbeatFrequency())
                 .add("threadsAllowedToBlockForConnectionMultiplier",
                         opts.getThreadsAllowedToBlockForConnectionMultiplier())
                 .add("readPreference", opts.getReadPreference().getName())

Modified: jackrabbit/oak/trunk/oak-store-document/src/test/java/com/mongodb/OakFongo.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/test/java/com/mongodb/OakFongo.java?rev=1832110&r1=1832109&r2=1832110&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/test/java/com/mongodb/OakFongo.java (original)
+++ jackrabbit/oak/trunk/oak-store-document/src/test/java/com/mongodb/OakFongo.java Wed May 23 15:56:20 2018
@@ -17,6 +17,7 @@
 package com.mongodb;
 
 import java.lang.reflect.Field;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
@@ -32,6 +33,10 @@ import com.mongodb.client.model.UpdateOp
 import com.mongodb.client.model.WriteModel;
 import com.mongodb.client.result.DeleteResult;
 import com.mongodb.client.result.UpdateResult;
+import com.mongodb.connection.Cluster;
+import com.mongodb.connection.ClusterConnectionMode;
+import com.mongodb.connection.ClusterDescription;
+import com.mongodb.connection.ClusterType;
 import com.mongodb.connection.ServerVersion;
 import com.mongodb.session.ClientSession;
 
@@ -42,6 +47,7 @@ import org.bson.conversions.Bson;
 
 import static java.util.stream.Collectors.toList;
 import static org.bson.codecs.configuration.CodecRegistries.fromRegistries;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 
@@ -99,6 +105,22 @@ public class OakFongo extends Fongo {
         for (String dbName : new String[]{MongoUtils.DB, "oak"}) {
             when(c.getDatabase(dbName)).thenReturn(new OakFongoMongoDatabase(dbName, this));
         }
+        try {
+            Field credentialsList = Mongo.class.getDeclaredField("credentialsList");
+            credentialsList.setAccessible(true);
+            credentialsList.set(c, Collections.emptyList());
+
+            ClusterDescription cd = new ClusterDescription(ClusterConnectionMode.SINGLE,
+                    ClusterType.STANDALONE, Collections.emptyList());
+            Cluster cl = mock(Cluster.class);
+            when(cl.getDescription()).thenReturn(cd);
+
+            Field cluster = Mongo.class.getDeclaredField("cluster");
+            cluster.setAccessible(true);
+            cluster.set(c, cl);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
         return c;
     }
 

Modified: jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/AbstractMultiDocumentStoreTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/AbstractMultiDocumentStoreTest.java?rev=1832110&r1=1832109&r2=1832110&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/AbstractMultiDocumentStoreTest.java (original)
+++ jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/AbstractMultiDocumentStoreTest.java Wed May 23 15:56:20 2018
@@ -18,6 +18,8 @@ package org.apache.jackrabbit.oak.plugin
 
 import java.util.Collection;
 
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.runners.Parameterized;
 
 public abstract class AbstractMultiDocumentStoreTest extends AbstractDocumentStoreTest {
@@ -30,6 +32,20 @@ public abstract class AbstractMultiDocum
         this.ds2 = dsf.createDocumentStore(2);
     }
 
+    @BeforeClass
+    public static void disableClientSession() {
+        // Disable the use of client session for this kind of test.
+        // Most of these tests assume causal consistency across multiple
+        // DocumentStore instances, which is not the case when the test
+        // runs on a replica set and a client session is used.
+        System.setProperty("oak.mongo.clientSession", "false");
+    }
+
+    @AfterClass
+    public static void resetSystemProperty() {
+        System.clearProperty("oak.mongo.clientSession");
+    }
+
     @Override
     public void cleanUp() throws Exception {
         super.cleanUp();

Modified: jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/AbstractTwoNodeTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/AbstractTwoNodeTest.java?rev=1832110&r1=1832109&r2=1832110&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/AbstractTwoNodeTest.java (original)
+++ jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/AbstractTwoNodeTest.java Wed May 23 15:56:20 2018
@@ -22,7 +22,9 @@ import java.io.IOException;
 import java.util.List;
 
 import com.google.common.collect.Lists;
+import com.mongodb.ReadPreference;
 
+import org.apache.jackrabbit.oak.plugins.document.mongo.MongoTestUtils;
 import org.apache.jackrabbit.oak.stats.Clock;
 import org.junit.After;
 import org.junit.Before;
@@ -127,6 +129,10 @@ public class AbstractTwoNodeTest {
     }
 
     private static DocumentStore wrap(DocumentStore ds) {
+        // Enforce primary read preference because this test assumes causal
+        // consistent reads across multiple document stores. Otherwise this
+        // test fails on a replica set with secondary read preference
+        MongoTestUtils.setReadPreference(ds, ReadPreference.primary());
         return new DocumentStoreTestWrapper(ds);
     }
 

Modified: jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/BlobReferenceIteratorTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/BlobReferenceIteratorTest.java?rev=1832110&r1=1832109&r2=1832110&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/BlobReferenceIteratorTest.java (original)
+++ jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/BlobReferenceIteratorTest.java Wed May 23 15:56:20 2018
@@ -25,8 +25,11 @@ import java.util.List;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
+import com.mongodb.ReadPreference;
+
 import org.apache.jackrabbit.oak.api.Blob;
 import org.apache.jackrabbit.oak.plugins.blob.ReferencedBlob;
+import org.apache.jackrabbit.oak.plugins.document.mongo.MongoTestUtils;
 import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
 import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
@@ -50,7 +53,7 @@ public class BlobReferenceIteratorTest {
     }
 
     @Parameterized.Parameters(name="{0}")
-    public static java.util.Collection<Object[]> fixtures() throws IOException {
+    public static java.util.Collection<Object[]> fixtures() {
         List<Object[]> fixtures = Lists.newArrayList();
         fixtures.add(new Object[] { new DocumentStoreFixture.MemoryFixture() });
 
@@ -67,11 +70,14 @@ public class BlobReferenceIteratorTest {
     }
 
     @Before
-    public void setUp() throws InterruptedException {
+    public void setUp() {
         store = new DocumentMK.Builder()
                 .setDocumentStore(fixture.createDocumentStore())
                 .setAsyncDelay(0)
                 .getNodeStore();
+        // enforce primary read preference, otherwise test fails on a replica
+        // set with a read preference configured to secondary.
+        MongoTestUtils.setReadPreference(store, ReadPreference.primary());
     }
 
     @After
@@ -95,6 +101,6 @@ public class BlobReferenceIteratorTest {
 
         List<ReferencedBlob> collectedBlobs = ImmutableList.copyOf(store.getReferencedBlobsIterator());
         assertEquals(blobs.size(), collectedBlobs.size());
-        assertEquals(new HashSet<ReferencedBlob>(blobs), new HashSet<ReferencedBlob>(collectedBlobs));
+        assertEquals(new HashSet<>(blobs), new HashSet<>(collectedBlobs));
     }
 }

Modified: jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentStoreStatsIT.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentStoreStatsIT.java?rev=1832110&r1=1832109&r2=1832110&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentStoreStatsIT.java (original)
+++ jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentStoreStatsIT.java Wed May 23 15:56:20 2018
@@ -39,6 +39,7 @@ import static org.apache.jackrabbit.oak.
 import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.getModifiedInSecs;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assume.assumeFalse;
+import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Matchers.anyInt;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.eq;
@@ -85,7 +86,7 @@ public class DocumentStoreStatsIT extend
 
         ds.invalidateCache();
         ds.find(Collection.NODES, id);
-        verify(stats).doneFindUncached(anyLong(), eq(Collection.NODES), eq(id), eq(true), eq(false));
+        verify(stats).doneFindUncached(anyLong(), eq(Collection.NODES), eq(id), eq(true), anyBoolean());
     }
 
     @Test
@@ -93,7 +94,7 @@ public class DocumentStoreStatsIT extend
         String id = testName.getMethodName();
 
         ds.find(Collection.NODES, id);
-        verify(stats).doneFindUncached(anyLong(), eq(Collection.NODES), eq(id), eq(false), eq(false));
+        verify(stats).doneFindUncached(anyLong(), eq(Collection.NODES), eq(id), eq(false), anyBoolean());
     }
 
     @Test
@@ -113,7 +114,7 @@ public class DocumentStoreStatsIT extend
                 eq(false),  //indexedProperty
                 eq(5) , // resultSize
                 anyLong(),   //lockTime
-                eq(false) //isSlaveOk
+                anyBoolean() //isSlaveOk
         );
     }
 

Modified: jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java?rev=1832110&r1=1832109&r2=1832110&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java (original)
+++ jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java Wed May 23 15:56:20 2018
@@ -43,6 +43,8 @@ import com.google.common.collect.Sets;
 import com.google.common.io.Closeables;
 import com.mongodb.BasicDBObject;
 import com.mongodb.DBCollection;
+import com.mongodb.ReadPreference;
+
 import junit.framework.Assert;
 import org.apache.commons.io.filefilter.FileFilterUtils;
 import org.apache.jackrabbit.oak.api.Blob;
@@ -56,6 +58,7 @@ import org.apache.jackrabbit.oak.plugins
 import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils;
 import org.apache.jackrabbit.oak.plugins.document.VersionGarbageCollector.VersionGCStats;
 import org.apache.jackrabbit.oak.plugins.document.mongo.MongoBlobReferenceIterator;
+import org.apache.jackrabbit.oak.plugins.document.mongo.MongoTestUtils;
 import org.apache.jackrabbit.oak.spi.cluster.ClusterRepositoryInfo;
 import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore;
 import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
@@ -87,8 +90,20 @@ public class MongoBlobGCTest extends Abs
         return setUp(deleteDirect, 10);
     }
 
+    @Override
+    protected DocumentMK.Builder addToBuilder(DocumentMK.Builder mk) {
+        // Disable client session because this test modifies
+        // data directly in MongoDB.
+        return super.addToBuilder(mk)
+                .setClientSessionDisabled(true)
+                .setLeaseCheck(false);
+    }
+
     public DataStoreState setUp(boolean deleteDirect, int count) throws Exception {
         DocumentNodeStore s = mk.getNodeStore();
+        // ensure primary read preference for this test because we modify data
+        // directly in MongoDB without going through the MongoDocumentStore
+        MongoTestUtils.setReadPreference(s, ReadPreference.primary());
         NodeBuilder a = s.getRoot().builder();
 
         int number = count;
@@ -145,7 +160,7 @@ public class MongoBlobGCTest extends Abs
 
         return state;
     }
-    
+
     private class DataStoreState {
         Set<String> blobsAdded = Sets.newHashSet();
         Set<String> blobsPresent = Sets.newHashSet();
@@ -284,7 +299,10 @@ public class MongoBlobGCTest extends Abs
     public void consistencyCheckWithGc() throws Exception {
         DataStoreState state = setUp(true);
         Set<String> existingAfterGC = gc(0);
-        assertTrue(Sets.symmetricDifference(state.blobsPresent, existingAfterGC).isEmpty());
+        assertTrue("blobsAdded: " + state.blobsAdded +
+                        ", blobsPresent: " + state.blobsPresent +
+                        ", existingAfterGC: " + existingAfterGC,
+                Sets.symmetricDifference(state.blobsPresent, existingAfterGC).isEmpty());
         
         ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);
         MarkSweepGarbageCollector gcObj = init(86400, executor);

Modified: jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/VersionGCSupportTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/VersionGCSupportTest.java?rev=1832110&r1=1832109&r2=1832110&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/VersionGCSupportTest.java (original)
+++ jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/VersionGCSupportTest.java Wed May 23 15:56:20 2018
@@ -22,9 +22,11 @@ import java.util.List;
 
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
+import com.mongodb.ReadPreference;
 
 import org.apache.jackrabbit.oak.plugins.document.memory.MemoryDocumentStore;
 import org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentStore;
+import org.apache.jackrabbit.oak.plugins.document.mongo.MongoTestUtils;
 import org.apache.jackrabbit.oak.plugins.document.mongo.MongoVersionGCSupport;
 import org.apache.jackrabbit.oak.plugins.document.rdb.RDBDocumentStore;
 import org.apache.jackrabbit.oak.plugins.document.rdb.RDBVersionGCSupport;
@@ -57,6 +59,12 @@ public class VersionGCSupportTest {
         }
         if (MONGO.isAvailable()) {
             MongoDocumentStore store = (MongoDocumentStore) MONGO.createDocumentStore();
+            // Enforce primary read preference, otherwise tests may fail on a
+            // replica set with a read preference configured to secondary.
+            // Revision GC usually runs with a modified range way in the past,
+            // which means changes made it to the secondary, but not in this
+            // test using a virtual clock
+            MongoTestUtils.setReadPreference(store, ReadPreference.primary());
             fixtures.add(new Object[]{MONGO, store, new MongoVersionGCSupport(store)});
         }
         if (MEMORY.isAvailable()) {

Modified: jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/VersionGarbageCollectorIT.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/VersionGarbageCollectorIT.java?rev=1832110&r1=1832109&r2=1832110&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/VersionGarbageCollectorIT.java (original)
+++ jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/VersionGarbageCollectorIT.java Wed May 23 15:56:20 2018
@@ -67,9 +67,11 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Queues;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.Atomics;
+import com.mongodb.ReadPreference;
 
 import org.apache.jackrabbit.oak.api.CommitFailedException;
 import org.apache.jackrabbit.oak.commons.PathUtils;
+import org.apache.jackrabbit.oak.plugins.document.mongo.MongoTestUtils;
 import org.apache.jackrabbit.oak.plugins.document.util.Utils;
 import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
 import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
@@ -129,6 +131,12 @@ public class VersionGarbageCollectorIT {
         documentMKBuilder = new DocumentMK.Builder().clock(clock).setLeaseCheck(false)
                 .setDocumentStore(fixture.createDocumentStore()).setAsyncDelay(0);
         store = documentMKBuilder.getNodeStore();
+        // Enforce primary read preference, otherwise tests may fail on a
+        // replica set with a read preference configured to secondary.
+        // Revision GC usually runs with a modified range way in the past,
+        // which means changes made it to the secondary, but not in this
+        // test using a virtual clock
+        MongoTestUtils.setReadPreference(store, ReadPreference.primary());
         gc = store.getVersionGarbageCollector();
     }