You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by mr...@apache.org on 2018/05/23 15:56:21 UTC
svn commit: r1832110 [1/2] - in /jackrabbit/oak/trunk: ./ oak-store-document/
oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/
oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/
...
Author: mreutegg
Date: Wed May 23 15:56:20 2018
New Revision: 1832110
URL: http://svn.apache.org/viewvc?rev=1832110&view=rev
Log:
OAK-6087: Avoid reads from MongoDB primary
Automatically use client sessions when running on MongoDB 3.6, otherwise fall back to the previous behaviour.
The feature can be disabled with a system property: -Doak.mongo.clientSession=false
Travis runs an additional build for oak-store-document on a MongoDB replica-set and a readPreference of secondaryPreferred
Added:
jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/CompositeServerMonitorListener.java (with props)
jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoSessionFactory.java (with props)
jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/ReplicaSetStatus.java (with props)
jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoTestUtils.java (with props)
jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/ReplicaSetStatusTest.java (with props)
Removed:
jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/ClusterDescriptionProvider.java
jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoClusterListener.java
Modified:
jackrabbit/oak/trunk/.travis.yml
jackrabbit/oak/trunk/oak-store-document/pom.xml
jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoBlobReferenceIterator.java
jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoBlobStore.java
jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentNodeStoreBuilderBase.java
jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java
jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoMissingLastRevSeeker.java
jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoStatus.java
jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoVersionGCSupport.java
jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/MongoConnection.java
jackrabbit/oak/trunk/oak-store-document/src/test/java/com/mongodb/OakFongo.java
jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/AbstractMultiDocumentStoreTest.java
jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/AbstractTwoNodeTest.java
jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/BlobReferenceIteratorTest.java
jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentStoreStatsIT.java
jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java
jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/VersionGCSupportTest.java
jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/VersionGarbageCollectorIT.java
jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/blob/MongoBlobStoreTest.java
jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/blob/cloud/MongoCloudBlobGCTest.java
jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/blob/ds/MongoDataStoreBlobGCTest.java
jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/CacheInvalidationIT.java
jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/JournalIT.java
jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentNodeStoreBuilderTest.java
jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/ReadPreferenceIT.java
Modified: jackrabbit/oak/trunk/.travis.yml
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/.travis.yml?rev=1832110&r1=1832109&r2=1832110&view=diff
==============================================================================
--- jackrabbit/oak/trunk/.travis.yml (original)
+++ jackrabbit/oak/trunk/.travis.yml Wed May 23 15:56:20 2018
@@ -21,12 +21,13 @@ env:
- MODULE=oak-jcr PROFILE="-PintegrationTesting" UT="-Dsurefire.skip.ut=true" MONGODB_MODE="--single"
- MODULE=oak-jcr PROFILE="" UT="" MONGODB_MODE="--single"
- MODULE=oak-store-document PROFILE="-PintegrationTesting" UT="" MONGODB_MODE="--single"
- - MODULE=oak-it PROFILE="-PintegrationTesting" UT="" MONGODB_MODE="--single"
+ - MODULE=oak-store-document PROFILE="-PintegrationTesting,replicaset" UT="" MONGODB_MODE="--replicaset"
- MODULE=oak-lucene PROFILE="-PintegrationTesting" UT="" MONGODB_MODE="--single"
+ - MODULE=oak-it PROFILE="-PintegrationTesting" UT="" MONGODB_MODE="--single"
- MODULE=oak-run PROFILE="-PintegrationTesting" UT="" MONGODB_MODE="--single"
- - MODULE=oak-it-osgi PROFILE="-PintegrationTesting" UT="" MONGODB_MODE="--single"
- - MODULE=oak-pojosr PROFILE="-PintegrationTesting" UT="" MONGODB_MODE="--single"
- MODULE=oak-upgrade PROFILE="-PintegrationTesting" UT="" MONGODB_MODE="--single"
+ - MODULE=oak-pojosr PROFILE="-PintegrationTesting" UT="" MONGODB_MODE="--single"
+ - MODULE=oak-it-osgi PROFILE="-PintegrationTesting" UT="" MONGODB_MODE="--single"
install:
- wget -N http://fastdl.mongodb.org/linux/mongodb-linux-x86_64-${MONGODB}.tgz -P $HOME/.mongodb
- tar --skip-old-files -C $HOME/.mongodb -xf $HOME/.mongodb/mongodb-linux-x86_64-${MONGODB}.tgz
Modified: jackrabbit/oak/trunk/oak-store-document/pom.xml
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/pom.xml?rev=1832110&r1=1832109&r2=1832110&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/pom.xml (original)
+++ jackrabbit/oak/trunk/oak-store-document/pom.xml Wed May 23 15:56:20 2018
@@ -63,6 +63,15 @@
</plugins>
</build>
+ <profiles>
+ <profile>
+ <id>replicaset</id>
+ <properties>
+ <mongo.url>mongodb://localhost:27017,localhost:27018,localhost:27019/MongoMKDB?readPreference=secondaryPreferred</mongo.url>
+ </properties>
+ </profile>
+ </profiles>
+
<dependencies>
<!-- Optional OSGi dependencies, used only when running within OSGi -->
<dependency>
Added: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/CompositeServerMonitorListener.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/CompositeServerMonitorListener.java?rev=1832110&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/CompositeServerMonitorListener.java (added)
+++ jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/CompositeServerMonitorListener.java Wed May 23 15:56:20 2018
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document.mongo;
+
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import com.mongodb.event.ServerHeartbeatFailedEvent;
+import com.mongodb.event.ServerHeartbeatStartedEvent;
+import com.mongodb.event.ServerHeartbeatSucceededEvent;
+import com.mongodb.event.ServerMonitorListener;
+
+/**
+ * A composite {@link ServerMonitorListener} that forwards each heartbeat event to all listeners currently registered with it.
+ */
+class CompositeServerMonitorListener implements ServerMonitorListener {
+
+ private final List<ServerMonitorListener> listeners = new CopyOnWriteArrayList<>();
+
+ void addListener(ServerMonitorListener listener) {
+ listeners.add(listener);
+ }
+
+ void removeListener(ServerMonitorListener listener) {
+ listeners.remove(listener);
+ }
+
+ @Override
+ public void serverHearbeatStarted(ServerHeartbeatStartedEvent event) {
+ listeners.forEach(l -> l.serverHearbeatStarted(event));
+ }
+
+ @Override
+ public void serverHeartbeatSucceeded(ServerHeartbeatSucceededEvent event) {
+ listeners.forEach(l -> l.serverHeartbeatSucceeded(event));
+ }
+
+ @Override
+ public void serverHeartbeatFailed(ServerHeartbeatFailedEvent event) {
+ listeners.forEach(l -> l.serverHeartbeatFailed(event));
+ }
+}
Propchange: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/CompositeServerMonitorListener.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoBlobReferenceIterator.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoBlobReferenceIterator.java?rev=1832110&r1=1832109&r2=1832110&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoBlobReferenceIterator.java (original)
+++ jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoBlobReferenceIterator.java Wed May 23 15:56:20 2018
@@ -31,6 +31,7 @@ import org.apache.jackrabbit.oak.plugins
import org.bson.conversions.Bson;
import com.mongodb.BasicDBObject;
+import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.model.Filters;
@@ -38,7 +39,8 @@ public class MongoBlobReferenceIterator
private final MongoDocumentStore documentStore;
- public MongoBlobReferenceIterator(DocumentNodeStore nodeStore, MongoDocumentStore documentStore) {
+ public MongoBlobReferenceIterator(DocumentNodeStore nodeStore,
+ MongoDocumentStore documentStore) {
super(nodeStore);
this.documentStore = documentStore;
}
@@ -46,9 +48,8 @@ public class MongoBlobReferenceIterator
@Override
public Iterator<NodeDocument> getIteratorOverDocsWithBinaries() {
Bson query = Filters.eq(NodeDocument.HAS_BINARY_FLAG, NodeDocument.HAS_BINARY_VAL);
- // TODO It currently prefers secondary. Would that be Ok?
+ // TODO It currently uses the configured read preference. Would that be Ok?
MongoCursor<BasicDBObject> cursor = documentStore.getDBCollection(NODES)
- .withReadPreference(documentStore.getConfiguredReadPreference(NODES))
.find(query).iterator();
return CloseableIterator.wrap(transform(cursor,
Modified: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoBlobStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoBlobStore.java?rev=1832110&r1=1832109&r2=1832110&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoBlobStore.java (original)
+++ jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoBlobStore.java Wed May 23 15:56:20 2018
@@ -22,6 +22,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
+import com.mongodb.ReadPreference;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.result.UpdateResult;
import org.apache.jackrabbit.oak.commons.StringUtils;
@@ -42,7 +43,6 @@ import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Filters;
import static com.mongodb.ReadPreference.primary;
-import static com.mongodb.ReadPreference.secondaryPreferred;
import static java.util.stream.StreamSupport.stream;
import static org.bson.codecs.configuration.CodecRegistries.fromCodecs;
import static org.bson.codecs.configuration.CodecRegistries.fromRegistries;
@@ -68,6 +68,7 @@ public class MongoBlobStore extends Cach
fromCodecs(new MongoBlobCodec())
);
+ private final ReadPreference defaultReadPreference;
private final MongoCollection<MongoBlob> blobCollection;
private long minLastModified;
@@ -86,6 +87,7 @@ public class MongoBlobStore extends Cach
// space allocated for a record to the next power of two
// (there is an overhead per record, let's assume it is 1 KB at most)
setBlockSize(2 * 1024 * 1024 - 1024);
+ defaultReadPreference = db.getReadPreference();
blobCollection = initBlobCollection(db);
}
@@ -184,8 +186,12 @@ public class MongoBlobStore extends Cach
.noneMatch(COLLECTION_BLOBS::equals)) {
db.createCollection(COLLECTION_BLOBS);
}
+ // override the read preference configured with the MongoDB URI
+ // and use the primary as default. Reading a blob will still
+ // try a secondary first and then fallback to the primary.
return db.getCollection(COLLECTION_BLOBS, MongoBlob.class)
- .withCodecRegistry(CODEC_REGISTRY);
+ .withCodecRegistry(CODEC_REGISTRY)
+ .withReadPreference(primary());
}
private MongoCollection<MongoBlob> getBlobCollection() {
@@ -196,10 +202,9 @@ public class MongoBlobStore extends Cach
Bson query = getBlobQuery(id, lastMod);
Bson fields = new BasicDBObject(MongoBlob.KEY_DATA, 1);
- // try the secondary first
- // TODO add a configuration option for whether to try reading from secondary
+ // try with default read preference first, may be from secondary
List<MongoBlob> result = new ArrayList<>(1);
- getBlobCollection().withReadPreference(secondaryPreferred()).find(query)
+ getBlobCollection().withReadPreference(defaultReadPreference).find(query)
.projection(fields).into(result);
if (result.isEmpty()) {
// not found in the secondary: try the primary
@@ -244,7 +249,6 @@ public class MongoBlobStore extends Cach
@Override
public Iterator<String> getAllChunkIds(long maxLastModifiedTime) throws Exception {
Bson fields = new BasicDBObject(MongoBlob.KEY_ID, 1);
- Bson hint = new BasicDBObject("$hint", fields);
Bson query = new Document();
if (maxLastModifiedTime != 0 && maxLastModifiedTime != -1) {
@@ -252,7 +256,7 @@ public class MongoBlobStore extends Cach
}
final MongoCursor<MongoBlob> cur = getBlobCollection().find(query)
- .projection(fields).modifiers(hint).iterator();
+ .projection(fields).hint(fields).iterator();
//TODO The cursor needs to be closed
return new AbstractIterator<String>() {
Modified: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentNodeStoreBuilderBase.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentNodeStoreBuilderBase.java?rev=1832110&r1=1832109&r2=1832110&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentNodeStoreBuilderBase.java (original)
+++ jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentNodeStoreBuilderBase.java Wed May 23 15:56:20 2018
@@ -16,7 +16,6 @@
*/
package org.apache.jackrabbit.oak.plugins.document.mongo;
-import java.net.UnknownHostException;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
@@ -56,6 +55,7 @@ public abstract class MongoDocumentNodeS
private boolean socketKeepAlive = true;
private MongoStatus mongoStatus;
private long maxReplicationLagMillis = TimeUnit.HOURS.toMillis(6);
+ private boolean clientSessionDisabled = false;
/**
* Uses the given information to connect to to MongoDB as backend
@@ -70,21 +70,19 @@ public abstract class MongoDocumentNodeS
* any database name given in the {@code uri}.
* @param blobCacheSizeMB the blob cache size in MB.
* @return this
- * @throws UnknownHostException if one of the hosts given in the URI
- * is unknown.
*/
public T setMongoDB(@Nonnull String uri,
@Nonnull String name,
- int blobCacheSizeMB)
- throws UnknownHostException {
+ int blobCacheSizeMB) {
this.mongoUri = uri;
- MongoClusterListener listener = new MongoClusterListener();
+ CompositeServerMonitorListener serverMonitorListener = new CompositeServerMonitorListener();
MongoClientOptions.Builder options = MongoConnection.getDefaultBuilder();
- options.addClusterListener(listener);
+ options.addServerMonitorListener(serverMonitorListener);
options.socketKeepAlive(socketKeepAlive);
MongoClient client = new MongoClient(new MongoClientURI(uri, options));
- MongoStatus status = new MongoStatus(client, name, listener);
+ MongoStatus status = new MongoStatus(client, name);
+ serverMonitorListener.addListener(status);
MongoDatabase db = client.getDatabase(name);
if (!MongoConnection.hasWriteConcern(uri)) {
db = db.withWriteConcern(MongoConnection.getDefaultWriteConcern(client));
@@ -167,6 +165,26 @@ public abstract class MongoDocumentNodeS
return socketKeepAlive;
}
+ /**
+ * Disables the use of a client session available with MongoDB 3.6 and
+ * newer. By default the MongoDocumentStore will use a client session if
+ * available. That is, when connected to MongoDB 3.6 and newer.
+ *
+ * @param b whether to disable the use of a client session.
+ * @return this
+ */
+ public T setClientSessionDisabled(boolean b) {
+ this.clientSessionDisabled = b;
+ return thisBuilder();
+ }
+
+ /**
+ * @return whether the use of a client session is disabled.
+ */
+ boolean isClientSessionDisabled() {
+ return clientSessionDisabled;
+ }
+
public T setMaxReplicationLag(long duration, TimeUnit unit){
maxReplicationLagMillis = unit.toMillis(duration);
return thisBuilder();
Modified: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java?rev=1832110&r1=1832109&r2=1832110&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java (original)
+++ jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java Wed May 23 15:56:20 2018
@@ -105,7 +105,9 @@ import com.mongodb.client.model.ReturnDo
import com.mongodb.client.model.UpdateOneModel;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.model.WriteModel;
+import com.mongodb.client.result.DeleteResult;
import com.mongodb.client.result.UpdateResult;
+import com.mongodb.session.ClientSession;
import static com.google.common.base.Predicates.in;
import static com.google.common.base.Predicates.not;
@@ -150,6 +152,8 @@ public class MongoDocumentStore implemen
private final MongoCollection<BasicDBObject> journal;
private final MongoClient client;
+ private final MongoStatus status;
+ private final MongoSessionFactory sessionFactory;
private final MongoDatabase db;
private final NodeDocumentCache nodesCache;
@@ -236,6 +240,19 @@ public class MongoDocumentStore implemen
private final int queryRetries =
Integer.getInteger("oak.mongo.queryRetries", 2);
+ /**
+ * Acceptable replication lag of secondaries in milliseconds. Reads are
+ * directed to the primary if the estimated replication lag is higher than
+ * this value.
+ */
+ private final int acceptableLagMillis =
+ Integer.getInteger("oak.mongo.acceptableLagMillis", 5000);
+
+ /**
+ * Feature flag for use of MongoDB client sessions.
+ */
+ private final boolean useClientSession;
+
private String lastReadWriteMode;
private final Map<String, String> metadata;
@@ -262,6 +279,8 @@ public class MongoDocumentStore implemen
.build();
this.client = client;
+ this.status = mongoStatus;
+ this.sessionFactory = new MongoSessionFactory(client);
this.db = client.getDatabase(dbName);
stats = builder.getDocumentStoreStatsCollector();
nodes = db.getCollection(Collection.NODES.toString(), BasicDBObject.class);
@@ -280,18 +299,22 @@ public class MongoDocumentStore implemen
replicaInfoThread.setDaemon(true);
replicaInfoThread.start();
}
+ useClientSession = !builder.isClientSessionDisabled()
+ && Boolean.parseBoolean(System.getProperty("oak.mongo.clientSession", "true"));
- // indexes:
- // the _id field is the primary key, so we don't need to define it
+ // counting the number of documents in the nodes collection and
+ // checking existing indexes is performed against the MongoDB primary
+ // this ensures the information is up-to-date and accurate
+ long initialDocsCount = getNodesCount();
- long initialDocsCount = nodes.count();
// compound index on _modified and _id
if (initialDocsCount == 0) {
// this is an empty store, create a compound index
// on _modified and _id (OAK-3071)
createIndex(nodes, new String[]{NodeDocument.MODIFIED_IN_SECS, Document.ID},
new boolean[]{true, true}, false, false);
- } else if (!hasIndex(nodes, NodeDocument.MODIFIED_IN_SECS, Document.ID)) {
+ } else if (!hasIndex(nodes.withReadPreference(ReadPreference.primary()),
+ NodeDocument.MODIFIED_IN_SECS, Document.ID)) {
hasModifiedIdCompoundIndex = false;
if (!builder.getReadOnlyMode()) {
LOG.warn("Detected an upgrade from Oak version <= 1.2. For optimal " +
@@ -312,7 +335,8 @@ public class MongoDocumentStore implemen
} else {
createIndex(nodes, NodeDocument.DELETED_ONCE, true, false, true);
}
- } else if (!hasIndex(nodes, DELETED_ONCE, MODIFIED_IN_SECS)) {
+ } else if (!hasIndex(nodes.withReadPreference(ReadPreference.primary()),
+ DELETED_ONCE, MODIFIED_IN_SECS)) {
if (!builder.getReadOnlyMode()) {
LOG.warn("Detected an upgrade from Oak version <= 1.6. For optimal " +
"Revision GC performance it is recommended to create a " +
@@ -329,7 +353,8 @@ public class MongoDocumentStore implemen
// on _sdType and _sdMaxRevTime (OAK-6129)
createIndex(nodes, new String[]{SD_TYPE, SD_MAX_REV_TIME_IN_SECS},
new boolean[]{true, true}, false, true);
- } else if (!hasIndex(nodes, SD_TYPE, SD_MAX_REV_TIME_IN_SECS)) {
+ } else if (!hasIndex(nodes.withReadPreference(ReadPreference.primary()),
+ SD_TYPE, SD_MAX_REV_TIME_IN_SECS)) {
if (!builder.getReadOnlyMode()) {
LOG.warn("Detected an upgrade from Oak version <= 1.6. For optimal " +
"Revision GC performance it is recommended to create a " +
@@ -346,10 +371,11 @@ public class MongoDocumentStore implemen
LOG.info("Connected to MongoDB {} with maxReplicationLagMillis {}, " +
"maxDeltaForModTimeIdxSecs {}, disableIndexHint {}, " +
- "{}, serverStatus {}",
- mongoStatus.getVersion(), maxReplicationLagMillis, maxDeltaForModTimeIdxSecs,
- disableIndexHint, db.getWriteConcern(),
- mongoStatus.getServerDetails());
+ "clientSessionSupported {}, clientSessionInUse {}, {}, serverStatus {}",
+ mongoStatus.getVersion(), maxReplicationLagMillis,
+ maxDeltaForModTimeIdxSecs, disableIndexHint,
+ status.isClientSessionSupported(), useClientSession,
+ db.getWriteConcern(), mongoStatus.getServerDetails());
}
public boolean isReadOnly() {
@@ -446,8 +472,11 @@ public class MongoDocumentStore implemen
boolean preferCached,
final int maxCacheAge) {
if (collection != Collection.NODES) {
- return findUncachedWithRetry(collection, key,
- DocumentReadPreference.PRIMARY);
+ DocumentReadPreference readPref = DocumentReadPreference.PRIMARY;
+ if (withClientSession()) {
+ readPref = getDefaultReadPreference(collection);
+ }
+ return findUncachedWithRetry(collection, key, readPref);
}
NodeDocument doc;
if (maxCacheAge > 0 || preferCached) {
@@ -551,12 +580,12 @@ public class MongoDocumentStore implemen
@CheckForNull
protected <T extends Document> T findUncached(Collection<T> collection, String key, DocumentReadPreference docReadPref) {
log("findUncached", key, docReadPref);
- MongoCollection<BasicDBObject> dbCollection = getDBCollection(collection);
final Stopwatch watch = startWatch();
boolean isSlaveOk = false;
boolean docFound = true;
try {
ReadPreference readPreference = getMongoReadPreference(collection, null, key, docReadPref);
+ MongoCollection<BasicDBObject> dbCollection = getDBCollection(collection, readPreference);
if(readPreference.isSlaveOk()){
LOG.trace("Routing call to secondary for fetching [{}]", key);
@@ -564,7 +593,14 @@ public class MongoDocumentStore implemen
}
List<BasicDBObject> result = new ArrayList<>(1);
- dbCollection.withReadPreference(readPreference).find(getByKeyQuery(key)).into(result);
+ execute(session -> {
+ if (session != null) {
+ dbCollection.find(session, getByKeyQuery(key)).into(result);
+ } else {
+ dbCollection.find(getByKeyQuery(key)).into(result);
+ }
+ return null;
+ });
if(result.isEmpty()) {
docFound = false;
@@ -644,13 +680,18 @@ public class MongoDocumentStore implemen
int limit,
long maxQueryTime) {
log("query", fromKey, toKey, indexedProperty, startValue, limit);
- MongoCollection<BasicDBObject> dbCollection = getDBCollection(collection);
List<Bson> clauses = new ArrayList<>();
clauses.add(Filters.gt(Document.ID, fromKey));
clauses.add(Filters.lt(Document.ID, toKey));
- Bson hint = new BasicDBObject(NodeDocument.ID, 1);
+ Bson hint;
+ if (NodeDocument.MODIFIED_IN_SECS.equals(indexedProperty)
+ && canUseModifiedTimeIdx(startValue)) {
+ hint = new BasicDBObject(NodeDocument.MODIFIED_IN_SECS, 1);
+ } else {
+ hint = new BasicDBObject(NodeDocument.ID, 1);
+ }
if (indexedProperty != null) {
if (NodeDocument.DELETED_ONCE.equals(indexedProperty)) {
@@ -662,17 +703,12 @@ public class MongoDocumentStore implemen
clauses.add(Filters.eq(indexedProperty, true));
} else {
clauses.add(Filters.gte(indexedProperty, startValue));
-
- if (NodeDocument.MODIFIED_IN_SECS.equals(indexedProperty)
- && canUseModifiedTimeIdx(startValue)) {
- hint = new BasicDBObject(NodeDocument.MODIFIED_IN_SECS, -1);
- }
}
}
Bson query = Filters.and(clauses);
String parentId = Utils.getParentIdFromLowerLimit(fromKey);
long lockTime = -1;
- final Stopwatch watch = startWatch();
+ final Stopwatch watch = startWatch();
boolean isSlaveOk = false;
int resultSize = 0;
@@ -688,29 +724,38 @@ public class MongoDocumentStore implemen
isSlaveOk = true;
LOG.trace("Routing call to secondary for fetching children from [{}] to [{}]", fromKey, toKey);
}
- FindIterable<BasicDBObject> result = dbCollection
- .withReadPreference(readPreference).find(query).sort(BY_ID_ASC);
- if (limit >= 0) {
- result.limit(limit);
- }
- if (!disableIndexHint && !hasModifiedIdCompoundIndex) {
- result.modifiers(new BasicDBObject("$hint", hint));
- }
- if (maxQueryTime > 0) {
- // OAK-2614: set maxTime if maxQueryTimeMS > 0
- result.maxTime(maxQueryTime, TimeUnit.MILLISECONDS);
- }
-
- List<T> list;
- try (MongoCursor<BasicDBObject> cursor = result.iterator()) {
- list = new ArrayList<T>();
- for (int i = 0; i < limit && cursor.hasNext(); i++) {
- BasicDBObject o = cursor.next();
- T doc = convertFromDBObject(collection, o);
- list.add(doc);
+
+ List<T> list = new ArrayList<T>();
+ MongoCollection<BasicDBObject> dbCollection = getDBCollection(collection, readPreference);
+ execute(session -> {
+ FindIterable<BasicDBObject> result;
+ if (session != null) {
+ result = dbCollection.find(session, query);
+ } else {
+ result = dbCollection.find(query);
}
- resultSize = list.size();
- }
+ result.sort(BY_ID_ASC);
+ if (limit >= 0) {
+ result.limit(limit);
+ }
+ if (!disableIndexHint && !hasModifiedIdCompoundIndex) {
+ result.hint(hint);
+ }
+ if (maxQueryTime > 0) {
+ // OAK-2614: set maxTime if maxQueryTimeMS > 0
+ result.maxTime(maxQueryTime, TimeUnit.MILLISECONDS);
+ }
+
+ try (MongoCursor<BasicDBObject> cursor = result.iterator()) {
+ for (int i = 0; i < limit && cursor.hasNext(); i++) {
+ BasicDBObject o = cursor.next();
+ T doc = convertFromDBObject(collection, o);
+ list.add(doc);
+ }
+ }
+ return null;
+ });
+ resultSize = list.size();
if (cacheChangesTracker != null) {
nodesCache.putNonConflictingDocs(cacheChangesTracker, (List<NodeDocument>) list);
@@ -739,7 +784,15 @@ public class MongoDocumentStore implemen
MongoCollection<BasicDBObject> dbCollection = getDBCollection(collection);
Stopwatch watch = startWatch();
try {
- dbCollection.deleteOne(getByKeyQuery(key));
+ execute(session -> {
+ Bson filter = getByKeyQuery(key);
+ if (session != null) {
+ dbCollection.deleteOne(session, filter);
+ } else {
+ dbCollection.deleteOne(filter);
+ }
+ return null;
+ });
} catch (Exception e) {
throw DocumentStoreException.convert(e, "Remove failed for " + key);
} finally {
@@ -757,7 +810,14 @@ public class MongoDocumentStore implemen
for(List<String> keyBatch : Lists.partition(keys, IN_CLAUSE_BATCH_SIZE)){
Bson query = Filters.in(Document.ID, keyBatch);
try {
- dbCollection.deleteMany(query);
+ execute(session -> {
+ if (session != null) {
+ dbCollection.deleteMany(session, query);
+ } else {
+ dbCollection.deleteMany(query);
+ }
+ return null;
+ });
} catch (Exception e) {
throw DocumentStoreException.convert(e, "Remove failed for " + keyBatch);
} finally {
@@ -793,7 +853,15 @@ public class MongoDocumentStore implemen
if (!it.hasNext() || batch.size() == IN_CLAUSE_BATCH_SIZE) {
Bson query = Filters.or(batch);
try {
- num += dbCollection.deleteMany(query).getDeletedCount();
+ num += execute(session -> {
+ DeleteResult result;
+ if (session != null) {
+ result = dbCollection.deleteMany(session, query);
+ } else {
+ result = dbCollection.deleteMany(query);
+ }
+ return result.getDeletedCount();
+ });
} catch (Exception e) {
throw DocumentStoreException.convert(e, "Remove failed for " + batch);
} finally {
@@ -825,7 +893,15 @@ public class MongoDocumentStore implemen
Filters.lt(indexedProperty, endValue)
);
try {
- num = (int) Math.min(dbCollection.deleteMany(query).getDeletedCount(), Integer.MAX_VALUE);
+ num = (int) Math.min(execute((DocumentStoreCallable<Long>) session -> {
+ DeleteResult result;
+ if (session != null) {
+ result = dbCollection.deleteMany(session, query);
+ } else {
+ result = dbCollection.deleteMany(query);
+ }
+ return result.getDeletedCount();
+ }), Integer.MAX_VALUE);
} catch (Exception e) {
throw DocumentStoreException.convert(e, "Remove failed for " + collection + ": " +
indexedProperty + " in (" + startValue + ", " + endValue + ")");
@@ -880,16 +956,23 @@ public class MongoDocumentStore implemen
// no conditions and the check is OK. this avoid an
// unnecessary call when the conditions do not match
if (!checkConditions || UpdateUtils.checkConditions(cachedDoc, updateOp.getConditions())) {
- Bson query = createQueryForUpdate(updateOp.getId(),
- updateOp.getConditions());
// below condition may overwrite a user supplied condition
// on _modCount. This is fine, because the conditions were
// already checked against the cached document with the
// matching _modCount value. There is no need to check the
// user supplied condition on _modCount again on the server
- query = Filters.and(query, Filters.eq(Document.MOD_COUNT, modCount));
-
- UpdateResult result = dbCollection.updateOne(query, update);
+ Bson query = Filters.and(
+ createQueryForUpdate(updateOp.getId(), updateOp.getConditions()),
+ Filters.eq(Document.MOD_COUNT, modCount)
+ );
+
+ UpdateResult result = execute(session -> {
+ if (session != null) {
+ return dbCollection.updateOne(session, query, update);
+ } else {
+ return dbCollection.updateOne(query, update);
+ }
+ });
if (result.getModifiedCount() > 0) {
// success, update cached document
if (collection == Collection.NODES) {
@@ -907,7 +990,13 @@ public class MongoDocumentStore implemen
Bson query = createQueryForUpdate(updateOp.getId(), updateOp.getConditions());
FindOneAndUpdateOptions options = new FindOneAndUpdateOptions()
.returnDocument(ReturnDocument.BEFORE).upsert(upsert);
- BasicDBObject oldNode = dbCollection.findOneAndUpdate(query, update, options);
+ BasicDBObject oldNode = execute(session -> {
+ if (session != null) {
+ return dbCollection.findOneAndUpdate(session, query, update, options);
+ } else {
+ return dbCollection.findOneAndUpdate(query, update, options);
+ }
+ });
if (oldNode == null){
newEntry = true;
@@ -1143,13 +1232,26 @@ public class MongoDocumentStore implemen
for (String key : keys) {
conditions.add(getByKeyQuery(key));
}
-
- FindIterable<BasicDBObject> cursor = getDBCollection(collection)
- .find(Filters.or(conditions));
- for (BasicDBObject doc : cursor) {
- T foundDoc = convertFromDBObject(collection, doc);
- docs.put(foundDoc.getId(), foundDoc);
+ MongoCollection<BasicDBObject> dbCollection;
+ if (secondariesWithinAcceptableLag()) {
+ dbCollection = getDBCollection(collection);
+ } else {
+ lagTooHigh();
+ dbCollection = getDBCollection(collection).withReadPreference(ReadPreference.primary());
}
+ execute(session -> {
+ FindIterable<BasicDBObject> cursor;
+ if (session != null) {
+ cursor = dbCollection.find(session, Filters.or(conditions));
+ } else {
+ cursor = dbCollection.find(Filters.or(conditions));
+ }
+ for (BasicDBObject doc : cursor) {
+ T foundDoc = convertFromDBObject(collection, doc);
+ docs.put(foundDoc.getId(), foundDoc);
+ }
+ return null;
+ });
}
return docs;
}
@@ -1177,9 +1279,15 @@ public class MongoDocumentStore implemen
BulkWriteResult bulkResult;
Set<String> failedUpdates = new HashSet<String>();
Set<String> upserts = new HashSet<String>();
+ BulkWriteOptions options = new BulkWriteOptions().ordered(false);
try {
- bulkResult = dbCollection.bulkWrite(writes,
- new BulkWriteOptions().ordered(false));
+ bulkResult = execute(session -> {
+ if (session != null) {
+ return dbCollection.bulkWrite(session, writes, options);
+ } else {
+ return dbCollection.bulkWrite(writes, options);
+ }
+ });
} catch (MongoBulkWriteException e) {
bulkResult = e.getWriteResult();
for (BulkWriteError err : e.getWriteErrors()) {
@@ -1257,7 +1365,14 @@ public class MongoDocumentStore implemen
boolean insertSuccess = false;
try {
try {
- dbCollection.insertMany(inserts);
+ execute(session -> {
+ if (session != null) {
+ dbCollection.insertMany(session, inserts);
+ } else {
+ dbCollection.insertMany(inserts);
+ }
+ return null;
+ });
if (collection == Collection.NODES) {
for (T doc : docs) {
nodesCache.putIfAbsent((NodeDocument) doc);
@@ -1314,7 +1429,9 @@ public class MongoDocumentStore implemen
DocumentReadPreference getReadPreference(int maxCacheAge){
long lag = fallbackSecondaryStrategy ? maxReplicationLagMillis : replicaInfo.getLag();
- if(maxCacheAge >= 0 && maxCacheAge < lag) {
+ if (withClientSession()) {
+ return DocumentReadPreference.PREFER_SECONDARY;
+ } else if(maxCacheAge >= 0 && maxCacheAge < lag) {
return DocumentReadPreference.PRIMARY;
} else if(maxCacheAge == Integer.MAX_VALUE){
return DocumentReadPreference.PREFER_SECONDARY;
@@ -1323,8 +1440,14 @@ public class MongoDocumentStore implemen
}
}
- DocumentReadPreference getDefaultReadPreference(Collection col){
- return col == Collection.NODES ? DocumentReadPreference.PREFER_SECONDARY_IF_OLD_ENOUGH : DocumentReadPreference.PRIMARY;
+ DocumentReadPreference getDefaultReadPreference(Collection col) {
+ DocumentReadPreference preference = DocumentReadPreference.PRIMARY;
+ if (withClientSession()) {
+ preference = DocumentReadPreference.PREFER_SECONDARY;
+ } else if (col == Collection.NODES) {
+ preference = DocumentReadPreference.PREFER_SECONDARY_IF_OLD_ENOUGH;
+ }
+ return preference;
}
<T extends Document> ReadPreference getMongoReadPreference(@Nonnull Collection<T> collection,
@@ -1337,14 +1460,21 @@ public class MongoDocumentStore implemen
case PREFER_PRIMARY :
return ReadPreference.primaryPreferred();
case PREFER_SECONDARY :
- return getConfiguredReadPreference(collection);
+ if (!withClientSession() || secondariesWithinAcceptableLag()) {
+ return getConfiguredReadPreference(collection);
+ } else {
+ lagTooHigh();
+ return ReadPreference.primary();
+ }
case PREFER_SECONDARY_IF_OLD_ENOUGH:
if(collection != Collection.NODES){
return ReadPreference.primary();
}
boolean secondarySafe;
- if (fallbackSecondaryStrategy) {
+ if (withClientSession() && secondariesWithinAcceptableLag()) {
+ secondarySafe = true;
+ } else if (fallbackSecondaryStrategy) {
// This is not quite accurate, because ancestors
// are updated in a background thread (_lastRev). We
// will need to revise this for low maxReplicationLagMillis
@@ -1444,6 +1574,11 @@ public class MongoDocumentStore implemen
}
}
+    /**
+     * Returns the MongoDB collection for the given Oak collection, configured
+     * with the given read preference.
+     *
+     * @param collection the Oak collection.
+     * @param readPreference the read preference to apply to the returned
+     *          MongoDB collection.
+     * @param <T> the document type of the collection.
+     * @return the MongoDB collection with the read preference applied.
+     */
+    <T extends Document> MongoCollection<BasicDBObject> getDBCollection(Collection<T> collection,
+                                                                        ReadPreference readPreference) {
+        MongoCollection<BasicDBObject> dbCollection = getDBCollection(collection);
+        return dbCollection.withReadPreference(readPreference);
+    }
+
MongoDatabase getDatabase() {
return db;
}
@@ -1790,6 +1925,68 @@ public class MongoDocumentStore implemen
});
}
+    /**
+     * Counts the documents in the {@link #nodes} collection. The count is
+     * always read from the MongoDB primary, with a client session when one
+     * is available.
+     *
+     * @return the number of documents in the {@link #nodes} collection.
+     */
+    private long getNodesCount() {
+        return execute(session -> {
+            MongoCollection<?> primaryNodes = nodes.withReadPreference(ReadPreference.primary());
+            // session is null when client sessions are not in use
+            return session == null ? primaryNodes.count() : primaryNodes.count(session);
+        });
+    }
+
+ /**
+ * @return {@code true} if {@code status} reports that client sessions are
+ * supported and {@code useClientSession} is set; {@code false}
+ * otherwise.
+ */
+ private boolean withClientSession() {
+ return status.isClientSessionSupported() && useClientSession;
+ }
+
+ /**
+ * Checks whether reads may be directed to secondaries based on the current
+ * replica-set lag estimate.
+ *
+ * @return {@code true} if {@code client.getReplicaSetStatus()} is
+ * {@code null} or the lag estimate reported by {@code status} is
+ * below {@code acceptableLagMillis}; {@code false} otherwise.
+ */
+ private boolean secondariesWithinAcceptableLag() {
+ // NOTE(review): a null replica set status presumably means the client is
+ // not connected to a replica set -- confirm against the driver docs
+ return client.getReplicaSetStatus() == null
+ || status.getReplicaSetLagEstimate() < acceptableLagMillis;
+ }
+
+ /**
+ * Logs, at debug level, that a read which preferred a secondary is directed
+ * to the primary because the replication lag is too high. This method only
+ * logs; the caller is responsible for selecting the primary.
+ */
+ private void lagTooHigh() {
+ LOG.debug("Read from secondary is preferred but replication lag is too high. Directing read to primary.");
+ }
+
+    /**
+     * Runs the given callable, handing it a {@link ClientSession} when client
+     * sessions are in use and {@code null} otherwise. A session created here
+     * is closed as soon as the callable returns or throws, so the callable
+     * must not keep a reference to it beyond
+     * {@link DocumentStoreCallable#call(ClientSession)}.
+     *
+     * @param callable the callable.
+     * @param <T> the return type of the callable.
+     * @return the result of the callable.
+     * @throws DocumentStoreException if the callable throws an exception.
+     */
+    private <T> T execute(DocumentStoreCallable<T> callable)
+            throws DocumentStoreException {
+        if (!withClientSession()) {
+            // no session support or sessions disabled
+            return callable.call(null);
+        }
+        try (ClientSession session = sessionFactory.createClientSession()) {
+            return callable.call(session);
+        }
+    }
+
+ /**
+ * A unit of work that is run with an optional {@link ClientSession}.
+ *
+ * @param <T> the result type of the callable.
+ */
+ interface DocumentStoreCallable<T> {
+
+ /**
+ * Performs the work of this callable.
+ *
+ * @param session the client session to use, or {@code null} if the
+ * work must be performed without a session.
+ * @return the result of the work.
+ * @throws DocumentStoreException if the work fails.
+ */
+ T call(@Nullable ClientSession session) throws DocumentStoreException;
+ }
+
private static class BulkUpdateResult {
private final Set<String> failedUpdates;
Modified: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoMissingLastRevSeeker.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoMissingLastRevSeeker.java?rev=1832110&r1=1832109&r2=1832110&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoMissingLastRevSeeker.java (original)
+++ jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoMissingLastRevSeeker.java Wed May 23 15:56:20 2018
@@ -59,7 +59,6 @@ public class MongoMissingLastRevSeeker e
Bson sortFields = new BasicDBObject(NodeDocument.MODIFIED_IN_SECS, 1);
FindIterable<BasicDBObject> cursor = getNodeCollection()
- .withReadPreference(ReadPreference.primary())
.find(query).sort(sortFields);
return CloseableIterable.wrap(transform(cursor,
input -> store.convertFromDBObject(NODES, input)));
@@ -76,7 +75,7 @@ public class MongoMissingLastRevSeeker e
}
private MongoCollection<BasicDBObject> getNodeCollection() {
- return store.getDBCollection(NODES);
+ return store.getDBCollection(NODES, ReadPreference.primary());
}
private MongoCollection<BasicDBObject> getClusterNodeCollection() {
Added: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoSessionFactory.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoSessionFactory.java?rev=1832110&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoSessionFactory.java (added)
+++ jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoSessionFactory.java Wed May 23 15:56:20 2018
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document.mongo;
+
+import com.mongodb.ClientSessionOptions;
+import com.mongodb.MongoClient;
+import com.mongodb.session.ClientSession;
+import com.mongodb.session.ServerSession;
+
+import org.bson.BsonDocument;
+import org.bson.BsonTimestamp;
+
+/**
+ * Factory for {@link ClientSession}s.
+ * <p>
+ * The factory keeps the cluster time and operation time of the sessions it
+ * handed out (captured when a session is closed) and advances every newly
+ * created session with those values, so that causal consistency carries over
+ * from one session to the next. The two shared fields are guarded by the
+ * monitor of the factory instance.
+ */
+class MongoSessionFactory {
+
+    private final MongoClient client;
+
+    private final ClientSessionOptions options;
+
+    /** Cluster time taken over from closed sessions; guarded by {@code this}. */
+    private BsonDocument clusterTime;
+
+    /** Operation time taken over from closed sessions; guarded by {@code this}. */
+    private BsonTimestamp operationTime;
+
+    /**
+     * Creates a factory that starts causally consistent sessions on the
+     * given client.
+     *
+     * @param client the MongoDB client used to start sessions.
+     */
+    MongoSessionFactory(MongoClient client) {
+        this.client = client;
+        this.options = ClientSessionOptions.builder()
+                .causallyConsistent(true).build();
+    }
+
+    /**
+     * Starts a new client session and advances it with the cluster time and
+     * operation time currently remembered by this factory. The returned
+     * session reports its times back to the factory when it is closed.
+     *
+     * @return a new client session; the caller must close it.
+     */
+    ClientSession createClientSession() {
+        ClientSession s = client.startSession(options);
+        synchronized (this) {
+            s.advanceClusterTime(clusterTime);
+            s.advanceOperationTime(operationTime);
+        }
+        return new TrackingClientSession(s);
+    }
+
+    /**
+     * Delegating session that publishes its cluster time and operation time
+     * to the enclosing factory on {@link #close()}.
+     */
+    private class TrackingClientSession implements ClientSession {
+
+        private final ClientSession session;
+
+        TrackingClientSession(ClientSession session) {
+            this.session = session;
+        }
+
+        @Override
+        public ClientSessionOptions getOptions() {
+            return session.getOptions();
+        }
+
+        @Override
+        public boolean isCausallyConsistent() {
+            return session.isCausallyConsistent();
+        }
+
+        @Override
+        public Object getOriginator() {
+            return session.getOriginator();
+        }
+
+        @Override
+        public ServerSession getServerSession() {
+            return session.getServerSession();
+        }
+
+        @Override
+        public BsonTimestamp getOperationTime() {
+            return session.getOperationTime();
+        }
+
+        @Override
+        public void advanceOperationTime(BsonTimestamp operationTime) {
+            session.advanceOperationTime(operationTime);
+        }
+
+        @Override
+        public void advanceClusterTime(BsonDocument clusterTime) {
+            session.advanceClusterTime(clusterTime);
+        }
+
+        @Override
+        public BsonDocument getClusterTime() {
+            return session.getClusterTime();
+        }
+
+        /**
+         * Merges the times of this session with the times remembered by the
+         * factory, stores the result in the factory and closes the
+         * underlying session.
+         */
+        @Override
+        public void close() {
+            // Lock the enclosing factory, not this wrapper: the fields
+            // updated here belong to the factory and are read under the
+            // factory monitor in createClientSession(). Locking 'this'
+            // would use a different monitor per session and provide no
+            // mutual exclusion between sessions closed concurrently.
+            synchronized (MongoSessionFactory.this) {
+                session.advanceClusterTime(clusterTime);
+                clusterTime = session.getClusterTime();
+                session.advanceOperationTime(operationTime);
+                operationTime = session.getOperationTime();
+            }
+            session.close();
+        }
+    }
+}
Propchange: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoSessionFactory.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoStatus.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoStatus.java?rev=1832110&r1=1832109&r2=1832110&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoStatus.java (original)
+++ jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoStatus.java Wed May 23 15:56:20 2018
@@ -19,21 +19,30 @@ package org.apache.jackrabbit.oak.plugin
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.mongodb.BasicDBObject;
+import com.mongodb.ClientSessionOptions;
import com.mongodb.MongoClient;
+import com.mongodb.MongoClientException;
import com.mongodb.MongoQueryException;
import com.mongodb.ReadConcern;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
+import com.mongodb.event.ServerHeartbeatFailedEvent;
+import com.mongodb.event.ServerHeartbeatStartedEvent;
+import com.mongodb.event.ServerHeartbeatSucceededEvent;
+import com.mongodb.event.ServerMonitorListener;
+import com.mongodb.session.ClientSession;
+import org.apache.jackrabbit.oak.plugins.document.mongo.replica.ReplicaSetStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-public class MongoStatus {
+public class MongoStatus implements ServerMonitorListener {
private static final Logger LOG = LoggerFactory.getLogger(MongoStatus.class);
@@ -46,8 +55,6 @@ public class MongoStatus {
private final String dbName;
- private final ClusterDescriptionProvider descriptionProvider;
-
private BasicDBObject serverStatus;
private BasicDBObject buildInfo;
@@ -58,17 +65,14 @@ public class MongoStatus {
private Boolean majorityReadConcernEnabled;
- public MongoStatus(@Nonnull MongoClient client,
- @Nonnull String dbName) {
- this(client, dbName, () -> null);
- }
+ private Boolean clientSessionSupported;
+
+ private final ReplicaSetStatus replicaSetStatus = new ReplicaSetStatus();
public MongoStatus(@Nonnull MongoClient client,
- @Nonnull String dbName,
- @Nonnull ClusterDescriptionProvider descriptionProvider) {
+ @Nonnull String dbName) {
this.client = client;
this.dbName = dbName;
- this.descriptionProvider = descriptionProvider;
}
public void checkVersion() {
@@ -172,6 +176,66 @@ public class MongoStatus {
}
}
+    /**
+     * Tells whether client sessions can be used with the connected MongoDB.
+     * The answer is determined on the first call, by checking the server
+     * version and trying to start a causally consistent session, and is
+     * remembered for subsequent calls.
+     *
+     * @return {@code true} if client sessions are supported.
+     */
+    boolean isClientSessionSupported() {
+        if (clientSessionSupported != null) {
+            return clientSessionSupported;
+        }
+        // must be at least 3.6
+        if (!isVersion(3, 6)) {
+            clientSessionSupported = false;
+            return clientSessionSupported;
+        }
+        ClientSessionOptions options = ClientSessionOptions.builder()
+                .causallyConsistent(true).build();
+        try (ClientSession ignored = client.startSession(options)) {
+            clientSessionSupported = true;
+        } catch (MongoClientException e) {
+            clientSessionSupported = false;
+        }
+        return clientSessionSupported;
+    }
+
+ /**
+ * Returns an estimate of the replica-set lag in milliseconds. The returned
+ * value is not an accurate measurement of the replication lag and should
+ * only be used as a rough estimate to decide whether secondaries can be
+ * used for queries in general.
+ * <p>
+ * This method may return {@link ReplicaSetStatus#UNKNOWN_LAG} if the value
+ * is currently unknown.
+ *
+ * @return an estimate of the replica-set lag in milliseconds.
+ */
+ long getReplicaSetLagEstimate() {
+ return replicaSetStatus.getLagEstimate();
+ }
+
+ //------------------------< ServerMonitorListener >-------------------------
+
+ /**
+ * Logs the start of a server heartbeat at debug level and forwards the
+ * event to the internal {@link ReplicaSetStatus}.
+ *
+ * @param event the heartbeat started event.
+ */
+ @Override
+ public void serverHearbeatStarted(ServerHeartbeatStartedEvent event) {
+ LOG.debug("serverHeartbeatStarted {}", event.getConnectionId());
+ replicaSetStatus.serverHearbeatStarted(event);
+ }
+
+ /**
+ * Logs a successful server heartbeat, including the server reply, at debug
+ * level and forwards the event to the internal {@link ReplicaSetStatus}.
+ *
+ * @param event the heartbeat succeeded event.
+ */
+ @Override
+ public void serverHeartbeatSucceeded(ServerHeartbeatSucceededEvent event) {
+ LOG.debug("serverHeartbeatSucceeded {}, {}", event.getConnectionId(), event.getReply());
+ replicaSetStatus.serverHeartbeatSucceeded(event);
+ }
+
+ /**
+ * Logs a failed server heartbeat at debug level and forwards the event to
+ * the internal {@link ReplicaSetStatus}.
+ *
+ * @param event the heartbeat failed event.
+ */
+ @Override
+ public void serverHeartbeatFailed(ServerHeartbeatFailedEvent event) {
+ // the throwable is passed as the trailing argument, after the values
+ // for the two placeholders in the message
+ LOG.debug("serverHeartbeatFailed {} ({} ms)", event.getConnectionId(),
+ event.getElapsedTime(TimeUnit.MILLISECONDS),
+ event.getThrowable());
+ replicaSetStatus.serverHeartbeatFailed(event);
+ }
+
+ //-------------------------------< internal >-------------------------------
+
private BasicDBObject getServerStatus() {
if (serverStatus == null) {
serverStatus = client.getDatabase(dbName).runCommand(
Modified: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoVersionGCSupport.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoVersionGCSupport.java?rev=1832110&r1=1832109&r2=1832110&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoVersionGCSupport.java (original)
+++ jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoVersionGCSupport.java Wed May 23 15:56:20 2018
@@ -34,7 +34,6 @@ import com.google.common.base.StandardSy
import com.google.common.collect.Lists;
import com.mongodb.BasicDBObject;
import com.mongodb.Block;
-import com.mongodb.ReadPreference;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Filters;
@@ -100,7 +99,6 @@ public class MongoVersionGCSupport exten
Filters.lt(MODIFIED_IN_SECS, getModifiedInSecs(toModified))
);
FindIterable<BasicDBObject> cursor = getNodeCollection()
- .withReadPreference(ReadPreference.secondaryPreferred())
.find(query).batchSize(batchSize);
return CloseableIterable.wrap(transform(cursor,
@@ -110,9 +108,7 @@ public class MongoVersionGCSupport exten
@Override
public long getDeletedOnceCount() {
Bson query = Filters.eq(DELETED_ONCE, Boolean.TRUE);
- return getNodeCollection()
- .withReadPreference(ReadPreference.secondaryPreferred())
- .count(query);
+ return getNodeCollection().count(query);
}
@Override
Added: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/ReplicaSetStatus.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/ReplicaSetStatus.java?rev=1832110&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/ReplicaSetStatus.java (added)
+++ jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/ReplicaSetStatus.java Wed May 23 15:56:20 2018
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document.mongo.replica;
+
+import java.util.Collection;
+import java.util.Date;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import javax.annotation.CheckForNull;
+
+import com.mongodb.ServerAddress;
+import com.mongodb.event.ServerHeartbeatSucceededEvent;
+import com.mongodb.event.ServerMonitorListenerAdapter;
+
+import org.bson.BsonArray;
+import org.bson.BsonDocument;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Keeps track of the status of a replica set based on information provided
+ * by heartbeat events. This status provides a replica set lag estimate, which
+ * can be used to decide whether secondaries are sufficiently up-to-date and
+ * read operations can be sent to a secondary. This is particularly useful when
+ * causally consistent client sessions are used with the MongoDB Java driver.
+ * Read operations shouldn't be sent to a secondary when it lags too much
+ * behind, otherwise the read operation will block until it was able to catch
+ * up.
+ * <p>
+ * Heartbeat events update the state while holding the monitor of the
+ * heartbeat map. The resulting estimate is published through a volatile
+ * field, so {@link #getLagEstimate()} can be called from any thread without
+ * locking.
+ */
+public class ReplicaSetStatus extends ServerMonitorListenerAdapter {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ReplicaSetStatus.class);
+
+    /**
+     * Value returned by {@link #getLagEstimate()} while the lag is unknown.
+     */
+    public static final long UNKNOWN_LAG = Long.MAX_VALUE;
+
+    /**
+     * Most recent heartbeats from connected servers. The monitor of this map
+     * guards the map itself, {@link #members} and {@link #estimatesPerMember}.
+     */
+    private final Map<ServerAddress, Heartbeat> heartbeats = new HashMap<>();
+
+    /**
+     * All hosts reported so far by heartbeat replies.
+     */
+    private final Set<ServerAddress> members = new HashSet<>();
+
+    /**
+     * Most recent per-heartbeat estimates, newest first.
+     */
+    private final Deque<Long> estimatesPerMember = new LinkedList<>();
+
+    /**
+     * Volatile because it is written by heartbeat threads under the monitor
+     * of {@link #heartbeats} but read without synchronization.
+     */
+    private volatile long lagEstimate = UNKNOWN_LAG;
+
+    /**
+     * Records the heartbeat of the server that sent it, adds the hosts listed
+     * in the reply to the known members and recalculates the lag estimate.
+     *
+     * @param event the heartbeat succeeded event.
+     */
+    @Override
+    public void serverHeartbeatSucceeded(ServerHeartbeatSucceededEvent event) {
+        synchronized (heartbeats) {
+            ServerAddress address = event.getConnectionId().getServerId().getAddress();
+            Heartbeat beat = new Heartbeat(event);
+            heartbeats.put(address, beat);
+            members.addAll(beat.getHosts());
+            // without members there is nothing to average over
+            if (!members.isEmpty()) {
+                updateLag();
+            }
+        }
+    }
+
+    /**
+     * @return the current lag estimate in milliseconds, or
+     *          {@link #UNKNOWN_LAG} if a heartbeat is missing for at least
+     *          one known member.
+     */
+    public long getLagEstimate() {
+        return lagEstimate;
+    }
+
+    /**
+     * Recalculates the lag estimate from the most recent heartbeats. Must be
+     * called with the monitor of {@link #heartbeats} held and with a
+     * non-empty set of members.
+     */
+    private void updateLag() {
+        if (!heartbeats.keySet().containsAll(members)) {
+            lagEstimate = UNKNOWN_LAG;
+            return;
+        }
+
+        long oldestUpdate = Long.MAX_VALUE;
+        long newestUpdate = Long.MIN_VALUE;
+        long oldestWrite = Long.MAX_VALUE;
+        long newestWrite = Long.MIN_VALUE;
+        for (Map.Entry<ServerAddress, Heartbeat> entry : heartbeats.entrySet()) {
+            if (!members.contains(entry.getKey())) {
+                continue;
+            }
+            Heartbeat beat = entry.getValue();
+            Date lastWrite = beat.getLastWrite();
+            if (lastWrite == null) {
+                // no last write information from this server: assume the
+                // largest possible distance between oldest and newest write
+                oldestWrite = 0;
+                newestWrite = Long.MAX_VALUE;
+            } else {
+                oldestWrite = Math.min(oldestWrite, lastWrite.getTime());
+                newestWrite = Math.max(newestWrite, lastWrite.getTime());
+            }
+            long updateTime = beat.getTime();
+            oldestUpdate = Math.min(oldestUpdate, updateTime);
+            newestUpdate = Math.max(newestUpdate, updateTime);
+        }
+        // heartbeats happen concurrently for all servers. It may happen we
+        // have some fresh and some stale heartbeats with update times up to
+        // heartbeatFreqMillis apart
+        long uncertaintyMillis = newestUpdate - oldestUpdate;
+        estimatesPerMember.addFirst(Math.max(0, newestWrite - oldestWrite - uncertaintyMillis));
+
+        // average estimates over up to number of members and remove old value
+        long estimate = 0;
+        int i = 0;
+        for (Iterator<Long> it = estimatesPerMember.iterator(); it.hasNext(); ) {
+            long v = it.next();
+            if (i++ < members.size()) {
+                // estimates derived from a missing last write are close to
+                // Long.MAX_VALUE; a plain sum of two of them would wrap
+                // around to a negative value and look like "no lag"
+                estimate = saturatedAdd(estimate, v);
+            } else {
+                it.remove();
+            }
+        }
+        lagEstimate = estimate / members.size();
+        LOG.debug("lagEstimate: {} ms ({})", lagEstimate, estimatesPerMember);
+    }
+
+    /**
+     * Adds two non-negative values and returns {@code Long.MAX_VALUE} instead
+     * of a wrapped around negative value when the sum overflows.
+     */
+    private static long saturatedAdd(long a, long b) {
+        long sum = a + b;
+        return sum < 0 ? Long.MAX_VALUE : sum;
+    }
+
+    /**
+     * The pieces of a heartbeat reply that are relevant for the lag estimate.
+     */
+    private static class Heartbeat {
+
+        private final List<ServerAddress> hosts;
+
+        private final Date lastWrite;
+
+        private final long localTime;
+
+        Heartbeat(ServerHeartbeatSucceededEvent event) {
+            this.hosts = hostsFrom(event);
+            this.lastWrite = lastWriteFrom(event);
+            this.localTime = localTimeFrom(event).getTime();
+        }
+
+        /**
+         * @return the addresses from the "hosts" field of the reply.
+         */
+        Collection<ServerAddress> getHosts() {
+            return hosts;
+        }
+
+        /**
+         * @return the "localTime" of the reply in milliseconds.
+         */
+        long getTime() {
+            return localTime;
+        }
+
+        /**
+         * @return the last write date of the reply or {@code null} if the
+         *          reply does not have a "lastWrite" field.
+         */
+        @CheckForNull
+        Date getLastWrite() {
+            return lastWrite;
+        }
+
+    }
+
+    /**
+     * Reads the "hosts" array of the heartbeat reply. A reply without this
+     * field results in an empty list.
+     */
+    private static List<ServerAddress> hostsFrom(ServerHeartbeatSucceededEvent event) {
+        return event.getReply().getArray("hosts", new BsonArray()).stream()
+                .map(bsonValue -> new ServerAddress(bsonValue.asString().getValue()))
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Reads the "localTime" field of the heartbeat reply.
+     */
+    private static Date localTimeFrom(ServerHeartbeatSucceededEvent event) {
+        BsonDocument reply = event.getReply();
+        return new Date(reply.getDateTime("localTime").getValue());
+    }
+
+    /**
+     * Reads "lastWrite.lastWriteDate" of the heartbeat reply, or returns
+     * {@code null} if the reply has no "lastWrite" field.
+     */
+    private static Date lastWriteFrom(ServerHeartbeatSucceededEvent event) {
+        BsonDocument reply = event.getReply();
+        if (!reply.containsKey("lastWrite")) {
+            return null;
+        }
+        return new Date(reply.getDocument("lastWrite")
+                .getDateTime("lastWriteDate").getValue());
+    }
+}
Propchange: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/ReplicaSetStatus.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/MongoConnection.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/MongoConnection.java?rev=1832110&r1=1832109&r2=1832110&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/MongoConnection.java (original)
+++ jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/MongoConnection.java Wed May 23 15:56:20 2018
@@ -42,6 +42,7 @@ import static com.google.common.base.Pre
public class MongoConnection {
private static final int DEFAULT_MAX_WAIT_TIME = (int) TimeUnit.MINUTES.toMillis(1);
+ private static final int DEFAULT_HEARTBEAT_FREQUENCY_MS = (int) TimeUnit.SECONDS.toMillis(5);
private static final WriteConcern WC_UNKNOWN = new WriteConcern("unknown");
private static final Set<ReadConcernLevel> REPLICA_RC = ImmutableSet.of(ReadConcernLevel.MAJORITY, ReadConcernLevel.LINEARIZABLE);
private final MongoClientURI mongoURI;
@@ -154,6 +155,7 @@ public class MongoConnection {
return new MongoClientOptions.Builder()
.description("MongoConnection for Oak DocumentMK")
.maxWaitTime(DEFAULT_MAX_WAIT_TIME)
+ .heartbeatFrequency(DEFAULT_HEARTBEAT_FREQUENCY_MS)
.threadsAllowedToBlockForConnectionMultiplier(100);
}
@@ -164,6 +166,7 @@ public class MongoConnection {
.add("socketTimeout", opts.getSocketTimeout())
.add("socketKeepAlive", opts.isSocketKeepAlive())
.add("maxWaitTime", opts.getMaxWaitTime())
+ .add("heartbeatFrequency", opts.getHeartbeatFrequency())
.add("threadsAllowedToBlockForConnectionMultiplier",
opts.getThreadsAllowedToBlockForConnectionMultiplier())
.add("readPreference", opts.getReadPreference().getName())
Modified: jackrabbit/oak/trunk/oak-store-document/src/test/java/com/mongodb/OakFongo.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/test/java/com/mongodb/OakFongo.java?rev=1832110&r1=1832109&r2=1832110&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/test/java/com/mongodb/OakFongo.java (original)
+++ jackrabbit/oak/trunk/oak-store-document/src/test/java/com/mongodb/OakFongo.java Wed May 23 15:56:20 2018
@@ -17,6 +17,7 @@
package com.mongodb;
import java.lang.reflect.Field;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -32,6 +33,10 @@ import com.mongodb.client.model.UpdateOp
import com.mongodb.client.model.WriteModel;
import com.mongodb.client.result.DeleteResult;
import com.mongodb.client.result.UpdateResult;
+import com.mongodb.connection.Cluster;
+import com.mongodb.connection.ClusterConnectionMode;
+import com.mongodb.connection.ClusterDescription;
+import com.mongodb.connection.ClusterType;
import com.mongodb.connection.ServerVersion;
import com.mongodb.session.ClientSession;
@@ -42,6 +47,7 @@ import org.bson.conversions.Bson;
import static java.util.stream.Collectors.toList;
import static org.bson.codecs.configuration.CodecRegistries.fromRegistries;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
@@ -99,6 +105,22 @@ public class OakFongo extends Fongo {
for (String dbName : new String[]{MongoUtils.DB, "oak"}) {
when(c.getDatabase(dbName)).thenReturn(new OakFongoMongoDatabase(dbName, this));
}
+ try {
+ Field credentialsList = Mongo.class.getDeclaredField("credentialsList");
+ credentialsList.setAccessible(true);
+ credentialsList.set(c, Collections.emptyList());
+
+ ClusterDescription cd = new ClusterDescription(ClusterConnectionMode.SINGLE,
+ ClusterType.STANDALONE, Collections.emptyList());
+ Cluster cl = mock(Cluster.class);
+ when(cl.getDescription()).thenReturn(cd);
+
+ Field cluster = Mongo.class.getDeclaredField("cluster");
+ cluster.setAccessible(true);
+ cluster.set(c, cl);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
return c;
}
Modified: jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/AbstractMultiDocumentStoreTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/AbstractMultiDocumentStoreTest.java?rev=1832110&r1=1832109&r2=1832110&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/AbstractMultiDocumentStoreTest.java (original)
+++ jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/AbstractMultiDocumentStoreTest.java Wed May 23 15:56:20 2018
@@ -18,6 +18,8 @@ package org.apache.jackrabbit.oak.plugin
import java.util.Collection;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
import org.junit.runners.Parameterized;
public abstract class AbstractMultiDocumentStoreTest extends AbstractDocumentStoreTest {
@@ -30,6 +32,20 @@ public abstract class AbstractMultiDocum
this.ds2 = dsf.createDocumentStore(2);
}
+ @BeforeClass
+ public static void disableClientSession() {
+ // Disable the use of client sessions for this kind of test.
+ // Most of these tests assume causal consistency across multiple
+ // DocumentStore instances, which is not the case when the test
+ // runs on a replica set and a client session is used.
+ System.setProperty("oak.mongo.clientSession", "false");
+ }
+
+ @AfterClass
+ public static void resetSystemProperty() {
+ System.clearProperty("oak.mongo.clientSession");
+ }
+
@Override
public void cleanUp() throws Exception {
super.cleanUp();
Modified: jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/AbstractTwoNodeTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/AbstractTwoNodeTest.java?rev=1832110&r1=1832109&r2=1832110&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/AbstractTwoNodeTest.java (original)
+++ jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/AbstractTwoNodeTest.java Wed May 23 15:56:20 2018
@@ -22,7 +22,9 @@ import java.io.IOException;
import java.util.List;
import com.google.common.collect.Lists;
+import com.mongodb.ReadPreference;
+import org.apache.jackrabbit.oak.plugins.document.mongo.MongoTestUtils;
import org.apache.jackrabbit.oak.stats.Clock;
import org.junit.After;
import org.junit.Before;
@@ -127,6 +129,10 @@ public class AbstractTwoNodeTest {
}
private static DocumentStore wrap(DocumentStore ds) {
+ // Enforce primary read preference because this test assumes causally
+ // consistent reads across multiple document stores. Otherwise this
+ // test fails on a replica set with a secondary read preference.
+ MongoTestUtils.setReadPreference(ds, ReadPreference.primary());
return new DocumentStoreTestWrapper(ds);
}
Modified: jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/BlobReferenceIteratorTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/BlobReferenceIteratorTest.java?rev=1832110&r1=1832109&r2=1832110&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/BlobReferenceIteratorTest.java (original)
+++ jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/BlobReferenceIteratorTest.java Wed May 23 15:56:20 2018
@@ -25,8 +25,11 @@ import java.util.List;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
+import com.mongodb.ReadPreference;
+
import org.apache.jackrabbit.oak.api.Blob;
import org.apache.jackrabbit.oak.plugins.blob.ReferencedBlob;
+import org.apache.jackrabbit.oak.plugins.document.mongo.MongoTestUtils;
import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
@@ -50,7 +53,7 @@ public class BlobReferenceIteratorTest {
}
@Parameterized.Parameters(name="{0}")
- public static java.util.Collection<Object[]> fixtures() throws IOException {
+ public static java.util.Collection<Object[]> fixtures() {
List<Object[]> fixtures = Lists.newArrayList();
fixtures.add(new Object[] { new DocumentStoreFixture.MemoryFixture() });
@@ -67,11 +70,14 @@ public class BlobReferenceIteratorTest {
}
@Before
- public void setUp() throws InterruptedException {
+ public void setUp() {
store = new DocumentMK.Builder()
.setDocumentStore(fixture.createDocumentStore())
.setAsyncDelay(0)
.getNodeStore();
+ // Enforce primary read preference, otherwise the test fails on a
+ // replica set with a read preference configured to secondary.
+ MongoTestUtils.setReadPreference(store, ReadPreference.primary());
}
@After
@@ -95,6 +101,6 @@ public class BlobReferenceIteratorTest {
List<ReferencedBlob> collectedBlobs = ImmutableList.copyOf(store.getReferencedBlobsIterator());
assertEquals(blobs.size(), collectedBlobs.size());
- assertEquals(new HashSet<ReferencedBlob>(blobs), new HashSet<ReferencedBlob>(collectedBlobs));
+ assertEquals(new HashSet<>(blobs), new HashSet<>(collectedBlobs));
}
}
Modified: jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentStoreStatsIT.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentStoreStatsIT.java?rev=1832110&r1=1832109&r2=1832110&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentStoreStatsIT.java (original)
+++ jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentStoreStatsIT.java Wed May 23 15:56:20 2018
@@ -39,6 +39,7 @@ import static org.apache.jackrabbit.oak.
import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.getModifiedInSecs;
import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeFalse;
+import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.eq;
@@ -85,7 +86,7 @@ public class DocumentStoreStatsIT extend
ds.invalidateCache();
ds.find(Collection.NODES, id);
- verify(stats).doneFindUncached(anyLong(), eq(Collection.NODES), eq(id), eq(true), eq(false));
+ verify(stats).doneFindUncached(anyLong(), eq(Collection.NODES), eq(id), eq(true), anyBoolean());
}
@Test
@@ -93,7 +94,7 @@ public class DocumentStoreStatsIT extend
String id = testName.getMethodName();
ds.find(Collection.NODES, id);
- verify(stats).doneFindUncached(anyLong(), eq(Collection.NODES), eq(id), eq(false), eq(false));
+ verify(stats).doneFindUncached(anyLong(), eq(Collection.NODES), eq(id), eq(false), anyBoolean());
}
@Test
@@ -113,7 +114,7 @@ public class DocumentStoreStatsIT extend
eq(false), //indexedProperty
eq(5) , // resultSize
anyLong(), //lockTime
- eq(false) //isSlaveOk
+ anyBoolean() //isSlaveOk
);
}
Modified: jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java?rev=1832110&r1=1832109&r2=1832110&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java (original)
+++ jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java Wed May 23 15:56:20 2018
@@ -43,6 +43,8 @@ import com.google.common.collect.Sets;
import com.google.common.io.Closeables;
import com.mongodb.BasicDBObject;
import com.mongodb.DBCollection;
+import com.mongodb.ReadPreference;
+
import junit.framework.Assert;
import org.apache.commons.io.filefilter.FileFilterUtils;
import org.apache.jackrabbit.oak.api.Blob;
@@ -56,6 +58,7 @@ import org.apache.jackrabbit.oak.plugins
import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils;
import org.apache.jackrabbit.oak.plugins.document.VersionGarbageCollector.VersionGCStats;
import org.apache.jackrabbit.oak.plugins.document.mongo.MongoBlobReferenceIterator;
+import org.apache.jackrabbit.oak.plugins.document.mongo.MongoTestUtils;
import org.apache.jackrabbit.oak.spi.cluster.ClusterRepositoryInfo;
import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore;
import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
@@ -87,8 +90,20 @@ public class MongoBlobGCTest extends Abs
return setUp(deleteDirect, 10);
}
+ @Override
+ protected DocumentMK.Builder addToBuilder(DocumentMK.Builder mk) {
+ // Disable client session because this test modifies
+ // data directly in MongoDB.
+ return super.addToBuilder(mk)
+ .setClientSessionDisabled(true)
+ .setLeaseCheck(false);
+ }
+
public DataStoreState setUp(boolean deleteDirect, int count) throws Exception {
DocumentNodeStore s = mk.getNodeStore();
+ // ensure primary read preference for this test because we modify data
+ // directly in MongoDB without going through the MongoDocumentStore
+ MongoTestUtils.setReadPreference(s, ReadPreference.primary());
NodeBuilder a = s.getRoot().builder();
int number = count;
@@ -145,7 +160,7 @@ public class MongoBlobGCTest extends Abs
return state;
}
-
+
private class DataStoreState {
Set<String> blobsAdded = Sets.newHashSet();
Set<String> blobsPresent = Sets.newHashSet();
@@ -284,7 +299,10 @@ public class MongoBlobGCTest extends Abs
public void consistencyCheckWithGc() throws Exception {
DataStoreState state = setUp(true);
Set<String> existingAfterGC = gc(0);
- assertTrue(Sets.symmetricDifference(state.blobsPresent, existingAfterGC).isEmpty());
+ assertTrue("blobsAdded: " + state.blobsAdded +
+ ", blobsPresent: " + state.blobsPresent +
+ ", existingAfterGC: " + existingAfterGC,
+ Sets.symmetricDifference(state.blobsPresent, existingAfterGC).isEmpty());
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);
MarkSweepGarbageCollector gcObj = init(86400, executor);
Modified: jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/VersionGCSupportTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/VersionGCSupportTest.java?rev=1832110&r1=1832109&r2=1832110&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/VersionGCSupportTest.java (original)
+++ jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/VersionGCSupportTest.java Wed May 23 15:56:20 2018
@@ -22,9 +22,11 @@ import java.util.List;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
+import com.mongodb.ReadPreference;
import org.apache.jackrabbit.oak.plugins.document.memory.MemoryDocumentStore;
import org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentStore;
+import org.apache.jackrabbit.oak.plugins.document.mongo.MongoTestUtils;
import org.apache.jackrabbit.oak.plugins.document.mongo.MongoVersionGCSupport;
import org.apache.jackrabbit.oak.plugins.document.rdb.RDBDocumentStore;
import org.apache.jackrabbit.oak.plugins.document.rdb.RDBVersionGCSupport;
@@ -57,6 +59,12 @@ public class VersionGCSupportTest {
}
if (MONGO.isAvailable()) {
MongoDocumentStore store = (MongoDocumentStore) MONGO.createDocumentStore();
+ // Enforce primary read preference, otherwise tests may fail on a
+ // replica set with a read preference configured to secondary.
+ // Revision GC usually runs with a modified range far in the past, so
+ // the changes have already reached the secondary; that is not the
+ // case in this test, which uses a virtual clock.
+ MongoTestUtils.setReadPreference(store, ReadPreference.primary());
fixtures.add(new Object[]{MONGO, store, new MongoVersionGCSupport(store)});
}
if (MEMORY.isAvailable()) {
Modified: jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/VersionGarbageCollectorIT.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/VersionGarbageCollectorIT.java?rev=1832110&r1=1832109&r2=1832110&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/VersionGarbageCollectorIT.java (original)
+++ jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/VersionGarbageCollectorIT.java Wed May 23 15:56:20 2018
@@ -67,9 +67,11 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Queues;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Atomics;
+import com.mongodb.ReadPreference;
import org.apache.jackrabbit.oak.api.CommitFailedException;
import org.apache.jackrabbit.oak.commons.PathUtils;
+import org.apache.jackrabbit.oak.plugins.document.mongo.MongoTestUtils;
import org.apache.jackrabbit.oak.plugins.document.util.Utils;
import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
@@ -129,6 +131,12 @@ public class VersionGarbageCollectorIT {
documentMKBuilder = new DocumentMK.Builder().clock(clock).setLeaseCheck(false)
.setDocumentStore(fixture.createDocumentStore()).setAsyncDelay(0);
store = documentMKBuilder.getNodeStore();
+ // Enforce primary read preference, otherwise tests may fail on a
+ // replica set with a read preference configured to secondary.
+ // Revision GC usually runs with a modified range far in the past, so
+ // the changes have already reached the secondary; that is not the
+ // case in this test, which uses a virtual clock.
+ MongoTestUtils.setReadPreference(store, ReadPreference.primary());
gc = store.getVersionGarbageCollector();
}