You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by to...@apache.org on 2016/07/05 11:11:13 UTC
svn commit: r1751441 [1/2] - in /jackrabbit/oak/trunk/oak-core/src:
main/java/org/apache/jackrabbit/oak/plugins/document/
main/java/org/apache/jackrabbit/oak/plugins/document/mongo/
main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/ ma...
Author: tomekr
Date: Tue Jul 5 11:11:13 2016
New Revision: 1751441
URL: http://svn.apache.org/viewvc?rev=1751441&view=rev
Log:
OAK-3865: New strategy to optimize secondary reads
Added:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/RevisionListener.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/GetRootRevisionsCallable.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/LocalChanges.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/NodeCollectionProvider.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/ReplicaSetInfo.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/ReplicaSetInfoListener.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/Timestamped.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/LocalChangesTest.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/ReplicaSetInfoMock.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/ReplicaSetInfoTest.java
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/LeaseCheckDocumentStoreWrapper.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/LoggingDocumentStoreWrapper.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/SynchronizingDocumentStoreWrapper.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/TimingDocumentStoreWrapper.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingDocumentStore.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentStoreWrapper.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/ReadPreferenceIT.java
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java?rev=1751441&r1=1751440&r2=1751441&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java Tue Jul 5 11:11:13 2016
@@ -499,6 +499,7 @@ public class DocumentMK {
public static final int DEFAULT_CACHE_STACK_MOVE_DISTANCE = 16;
private DocumentNodeStore nodeStore;
private DocumentStore documentStore;
+ private String mongoUri;
private DiffCache diffCache;
private BlobStore blobStore;
private int clusterId = Integer.getInteger("oak.documentMK.clusterId", 0);
@@ -555,6 +556,8 @@ public class DocumentMK {
@Nonnull String name,
int blobCacheSizeMB)
throws UnknownHostException {
+ this.mongoUri = uri;
+
DB db = new MongoConnection(uri).getDB(name);
if (!MongoConnection.hasWriteConcern(uri)) {
db.setWriteConcern(MongoConnection.getDefaultWriteConcern(db));
@@ -602,6 +605,16 @@ public class DocumentMK {
}
/**
+ * Returns the Mongo URI used in the {@link #setMongoDB(String, String, int)} method.
+ *
+ * @return the Mongo URI or null if the {@link #setMongoDB(String, String, int)} method hasn't
+ * been called.
+ */
+ public String getMongoUri() {
+ return mongoUri;
+ }
+
+ /**
* Sets a {@link DataSource} to use for the RDB document and blob
* stores.
*
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java?rev=1751441&r1=1751440&r2=1751441&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java Tue Jul 5 11:11:13 2016
@@ -1042,6 +1042,11 @@ public final class NodeDocument extends
}
}
n.setLastRevision(lastRevision);
+
+ if (store instanceof RevisionListener) {
+ ((RevisionListener) store).updateAccessedRevision(lastRevision);
+ }
+
return n;
}
@@ -1326,10 +1331,17 @@ public final class NodeDocument extends
}
NodeDocument getPreviousDocument(String prevId){
- //Use the maxAge variant such that in case of Mongo call for
- //previous doc are directed towards replicas first
LOG.trace("get previous document {}", prevId);
- return store.find(Collection.NODES, prevId, Integer.MAX_VALUE);
+ NodeDocument doc = store.find(Collection.NODES, prevId);
+ if (doc == null) {
+ // In case secondary read preference is used and node is not found
+ // then check with primary again as it might happen that node document has not been
+ // replicated. We know that document with such an id must exist but possibly due to
+ // replication lag it has not reached the secondary. So in that case read again
+ // from primary
+ doc = store.find(Collection.NODES, prevId, 0);
+ }
+ return doc;
}
@Nonnull
Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/RevisionListener.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/RevisionListener.java?rev=1751441&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/RevisionListener.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/RevisionListener.java Tue Jul 5 11:11:13 2016
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+/**
+ * Document stores implementing this interface will be informed about all revisions
+ * in which the nodes are accessed.
+ */
+public interface RevisionListener {
+
+ void updateAccessedRevision(RevisionVector revision);
+
+}
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java?rev=1751441&r1=1751440&r2=1751441&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java Tue Jul 5 11:11:13 2016
@@ -61,6 +61,8 @@ import org.apache.jackrabbit.oak.plugins
import org.apache.jackrabbit.oak.plugins.document.JournalEntry;
import org.apache.jackrabbit.oak.plugins.document.NodeDocument;
import org.apache.jackrabbit.oak.plugins.document.Revision;
+import org.apache.jackrabbit.oak.plugins.document.RevisionListener;
+import org.apache.jackrabbit.oak.plugins.document.RevisionVector;
import org.apache.jackrabbit.oak.plugins.document.StableRevisionComparator;
import org.apache.jackrabbit.oak.plugins.document.UpdateOp;
import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Condition;
@@ -71,6 +73,8 @@ import org.apache.jackrabbit.oak.plugins
import org.apache.jackrabbit.oak.plugins.document.cache.CacheInvalidationStats;
import org.apache.jackrabbit.oak.plugins.document.cache.ModificationStamp;
import org.apache.jackrabbit.oak.plugins.document.cache.NodeDocumentCache;
+import org.apache.jackrabbit.oak.plugins.document.mongo.replica.LocalChanges;
+import org.apache.jackrabbit.oak.plugins.document.mongo.replica.ReplicaSetInfo;
import org.apache.jackrabbit.oak.plugins.document.locks.NodeDocumentLocks;
import org.apache.jackrabbit.oak.plugins.document.locks.StripedNodeDocumentLocks;
import org.apache.jackrabbit.oak.plugins.document.util.Utils;
@@ -111,7 +115,7 @@ import static org.apache.jackrabbit.oak.
/**
* A document store that uses MongoDB as the backend.
*/
-public class MongoDocumentStore implements DocumentStore {
+public class MongoDocumentStore implements DocumentStore, RevisionListener {
private static final Logger LOG = LoggerFactory.getLogger(MongoDocumentStore.class);
private static final PerfLogger PERFLOG = new PerfLogger(
@@ -142,6 +146,12 @@ public class MongoDocumentStore implemen
private Clock clock = Clock.SIMPLE;
+ private ReplicaSetInfo replicaInfo;
+
+ private RevisionVector mostRecentAccessedRevisions;
+
+ final LocalChanges localChanges;
+
private final long maxReplicationLagMillis;
/**
@@ -173,6 +183,24 @@ public class MongoDocumentStore implemen
Long.getLong("oak.mongo.maxQueryTimeMS", TimeUnit.MINUTES.toMillis(1));
/**
+ * How often in milliseconds the MongoDocumentStore should estimate the
+ * replication lag.
+ * <p>
+ * Default is 5'000 (five seconds).
+ */
+ private long estimationPullFrequencyMS =
+ Long.getLong("oak.mongo.estimationPullFrequencyMS", TimeUnit.SECONDS.toMillis(5));
+
+ /**
+ * Fallback to the old secondary-routing strategy. Setting this to true
+ * disables the optimisation introduced in OAK-3865.
+ * <p>
+ * Default is false.
+ */
+ private boolean fallbackSecondaryStrategy =
+ Boolean.getBoolean("oak.mongo.fallbackSecondaryStrategy");
+
+ /**
* The number of documents to put into one bulk update.
* <p>
* Default is 30.
@@ -213,6 +241,18 @@ public class MongoDocumentStore implemen
maxReplicationLagMillis = builder.getMaxReplicationLagMillis();
+ if (fallbackSecondaryStrategy) {
+ replicaInfo = null;
+ localChanges = null;
+ } else {
+ replicaInfo = new ReplicaSetInfo(clock, db, builder.getMongoUri(), estimationPullFrequencyMS, maxReplicationLagMillis, builder.getExecutor());
+ Thread replicaInfoThread = new Thread(replicaInfo, "MongoDocumentStore replica set info provider (" + builder.getClusterId() + ")");
+ replicaInfoThread.setDaemon(true);
+ replicaInfoThread.start();
+ localChanges = new LocalChanges(builder.getClusterId());
+ replicaInfo.addListener(localChanges);
+ }
+
// indexes:
// the _id field is the primary key, so we don't need to define it
@@ -473,7 +513,7 @@ public class MongoDocumentStore implemen
boolean isSlaveOk = false;
boolean docFound = true;
try {
- ReadPreference readPreference = getMongoReadPreference(collection, Utils.getParentId(key), docReadPref);
+ ReadPreference readPreference = getMongoReadPreference(collection, null, key, docReadPref);
if(readPreference.isSlaveOk()){
LOG.trace("Routing call to secondary for fetching [{}]", key);
@@ -482,17 +522,6 @@ public class MongoDocumentStore implemen
DBObject obj = dbCollection.findOne(getByKeyQuery(key).get(), null, null, readPreference);
- if (obj == null
- && readPreference.isSlaveOk()) {
- //In case secondary read preference is used and node is not found
- //then check with primary again as it might happen that node document has not been
- //replicated. This is required for case like SplitDocument where the SplitDoc is fetched with
- //maxCacheAge == Integer.MAX_VALUE which results in readPreference of secondary.
- //In such a case we know that document with such an id must exist
- //but possibly dut to replication lag it has not reached to secondary. So in that case read again
- //from primary
- obj = dbCollection.findOne(getByKeyQuery(key).get(), null, null, ReadPreference.primary());
- }
if(obj == null){
docFound = false;
return null;
@@ -585,7 +614,7 @@ public class MongoDocumentStore implemen
cursor.maxTime(maxQueryTime, TimeUnit.MILLISECONDS);
}
ReadPreference readPreference =
- getMongoReadPreference(collection, parentId, getDefaultReadPreference(collection));
+ getMongoReadPreference(collection, parentId, null, getDefaultReadPreference(collection));
if(readPreference.isSlaveOk()){
isSlaveOk = true;
@@ -772,6 +801,7 @@ public class MongoDocumentStore implemen
if (collection == Collection.NODES) {
NodeDocument newDoc = (NodeDocument) applyChanges(collection, oldDoc, updateOp);
nodesCache.put(newDoc);
+ updateLocalChanges(newDoc);
}
oldDoc.seal();
} else if (upsert) {
@@ -779,6 +809,7 @@ public class MongoDocumentStore implemen
NodeDocument doc = (NodeDocument) collection.newDocument(this);
UpdateUtils.applyChanges(doc, updateOp);
nodesCache.putIfAbsent(doc);
+ updateLocalChanges(doc);
}
} else {
// updateOp without conditions and not an upsert
@@ -941,6 +972,11 @@ public class MongoDocumentStore implemen
docsToCache.add(newDoc);
}
}
+
+ for (NodeDocument doc : docsToCache) {
+ updateLocalChanges(doc);
+ }
+
nodesCache.putNonConflictingDocs(tracker, docsToCache);
}
oldDocs.keySet().removeAll(bulkResult.failedUpdates);
@@ -1106,6 +1142,7 @@ public class MongoDocumentStore implemen
if (collection == Collection.NODES) {
for (T doc : docs) {
nodesCache.putIfAbsent((NodeDocument) doc);
+ updateLocalChanges((NodeDocument) doc);
}
}
insertSuccess = true;
@@ -1220,7 +1257,8 @@ public class MongoDocumentStore implemen
}
DocumentReadPreference getReadPreference(int maxCacheAge){
- if(maxCacheAge >= 0 && maxCacheAge < maxReplicationLagMillis) {
+ long lag = fallbackSecondaryStrategy ? maxReplicationLagMillis : replicaInfo.getLag();
+ if(maxCacheAge >= 0 && maxCacheAge < lag) {
return DocumentReadPreference.PRIMARY;
} else if(maxCacheAge == Integer.MAX_VALUE){
return DocumentReadPreference.PREFER_SECONDARY;
@@ -1233,9 +1271,10 @@ public class MongoDocumentStore implemen
return col == Collection.NODES ? DocumentReadPreference.PREFER_SECONDARY_IF_OLD_ENOUGH : DocumentReadPreference.PRIMARY;
}
- <T extends Document> ReadPreference getMongoReadPreference(Collection<T> collection,
- String parentId,
- DocumentReadPreference preference) {
+ <T extends Document> ReadPreference getMongoReadPreference(@Nonnull Collection<T> collection,
+ @Nullable String parentId,
+ @Nullable String documentId,
+ @Nonnull DocumentReadPreference preference) {
switch(preference){
case PRIMARY:
return ReadPreference.primary();
@@ -1248,23 +1287,37 @@ public class MongoDocumentStore implemen
return ReadPreference.primary();
}
- // read from primary unless parent has not been modified
- // within replication lag period
- ReadPreference readPreference = ReadPreference.primary();
- if (parentId != null) {
- long replicationSafeLimit = getTime() - maxReplicationLagMillis;
- NodeDocument cachedDoc = nodesCache.getIfPresent(parentId);
- // FIXME: this is not quite accurate, because ancestors
+ boolean secondarySafe;
+ if (fallbackSecondaryStrategy) {
+ // This is not quite accurate, because ancestors
// are updated in a background thread (_lastRev). We
// will need to revise this for low maxReplicationLagMillis
// values
- if (cachedDoc != null && !cachedDoc.hasBeenModifiedSince(replicationSafeLimit)) {
+ long replicationSafeLimit = getTime() - maxReplicationLagMillis;
+ if (parentId == null) {
+ secondarySafe = false;
+ } else {
//If parent has been modified loooong time back then there children
//would also have not be modified. In that case we can read from secondary
- readPreference = getConfiguredReadPreference(collection);
+ NodeDocument cachedDoc = nodesCache.getIfPresent(parentId);
+ secondarySafe = cachedDoc != null && !cachedDoc.hasBeenModifiedSince(replicationSafeLimit);
}
+ } else {
+ secondarySafe = true;
+ secondarySafe &= collection == Collection.NODES;
+ secondarySafe &= documentId == null || !localChanges.mayContain(documentId);
+ secondarySafe &= parentId == null || !localChanges.mayContainChildrenOf(parentId);
+ secondarySafe &= mostRecentAccessedRevisions == null || replicaInfo.isMoreRecentThan(mostRecentAccessedRevisions);
+ }
+
+ ReadPreference readPreference;
+ if (secondarySafe) {
+ readPreference = getConfiguredReadPreference(collection);
+ } else {
+ readPreference = ReadPreference.primary();
}
+
return readPreference;
default:
throw new IllegalArgumentException("Unsupported usage " + preference);
@@ -1339,6 +1392,9 @@ public class MongoDocumentStore implemen
@Override
public void dispose() {
+ if (replicaInfo != null) {
+ replicaInfo.stop();
+ }
nodes.getDB().getMongo().close();
try {
nodesCache.close();
@@ -1534,6 +1590,14 @@ public class MongoDocumentStore implemen
this.stats = stats;
}
+ void setReplicaInfo(ReplicaSetInfo replicaInfo) {
+ if (this.replicaInfo != null) {
+ this.replicaInfo.stop();
+ }
+ this.replicaInfo = replicaInfo;
+ this.replicaInfo.addListener(localChanges);
+ }
+
@Override
public long determineServerTimeDifferenceMillis() {
// the assumption is that the network delay from this instance
@@ -1580,6 +1644,25 @@ public class MongoDocumentStore implemen
return diff;
}
+ @Override
+ public synchronized void updateAccessedRevision(RevisionVector revisions) {
+ RevisionVector previousValue = mostRecentAccessedRevisions;
+ if (mostRecentAccessedRevisions == null) {
+ mostRecentAccessedRevisions = revisions;
+ } else {
+ mostRecentAccessedRevisions = mostRecentAccessedRevisions.pmax(revisions);
+ }
+ if (LOG.isDebugEnabled() && !mostRecentAccessedRevisions.equals(previousValue)) {
+ LOG.debug("Most recent accessed revisions: {}", mostRecentAccessedRevisions);
+ }
+ }
+
+ private void updateLocalChanges(NodeDocument doc) {
+ if (localChanges != null) {
+ localChanges.add(doc.getId(), Revision.getCurrentTimestamp());
+ }
+ }
+
private static class BulkUpdateResult {
private final Set<String> failedUpdates;
Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/GetRootRevisionsCallable.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/GetRootRevisionsCallable.java?rev=1751441&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/GetRootRevisionsCallable.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/GetRootRevisionsCallable.java Tue Jul 5 11:11:13 2016
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document.mongo.replica;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+
+import org.apache.jackrabbit.oak.plugins.document.Document;
+import org.apache.jackrabbit.oak.plugins.document.Revision;
+import org.apache.jackrabbit.oak.plugins.document.RevisionVector;
+import org.apache.jackrabbit.oak.stats.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.mongodb.BasicDBObject;
+import com.mongodb.DBCollection;
+import com.mongodb.DBObject;
+
+public class GetRootRevisionsCallable implements Callable<Timestamped<RevisionVector>> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(GetRootRevisionsCallable.class);
+
+ private final String hostName;
+
+ private final NodeCollectionProvider nodeCollections;
+
+ private final Clock clock;
+
+ public GetRootRevisionsCallable(Clock clock, String hostName, NodeCollectionProvider nodeCollections) {
+ this.hostName = hostName;
+ this.nodeCollections = nodeCollections;
+ this.clock = clock;
+ }
+
+ @Override
+ public Timestamped<RevisionVector> call() throws Exception {
+ List<Revision> revisions = new ArrayList<Revision>();
+ DBCollection collection = nodeCollections.get(hostName);
+
+ long start = clock.getTime();
+ DBObject root = collection.findOne(new BasicDBObject(Document.ID, "0:/"));
+ long end = clock.getTime();
+ long mid = (start + end) / 2;
+
+ if (root == null) {
+ LOG.warn("Can't get the root document on {}", hostName);
+ return null;
+ }
+
+ DBObject lastRev = (DBObject) root.get("_lastRev");
+ for (String clusterId : lastRev.keySet()) {
+ String rev = (String) lastRev.get(clusterId);
+ revisions.add(Revision.fromString(rev));
+ }
+ LOG.debug("Got /_lastRev from {}: {}", hostName, lastRev);
+ return new Timestamped<RevisionVector>(new RevisionVector(revisions), mid);
+ }
+}
\ No newline at end of file
Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/LocalChanges.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/LocalChanges.java?rev=1751441&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/LocalChanges.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/LocalChanges.java Tue Jul 5 11:11:13 2016
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document.mongo.replica;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.jackrabbit.oak.plugins.document.Revision;
+import org.apache.jackrabbit.oak.plugins.document.RevisionVector;
+import org.apache.jackrabbit.oak.plugins.document.util.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class maintains a list of local changes (path+local revision timestamp), which
+ * shouldn't be read from the secondary Mongo, as we are not sure if they have
+ * been already replicated from primary. Once we get this confidence, the entry
+ * will be removed from the map.
+ */
+public class LocalChanges implements ReplicaSetInfoListener {
+
+ private static final Logger LOG = LoggerFactory.getLogger(LocalChanges.class);
+
+ /**
+ * How many paths should be stored in the {@link #localChanges} map. If
+ * there's more paths added (and not removed in the
+ * {@link #gotRootRevisions(RevisionVector)}), only the latest changed
+ * revision will be remembered.
+ */
+ private static final int SIZE_LIMIT = 100;
+
+ /**
+ * This map contains document paths and timestamps in which they have been
+ * changed. Paths in this collection haven't been replicated to secondary
+ * instances yet.
+ */
+ final Map<String, Long> localChanges = new HashMap<String, Long>();
+
+ /**
+ * If there are {@link #SIZE_LIMIT} or more paths in the
+ * {@link #localChanges}, the class will clear the above map and update this
+ * variable with the timestamp of the last local change. {@code true} will be returned
+ * for all {@link #mayContainChildrenOf(String)} and {@link #mayContain(String)}
+ * invocations until this timestamp is replicated to all secondary instances.
+ * <p>
+ * This is a safety mechanism, so the {@link #localChanges} won't grow too much.
+ */
+ private volatile long latestChange;
+
+ /**
+ * True if the current Mongo installation is a working replica. Otherwise
+ * there's no need to store the local changes.
+ */
+ private volatile boolean replicaActive;
+
+ private volatile long rootTS;
+
+ private final int clusterId;
+
+ public LocalChanges(int clusterId) {
+ this.clusterId = clusterId;
+ }
+
+ public void add(String id, long timestamp) {
+ long localRootTS = rootTS;
+ if (localRootTS != 0 && localRootTS > timestamp) {
+ return;
+ }
+
+ synchronized (this) {
+ if (latestChange != 0 && latestChange > timestamp) {
+ return;
+ }
+
+ if (replicaActive) {
+ localChanges.put(id, timestamp);
+ if (localChanges.size() >= SIZE_LIMIT) {
+ localChanges.clear();
+ latestChange = timestamp;
+ LOG.debug(
+ "The local changes count == {}. Clearing the list and switching to the 'latest change' mode: {}",
+ SIZE_LIMIT, latestChange);
+ }
+ } else {
+ latestChange = timestamp;
+ }
+ }
+ }
+
+ /**
+ * Check if it's possible that the given document hasn't been replicated to
+ * the secondary yet.
+ *
+ * @param documentId
+ * @return {@code true} if it's possible that the document is still in the
+ * Mongo replication queue
+ */
+ public boolean mayContain(String documentId) {
+ if (!replicaActive || latestChange != 0) {
+ return true;
+ }
+
+ synchronized (this) {
+ return localChanges.containsKey(documentId);
+ }
+ }
+
+ /**
+ * Check if it's possible that the children of the given document haven't
+ * been replicated to the secondary yet.
+ *
+ * @param parentId
+ * @return {@code true} if it's possible that the children of given document
+ * are still in the Mongo replication queue
+ */
+ public boolean mayContainChildrenOf(String parentId) {
+ if (!replicaActive || latestChange != 0) {
+ return true;
+ }
+
+ synchronized (this) {
+ for (String key : localChanges.keySet()) {
+ if (parentId.equals(Utils.getParentId(key))) {
+ return true;
+ }
+ }
+ return false;
+ }
+ }
+
+ @Override
+ public void gotRootRevisions(RevisionVector rootRevision) {
+ if (rootRevision == null) {
+ return;
+ }
+
+ Revision rootRevForLocalInstance = rootRevision.getRevision(clusterId);
+ if (rootRevForLocalInstance == null) {
+ return;
+ }
+ this.rootTS = rootRevForLocalInstance.getTimestamp();
+
+ if (!replicaActive) {
+ replicaActive = true;
+ LOG.info("Replica set became active");
+ }
+
+ synchronized (this) {
+ if (latestChange != 0 && latestChange <= rootTS) {
+ latestChange = 0;
+ }
+
+ Iterator<Long> it = localChanges.values().iterator();
+ while (it.hasNext()) {
+ if (it.next() <= rootTS) {
+ it.remove();
+ }
+ }
+ }
+ }
+}
Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/NodeCollectionProvider.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/NodeCollectionProvider.java?rev=1751441&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/NodeCollectionProvider.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/NodeCollectionProvider.java Tue Jul 5 11:11:13 2016
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document.mongo.replica;
+
+import static com.google.common.collect.Sets.difference;
+
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.mongodb.MongoCredential;
+import com.mongodb.ServerAddress;
+import org.apache.jackrabbit.oak.plugins.document.Collection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.mongodb.DB;
+import com.mongodb.DBCollection;
+import com.mongodb.MongoClient;
+import com.mongodb.MongoClientException;
+import com.mongodb.MongoClientURI;
+
+/**
+ * This class connects to Mongo instances and returns the NODES collection.
+ * Connections are cached.
+ */
+public class NodeCollectionProvider {
+
+ private static final Logger LOG = LoggerFactory.getLogger(NodeCollectionProvider.class);
+
+ private final Map<String, DBCollection> collections = new ConcurrentHashMap<String, DBCollection>();
+
+ private final String originalMongoUri;
+
+ private final String dbName;
+
+ /**
+  * @param originalMongoUri the Mongo URI whose credentials and options are
+  *            reused for the per-host connections; when {@code null}, a
+  *            plain {@code mongodb://<hostname>} URI is used instead
+  * @param dbName name of the database the NODES collection is read from
+  */
+ public NodeCollectionProvider(String originalMongoUri, String dbName) {
+     this.dbName = dbName;
+     this.originalMongoUri = originalMongoUri;
+ }
+
+ /**
+  * Closes and forgets the cached connections of all hosts that are not
+  * contained in the given set.
+  *
+  * @param hostNames the hosts whose connections should be kept
+  */
+ public void retain(Set<String> hostNames) {
+     Set<String> obsoleteHosts = difference(collections.keySet(), hostNames);
+     close(obsoleteHosts);
+ }
+
+ /**
+ * Closes the Mongo clients of all cached collections.
+ */
+ public void close() {
+ close(collections.keySet());
+ }
+
+ /**
+  * Closes the Mongo client of every cached collection whose host name is
+  * contained in the given set. An entry is dropped from the cache only
+  * after its client has been closed without a {@link MongoClientException}.
+  *
+  * @param hostNames the hosts whose connections should be closed
+  */
+ private void close(Set<String> hostNames) {
+     for (Iterator<Entry<String, DBCollection>> entries = collections.entrySet().iterator(); entries.hasNext();) {
+         Entry<String, DBCollection> cached = entries.next();
+         if (!hostNames.contains(cached.getKey())) {
+             continue;
+         }
+         try {
+             cached.getValue().getDB().getMongo().close();
+             entries.remove();
+         } catch (MongoClientException e) {
+             LOG.error("Can't close Mongo client", e);
+         }
+     }
+ }
+
+ /**
+  * Returns the NODES collection of the given Mongo instance. The collection
+  * (and the client behind it) is created on first use and cached afterwards.
+  *
+  * @param hostname the instance address, either {@code host} or
+  *            {@code host:port}
+  * @return the NODES collection of the given instance
+  * @throws UnknownHostException if the client can't be created for the
+  *             given host name
+  */
+ @SuppressWarnings("deprecation")
+ public DBCollection get(String hostname) throws UnknownHostException {
+     // a single lookup instead of containsKey() + get(): the entry may be
+     // evicted by retain()/close() between the two calls, which would make
+     // this method return null
+     DBCollection cached = collections.get(hostname);
+     if (cached != null) {
+         return cached;
+     }
+
+     MongoClient client;
+     if (originalMongoUri == null) {
+         MongoClientURI uri = new MongoClientURI("mongodb://" + hostname);
+         client = new MongoClient(uri);
+     } else {
+         client = prepareClientForHostname(hostname);
+     }
+
+     DB db = client.getDB(dbName);
+     db.getMongo().slaveOk();
+     DBCollection collection = db.getCollection(Collection.NODES.toString());
+     collections.put(hostname, collection);
+     return collection;
+ }
+
+ private MongoClient prepareClientForHostname(String hostname) throws UnknownHostException {
+ ServerAddress address;
+ if (hostname.contains(":")) {
+ String[] hostSplit = hostname.split(":");
+ if (hostSplit.length != 2) {
+ throw new IllegalArgumentException("Not a valid hostname: " + hostname);
+ }
+ address = new ServerAddress(hostSplit[0], Integer.parseInt(hostSplit[1]));
+ } else {
+ address = new ServerAddress(hostname);
+ }
+
+ MongoClientURI originalUri = new MongoClientURI(originalMongoUri);
+ List<MongoCredential> credentialList = new ArrayList<MongoCredential>(1);
+ if (originalUri.getCredentials() != null) {
+ credentialList.add(originalUri.getCredentials());
+ }
+ return new MongoClient(address, credentialList, originalUri.getOptions());
+ }
+}
\ No newline at end of file
Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/ReplicaSetInfo.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/ReplicaSetInfo.java?rev=1751441&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/ReplicaSetInfo.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/ReplicaSetInfo.java Tue Jul 5 11:11:13 2016
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document.mongo.replica;
+
+import static com.google.common.base.Predicates.in;
+import static com.google.common.collect.ImmutableSet.of;
+import static com.google.common.collect.Iterables.isEmpty;
+import static com.google.common.collect.Iterables.transform;
+import static com.google.common.collect.Maps.filterKeys;
+import static com.google.common.collect.Sets.union;
+import static org.apache.jackrabbit.oak.plugins.document.util.Utils.isGreaterOrEquals;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Future;
+import java.util.Set;
+import java.util.concurrent.FutureTask;
+
+import javax.annotation.Nullable;
+
+import org.apache.jackrabbit.oak.plugins.document.Revision;
+import org.apache.jackrabbit.oak.plugins.document.RevisionVector;
+import org.apache.jackrabbit.oak.stats.Clock;
+import org.bson.BasicBSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.mongodb.BasicDBObject;
+import com.mongodb.DB;
+import com.mongodb.MongoException;
+import com.mongodb.ReadPreference;
+
+/**
+ * This class analyses the replica set info provided by MongoDB to find out
+ * the current synchronization state of the secondary instances in terms of
+ * revision values and timestamp.
+ */
+public class ReplicaSetInfo implements Runnable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ReplicaSetInfo.class);
+
+ /** The {@code admin} sister database, used to run the replica set commands. */
+ private final DB adminDb;
+
+ /** Maximum time the update loop waits between two iterations. */
+ private final long pullFrequencyMillis;
+
+ /** Value reported by {@link #getLag()} while the safe timestamp is unknown. */
+ private final long maxReplicationLagMillis;
+
+ /** Runs the tasks reading the root revisions from the instances. */
+ private final Executor executor;
+
+ /** Provides the NODES collection of the individual instances. */
+ private final NodeCollectionProvider nodeCollections;
+
+ private final Clock clock;
+
+ /** Monitor the update loop waits on; notified by {@link #stop()}. */
+ private final Object stopMonitor = new Object();
+
+ /** Listeners informed about the root revisions after each status update. */
+ protected final List<ReplicaSetInfoListener> listeners = new CopyOnWriteArrayList<ReplicaSetInfoListener>();
+
+ /** Minimum root revisions of the secondaries; {@code null} when unknown. */
+ protected volatile RevisionVector rootRevisions;
+
+ /** Point in time the secondaries are synchronized to; {@code 0} when unknown. */
+ volatile long secondariesSafeTimestamp;
+
+ /** Hosts of the hidden members; {@code null} until read by the update loop. */
+ List<String> hiddenMembers;
+
+ /** Set by {@link #stop()} to end the update loop. */
+ private volatile boolean stop;
+
+ /**
+  * @param clock the clock used to compute the replication lag
+  * @param db the database holding the repository; its {@code admin} sister
+  *            database is used for the replica set commands
+  * @param originalMongoUri the Mongo URI handed over to the
+  *            {@link NodeCollectionProvider}
+  * @param pullFrequencyMillis wait time between two status updates
+  * @param maxReplicationLagMillis lag reported while the real lag is unknown
+  * @param executor executes the tasks reading the root revisions
+  */
+ public ReplicaSetInfo(Clock clock, DB db, String originalMongoUri, long pullFrequencyMillis, long maxReplicationLagMillis, Executor executor) {
+     this.clock = clock;
+     this.executor = executor;
+     this.pullFrequencyMillis = pullFrequencyMillis;
+     this.maxReplicationLagMillis = maxReplicationLagMillis;
+     this.adminDb = db.getSisterDB("admin");
+     this.nodeCollections = new NodeCollectionProvider(originalMongoUri, db.getName());
+ }
+
+ /**
+ * Registers a listener that will be passed the current root revisions
+ * after every replica status update.
+ *
+ * @param listener the listener to add
+ */
+ public void addListener(ReplicaSetInfoListener listener) {
+ listeners.add(listener);
+ }
+
+ /**
+  * Checks whether the known minimum root revisions of the secondaries are
+  * at least as recent as the given revisions.
+  *
+  * @param revisions the revisions to compare with
+  * @return {@code true} if the secondaries' root revisions are known and
+  *         greater than or equal to the given ones, {@code false} otherwise
+  */
+ public boolean isMoreRecentThan(RevisionVector revisions) {
+     // read the volatile field only once
+     RevisionVector knownRootRevisions = rootRevisions;
+     return knownRootRevisions != null && isGreaterOrEquals(knownRootRevisions, revisions);
+ }
+
+ /**
+  * @return the time elapsed since the secondaries' safe timestamp, or the
+  *         configured maximum replication lag if that timestamp is unknown
+  */
+ public long getLag() {
+     // read the volatile field only once
+     long safeTimestamp = secondariesSafeTimestamp;
+     return safeTimestamp == 0 ? maxReplicationLagMillis : clock.getTime() - safeTimestamp;
+ }
+
+ /**
+ * @return the minimum root revisions of the secondary instances, or
+ * {@code null} if they are currently unknown
+ */
+ @Nullable
+ public RevisionVector getMinimumRootRevisions() {
+ return rootRevisions;
+ }
+
+ /**
+ * Asks the update loop to finish and wakes it up if it's currently waiting
+ * for the next iteration.
+ */
+ public void stop() {
+ synchronized (stopMonitor) {
+ stop = true;
+ stopMonitor.notify();
+ }
+ }
+
+ /**
+ * Runs the update loop until it finishes. Any exception escaping the loop
+ * is logged and ends the loop.
+ */
+ @Override
+ public void run() {
+ try {
+ updateLoop();
+ } catch (Exception e) {
+ LOG.error("Exception in the ReplicaSetInfo thread", e);
+ }
+ }
+
+ /**
+  * Periodically refreshes the replica set state until {@link #stop()} is
+  * called or the thread is interrupted. The first successful iteration only
+  * reads the list of hidden members; every following one updates the
+  * replica status and passes the root revisions to the listeners. The
+  * cached node collections are closed when the loop ends.
+  */
+ private void updateLoop() {
+     while (!stop) {
+         if (hiddenMembers == null) {
+             hiddenMembers = getHiddenMembers();
+         } else {
+             updateReplicaStatus();
+
+             for (ReplicaSetInfoListener listener : listeners) {
+                 listener.gotRootRevisions(rootRevisions);
+             }
+         }
+
+         synchronized (stopMonitor) {
+             try {
+                 // re-check under the monitor, so a stop() issued during the
+                 // update above is not missed
+                 if (!stop) {
+                     stopMonitor.wait(pullFrequencyMillis);
+                 }
+             } catch (InterruptedException e) {
+                 // restore the interrupt status for the code owning this
+                 // thread before leaving the loop
+                 Thread.currentThread().interrupt();
+                 break;
+             }
+         }
+     }
+     LOG.debug("Stopping the replica set info");
+     nodeCollections.close();
+ }
+
+ void updateReplicaStatus() {
+ BasicDBObject result;
+ try {
+ result = getReplicaStatus();
+ } catch (MongoException e) {
+ LOG.error("Can't get replica status", e);
+ rootRevisions = null;
+ secondariesSafeTimestamp = 0;
+ return;
+ }
+
+ @SuppressWarnings("unchecked")
+ Iterable<BasicBSONObject> members = (Iterable<BasicBSONObject>) result.get("members");
+ if (members == null) {
+ members = Collections.emptyList();
+ }
+ updateRevisions(members);
+ }
+
+ List<String> getHiddenMembers() {
+ BasicDBObject result;
+ try {
+ result = getReplicaConfig();
+ } catch (MongoException e) {
+ LOG.error("Can't get replica configuration", e);
+ return null;
+ }
+
+ @SuppressWarnings("unchecked")
+ Iterable<BasicBSONObject> members = (Iterable<BasicBSONObject>) result.get("members");
+ if (members == null) {
+ members = Collections.emptyList();
+ }
+
+ List<String> hiddenMembers = new ArrayList<String>();
+ for (BasicBSONObject member : members) {
+ if (member.getBoolean("hidden")) {
+ hiddenMembers.add(member.getString("host"));
+ }
+ }
+ return hiddenMembers;
+ }
+
+ /**
+  * Fetches the replica set configuration from the primary.
+  * <p>
+  * The {@code replSetGetConfig} command wraps the configuration document
+  * (the one holding the {@code members} array) in a {@code config} field of
+  * its response, so that nested document is returned. If the response has
+  * no such document, the raw response is returned instead.
+  *
+  * @return the object expected to hold the {@code members} of the replica
+  *         set configuration
+  */
+ protected BasicDBObject getReplicaConfig() {
+     BasicDBObject response = adminDb.command("replSetGetConfig", ReadPreference.primary());
+     Object config = response.get("config");
+     if (config instanceof BasicDBObject) {
+         return (BasicDBObject) config;
+     }
+     return response;
+ }
+
+ /**
+ * Runs the {@code replSetGetStatus} command on the admin database, using
+ * the primary read preference.
+ *
+ * @return the response of the command
+ */
+ protected BasicDBObject getReplicaStatus() {
+ return adminDb.command("replSetGetStatus", ReadPreference.primary());
+ }
+
+ /**
+  * Recomputes {@link #rootRevisions} and {@link #secondariesSafeTimestamp}
+  * from the members of a replica status response. Hidden members and
+  * arbiters are ignored. Both values are reset to "unknown" if a member is
+  * in a state other than primary, secondary or arbiter, if there's no
+  * primary or no secondary, or if a root revision lookup returned
+  * {@code null}.
+  *
+  * @param members the {@code members} entries of the replica status
+  */
+ private void updateRevisions(Iterable<BasicBSONObject> members) {
+     Set<String> secondaries = new HashSet<String>();
+     boolean unknownState = false;
+     String primary = null;
+
+     for (BasicBSONObject member : members) {
+         // Enum.valueOf() throws a NullPointerException rather than an
+         // IllegalArgumentException for a null name, so a missing stateStr
+         // has to be handled explicitly
+         String stateStr = member.getString("stateStr");
+         MemberState state = MemberState.UNKNOWN;
+         if (stateStr != null) {
+             try {
+                 state = MemberState.valueOf(stateStr);
+             } catch (IllegalArgumentException ignored) {
+                 // a state name MemberState doesn't know: stays UNKNOWN
+             }
+         }
+         String name = member.getString("name");
+         if (hiddenMembers.contains(name)) {
+             continue;
+         }
+
+         switch (state) {
+         case PRIMARY:
+             primary = name;
+             continue;
+
+         case SECONDARY:
+             secondaries.add(name);
+             break;
+
+         case ARBITER:
+             continue;
+
+         default:
+             LOG.debug("Invalid state {} for instance {}", state, name);
+             unknownState = true;
+             break;
+         }
+     }
+
+     if (secondaries.isEmpty()) {
+         LOG.debug("No secondaries found: {}", members);
+         unknownState = true;
+     }
+
+     if (primary == null) {
+         LOG.debug("No primary found: {}", members);
+         unknownState = true;
+     }
+
+     Map<String, Timestamped<RevisionVector>> vectors = null;
+     if (!unknownState) {
+         vectors = getRootRevisions(union(secondaries, of(primary)));
+         if (vectors.containsValue(null)) {
+             unknownState = true;
+         }
+     }
+
+     if (unknownState) {
+         rootRevisions = null;
+         secondariesSafeTimestamp = 0;
+     } else {
+         Timestamped<RevisionVector> primaryRevision = vectors.get(primary);
+         Iterable<Timestamped<RevisionVector>> secondaryRevisions = filterKeys(vectors, in(secondaries)).values();
+
+         rootRevisions = pmin(transform(secondaryRevisions, Timestamped.<RevisionVector>getExtractFunction()));
+         if (rootRevisions == null || primaryRevision == null || isEmpty(secondaryRevisions)) {
+             secondariesSafeTimestamp = 0;
+         } else {
+             secondariesSafeTimestamp = getSecondariesSafeTimestamp(primaryRevision, secondaryRevisions);
+         }
+     }
+
+     LOG.debug("Minimum root revisions: {}. Current lag: {}", rootRevisions, getLag());
+     // The primary is queried on every update as well, so its connection is
+     // kept in the cache too; otherwise it would be closed here and created
+     // again on each iteration.
+     if (primary == null) {
+         nodeCollections.retain(secondaries);
+     } else {
+         nodeCollections.retain(union(secondaries, of(primary)));
+     }
+ }
+
+ /**
+  * Finds the oldest primary revision that differs from its counterpart on
+  * at least one secondary and returns its timestamp. If every secondary
+  * matches the primary, the earliest measurement (operation) timestamp
+  * among the primary and the secondaries is returned instead.
+  *
+  * @param primary the root revisions read from the primary
+  * @param secondaries the root revisions read from the secondaries
+  * @return the point in time to which the secondary instances have been
+  *         synchronized
+  */
+ private long getSecondariesSafeTimestamp(Timestamped<RevisionVector> primary, Iterable<Timestamped<RevisionVector>> secondaries) {
+     final RevisionVector primaryRevisions = primary.getValue();
+     boolean lagging = false;
+     long oldestPending = Long.MAX_VALUE;
+     for (Timestamped<RevisionVector> secondary : secondaries) {
+         RevisionVector secondaryRevisions = secondary.getValue();
+         if (secondaryRevisions.equals(primaryRevisions)) {
+             continue;
+         }
+
+         for (Revision primaryRevision : primaryRevisions) {
+             Revision replicated = secondaryRevisions.getRevision(primaryRevision.getClusterId());
+             if (!primaryRevision.equals(replicated)) {
+                 lagging = true;
+                 oldestPending = Math.min(oldestPending, primaryRevision.getTimestamp());
+             }
+         }
+     }
+     if (lagging) {
+         return oldestPending;
+     }
+
+     // everything is replicated: fall back to the earliest measurement time
+     long earliestMeasurement = primary.getOperationTimestamp();
+     for (Timestamped<RevisionVector> secondary : secondaries) {
+         earliestMeasurement = Math.min(earliestMeasurement, secondary.getOperationTimestamp());
+     }
+     return earliestMeasurement;
+ }
+
+ protected Map<String, Timestamped<RevisionVector>> getRootRevisions(Iterable<String> hosts) {
+ Map<String, Future<Timestamped<RevisionVector>>> futures = new HashMap<String, Future<Timestamped<RevisionVector>>>();
+ for (final String hostName : hosts) {
+ Callable<Timestamped<RevisionVector>> callable = new GetRootRevisionsCallable(clock, hostName, nodeCollections);
+ FutureTask<Timestamped<RevisionVector>> futureTask = new FutureTask<Timestamped<RevisionVector>>(callable);
+ futures.put(hostName, futureTask);
+ executor.execute(futureTask);
+ }
+
+ Map<String, Timestamped<RevisionVector>> result = new HashMap<String, Timestamped<RevisionVector>>();
+ for (Entry<String, Future<Timestamped<RevisionVector>>> entry : futures.entrySet()) {
+ try {
+ result.put(entry.getKey(), entry.getValue().get());
+ } catch (Exception e) {
+ LOG.error("Can't connect to the Mongo instance", e);
+ }
+ }
+ return result;
+ }
+
+ /**
+  * Folds the given vectors with {@link RevisionVector#pmin(RevisionVector)}.
+  *
+  * @param vectors the vectors to combine
+  * @return the combined minimum, or {@code null} if there are no vectors or
+  *         a {@code null} vector is encountered
+  */
+ private static RevisionVector pmin(Iterable<RevisionVector> vectors) {
+     RevisionVector result = null;
+     for (RevisionVector current : vectors) {
+         if (current == null) {
+             return null;
+         }
+         result = (result == null) ? current : result.pmin(current);
+     }
+     return result;
+ }
+
+ /**
+ * States of a replica set member. The constants are looked up by name from
+ * the {@code stateStr} field of the replica status members.
+ */
+ public enum MemberState {
+ STARTUP, PRIMARY, SECONDARY, RECOVERING, STARTUP2, UNKNOWN, ARBITER, DOWN, ROLLBACK, REMOVED
+ }
+}
Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/ReplicaSetInfoListener.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/ReplicaSetInfoListener.java?rev=1751441&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/ReplicaSetInfoListener.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/ReplicaSetInfoListener.java Tue Jul 5 11:11:13 2016
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document.mongo.replica;
+
+import org.apache.jackrabbit.oak.plugins.document.RevisionVector;
+
+/**
+ * Classes implementing this interface will be informed about the current root
+ * revision states on secondary instances by {@link ReplicaSetInfo}.
+ */
+public interface ReplicaSetInfoListener {
+
+ /**
+ * Called by {@link ReplicaSetInfo} after each replica status update.
+ *
+ * @param rootRevision the current root revisions of the secondaries;
+ * {@code null} if they are unknown
+ */
+ void gotRootRevisions(RevisionVector rootRevision);
+
+}
\ No newline at end of file
Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/Timestamped.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/Timestamped.java?rev=1751441&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/Timestamped.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/Timestamped.java Tue Jul 5 11:11:13 2016
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document.mongo.replica;
+
+import com.google.common.base.Function;
+
+/**
+ * An immutable pair of a value and an operation timestamp.
+ *
+ * @param <T> the value type
+ */
+public class Timestamped<T> {
+
+    private final long operationTimestamp;
+
+    private final T value;
+
+    /**
+     * @param value the wrapped value
+     * @param operationTimestamp the timestamp to associate with the value
+     */
+    public Timestamped(T value, long operationTimestamp) {
+        this.operationTimestamp = operationTimestamp;
+        this.value = value;
+    }
+
+    /**
+     * @return the wrapped value
+     */
+    public T getValue() {
+        return value;
+    }
+
+    /**
+     * @return the timestamp associated with the value
+     */
+    public long getOperationTimestamp() {
+        return operationTimestamp;
+    }
+
+    /**
+     * Creates a function unwrapping the value of a {@code Timestamped}.
+     *
+     * @return a function mapping a {@code Timestamped} to its value and
+     *         {@code null} to {@code null}
+     */
+    public static <T> Function<Timestamped<T>, T> getExtractFunction() {
+        return new Function<Timestamped<T>, T>() {
+            @Override
+            public T apply(Timestamped<T> input) {
+                return input == null ? null : input.value;
+            }
+        };
+    }
+
+    @Override
+    public String toString() {
+        return "Timestamped[" + value + '(' + operationTimestamp + ")]";
+    }
+}
\ No newline at end of file
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/LeaseCheckDocumentStoreWrapper.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/LeaseCheckDocumentStoreWrapper.java?rev=1751441&r1=1751440&r2=1751441&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/LeaseCheckDocumentStoreWrapper.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/LeaseCheckDocumentStoreWrapper.java Tue Jul 5 11:11:13 2016
@@ -26,6 +26,8 @@ import org.apache.jackrabbit.oak.plugins
import org.apache.jackrabbit.oak.plugins.document.Collection;
import org.apache.jackrabbit.oak.plugins.document.Document;
import org.apache.jackrabbit.oak.plugins.document.DocumentStore;
+import org.apache.jackrabbit.oak.plugins.document.RevisionListener;
+import org.apache.jackrabbit.oak.plugins.document.RevisionVector;
import org.apache.jackrabbit.oak.plugins.document.UpdateOp;
import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Condition;
import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Key;
@@ -37,7 +39,7 @@ import org.apache.jackrabbit.oak.plugins
* <p>
* @see "https://issues.apache.org/jira/browse/OAK-2739 for more details"
*/
-public final class LeaseCheckDocumentStoreWrapper implements DocumentStore {
+public final class LeaseCheckDocumentStoreWrapper implements DocumentStore, RevisionListener {
private final DocumentStore delegate;
private final ClusterNodeInfo clusterNodeInfo;
@@ -203,4 +205,11 @@ public final class LeaseCheckDocumentSto
return delegate.determineServerTimeDifferenceMillis();
}
+ @Override
+ public void updateAccessedRevision(RevisionVector revision) {
+ if (delegate instanceof RevisionListener) {
+ ((RevisionListener) delegate).updateAccessedRevision(revision);
+ }
+ }
+
}
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/LoggingDocumentStoreWrapper.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/LoggingDocumentStoreWrapper.java?rev=1751441&r1=1751440&r2=1751441&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/LoggingDocumentStoreWrapper.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/LoggingDocumentStoreWrapper.java Tue Jul 5 11:11:13 2016
@@ -28,6 +28,8 @@ import org.apache.jackrabbit.oak.plugins
import org.apache.jackrabbit.oak.plugins.document.Document;
import org.apache.jackrabbit.oak.plugins.document.DocumentStore;
import org.apache.jackrabbit.oak.plugins.document.DocumentStoreException;
+import org.apache.jackrabbit.oak.plugins.document.RevisionListener;
+import org.apache.jackrabbit.oak.plugins.document.RevisionVector;
import org.apache.jackrabbit.oak.plugins.document.UpdateOp;
import org.apache.jackrabbit.oak.plugins.document.cache.CacheInvalidationStats;
import org.slf4j.Logger;
@@ -36,7 +38,7 @@ import org.slf4j.LoggerFactory;
/**
* Implements a <code>DocumentStore</code> wrapper and logs all calls.
*/
-public class LoggingDocumentStoreWrapper implements DocumentStore {
+public class LoggingDocumentStoreWrapper implements DocumentStore, RevisionListener {
private static final Logger LOG = LoggerFactory.getLogger(LoggingDocumentStoreWrapper.class);
@@ -408,4 +410,12 @@ public class LoggingDocumentStoreWrapper
}
LOG.info(out);
}
+
+ @Override
+ public void updateAccessedRevision(RevisionVector revision) {
+ logMethod("updateAccessedRevision", revision);
+ if (store instanceof RevisionListener) {
+ ((RevisionListener) store).updateAccessedRevision(revision);
+ }
+ }
}
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/SynchronizingDocumentStoreWrapper.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/SynchronizingDocumentStoreWrapper.java?rev=1751441&r1=1751440&r2=1751441&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/SynchronizingDocumentStoreWrapper.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/SynchronizingDocumentStoreWrapper.java Tue Jul 5 11:11:13 2016
@@ -25,6 +25,8 @@ import org.apache.jackrabbit.oak.cache.C
import org.apache.jackrabbit.oak.plugins.document.Collection;
import org.apache.jackrabbit.oak.plugins.document.Document;
import org.apache.jackrabbit.oak.plugins.document.DocumentStore;
+import org.apache.jackrabbit.oak.plugins.document.RevisionListener;
+import org.apache.jackrabbit.oak.plugins.document.RevisionVector;
import org.apache.jackrabbit.oak.plugins.document.UpdateOp;
import org.apache.jackrabbit.oak.plugins.document.cache.CacheInvalidationStats;
@@ -32,7 +34,7 @@ import org.apache.jackrabbit.oak.plugins
* Implements a <code>DocumentStore</code> wrapper which synchronizes on all
* methods.
*/
-public class SynchronizingDocumentStoreWrapper implements DocumentStore {
+public class SynchronizingDocumentStoreWrapper implements DocumentStore, RevisionListener {
final DocumentStore store;
@@ -150,4 +152,11 @@ public class SynchronizingDocumentStoreW
public synchronized Map<String, String> getMetadata() {
return store.getMetadata();
}
+
+ @Override
+ public synchronized void updateAccessedRevision(RevisionVector revision) {
+ if (store instanceof RevisionListener) {
+ ((RevisionListener) store).updateAccessedRevision(revision);
+ }
+ }
}
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/TimingDocumentStoreWrapper.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/TimingDocumentStoreWrapper.java?rev=1751441&r1=1751440&r2=1751441&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/TimingDocumentStoreWrapper.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/TimingDocumentStoreWrapper.java Tue Jul 5 11:11:13 2016
@@ -34,6 +34,8 @@ import org.apache.jackrabbit.oak.plugins
import org.apache.jackrabbit.oak.plugins.document.Document;
import org.apache.jackrabbit.oak.plugins.document.DocumentStore;
import org.apache.jackrabbit.oak.plugins.document.DocumentStoreException;
+import org.apache.jackrabbit.oak.plugins.document.RevisionListener;
+import org.apache.jackrabbit.oak.plugins.document.RevisionVector;
import org.apache.jackrabbit.oak.plugins.document.UpdateOp;
import org.apache.jackrabbit.oak.plugins.document.cache.CacheInvalidationStats;
@@ -41,7 +43,7 @@ import org.apache.jackrabbit.oak.plugins
* A DocumentStore wrapper that can be used to log and also time DocumentStore
* calls.
*/
-public class TimingDocumentStoreWrapper implements DocumentStore {
+public class TimingDocumentStoreWrapper implements DocumentStore, RevisionListener {
private static final boolean DEBUG = Boolean.parseBoolean(System.getProperty("base.debug", "true"));
private static final AtomicInteger NEXT_ID = new AtomicInteger();
@@ -390,6 +392,19 @@ public class TimingDocumentStoreWrapper
}
}
+ @Override
+ public void updateAccessedRevision(RevisionVector revision) {
+ try {
+ long start = now();
+ if (base instanceof RevisionListener) {
+ ((RevisionListener) base).updateAccessedRevision(revision);
+ }
+ updateAndLogTimes("updateAccessedRevision", start, 0, 0);
+ } catch (Exception e) {
+ throw convert(e);
+ }
+ }
+
private void logCommonCall(long start, String key) {
int time = (int) (System.currentTimeMillis() - start);
if (time <= 0) {
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java?rev=1751441&r1=1751440&r2=1751441&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java Tue Jul 5 11:11:13 2016
@@ -759,6 +759,21 @@ public class Utils {
}
/**
+ * Returns true if all the revisions in the {@code a} greater or equals
+ * to their counterparts in {@code b}. If {@code b} contains revisions
+ * for cluster nodes that are not present in {@code a}, return false.
+ *
+ * @param a
+ * @param b
+ * @return true if all the revisions in the {@code a} are at least
+ * as recent as their counterparts in the {@code b}
+ */
+ public static boolean isGreaterOrEquals(@Nonnull RevisionVector a,
+ @Nonnull RevisionVector b) {
+ return a.pmax(b).equals(a);
+ }
+
+ /**
* Wraps the given iterable and aborts iteration over elements when the
* predicate on an element evaluates to {@code false}.
*
Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingDocumentStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingDocumentStore.java?rev=1751441&r1=1751440&r2=1751441&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingDocumentStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingDocumentStore.java Tue Jul 5 11:11:13 2016
@@ -27,7 +27,7 @@ import org.apache.jackrabbit.oak.plugins
import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Key;
import org.apache.jackrabbit.oak.plugins.document.cache.CacheInvalidationStats;
-public class CountingDocumentStore implements DocumentStore {
+public class CountingDocumentStore implements DocumentStore, RevisionListener {
private DocumentStore delegate;
@@ -230,4 +230,11 @@ public class CountingDocumentStore imple
public long determineServerTimeDifferenceMillis() {
return delegate.determineServerTimeDifferenceMillis();
}
+
+ @Override
+ public void updateAccessedRevision(RevisionVector revision) {
+ if (delegate instanceof RevisionListener) {
+ ((RevisionListener) delegate).updateAccessedRevision(revision);
+ }
+ }
}
Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentStoreWrapper.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentStoreWrapper.java?rev=1751441&r1=1751440&r2=1751441&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentStoreWrapper.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentStoreWrapper.java Tue Jul 5 11:11:13 2016
@@ -28,7 +28,7 @@ import org.apache.jackrabbit.oak.plugins
* A DocumentStore implementation which wraps another store and delegates all
* calls to it.
*/
-public class DocumentStoreWrapper implements DocumentStore {
+public class DocumentStoreWrapper implements DocumentStore, RevisionListener {
protected final DocumentStore store;
@@ -164,4 +164,11 @@ public class DocumentStoreWrapper implem
public long determineServerTimeDifferenceMillis() {
return store.determineServerTimeDifferenceMillis();
}
+
+ @Override
+ public void updateAccessedRevision(RevisionVector revision) {
+ if (store instanceof RevisionListener) {
+ ((RevisionListener) store).updateAccessedRevision(revision);
+ }
+ }
}
Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/ReadPreferenceIT.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/ReadPreferenceIT.java?rev=1751441&r1=1751440&r2=1751441&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/ReadPreferenceIT.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/ReadPreferenceIT.java Tue Jul 5 11:11:13 2016
@@ -19,19 +19,17 @@
package org.apache.jackrabbit.oak.plugins.document.mongo;
-import java.util.concurrent.TimeUnit;
-
import com.mongodb.ReadPreference;
-import org.apache.jackrabbit.oak.plugins.document.AbstractMongoConnectionTest;
-import org.apache.jackrabbit.oak.plugins.document.DocumentMK;
-import org.apache.jackrabbit.oak.plugins.document.Revision;
-import org.apache.jackrabbit.oak.plugins.document.util.Utils;
+import org.apache.jackrabbit.oak.plugins.document.*;
+import org.apache.jackrabbit.oak.plugins.document.mongo.replica.ReplicaSetInfo;
+import org.apache.jackrabbit.oak.plugins.document.mongo.replica.ReplicaSetInfoMock;
import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.apache.jackrabbit.oak.spi.state.NodeStore;
import org.apache.jackrabbit.oak.stats.Clock;
+import org.junit.Before;
import org.junit.Test;
import static org.apache.jackrabbit.oak.plugins.document.Collection.NODES;
@@ -44,30 +42,56 @@ public class ReadPreferenceIT extends Ab
private MongoDocumentStore mongoDS;
+ private DocumentMK mk2;
+
private Clock clock;
- private long replicationLag;
+ private ReplicaSetInfoMock replica;
+
+ private ReplicaSetInfoMock.RevisionBuilder primary;
+
+ private ReplicaSetInfoMock.RevisionBuilder secondary;
@Override
public void setUpConnection() throws Exception {
clock = new Clock.Virtual();
- replicationLag = TimeUnit.SECONDS.toMillis(10);
mongoConnection = connectionFactory.getConnection();
+ replica = ReplicaSetInfoMock.create(clock);
mk = new DocumentMK.Builder()
- .setMaxReplicationLag(replicationLag, TimeUnit.MILLISECONDS)
- .setMongoDB(mongoConnection.getDB())
.setClusterId(1)
+ .setMongoDB(mongoConnection.getDB())
.setLeaseCheck(false)
.open();
mongoDS = (MongoDocumentStore) mk.getDocumentStore();
+
+ mk2 = new DocumentMK.Builder()
+ .setClusterId(2)
+ .setMongoDB(mongoConnection.getDB())
+ .setLeaseCheck(false)
+ .open();
+ }
+
+ @Before
+ public void createReplicaSet() {
+ replica = ReplicaSetInfoMock.create(clock);
+
+ primary = replica.addInstance(ReplicaSetInfo.MemberState.PRIMARY, "p1");
+ secondary = replica.addInstance(ReplicaSetInfo.MemberState.SECONDARY, "s1");
+ mongoDS.setReplicaInfo(replica);
}
@Test
public void testPreferenceConversion() throws Exception{
+ primary.addRevisions(200);
+ secondary.addRevisions(0);
+ replica.updateRevisions();
+ clock.waitUntil(500);
+ assertEquals(300, replica.getLag());
+
//For cacheAge < replicationLag result should be primary
assertEquals(DocumentReadPreference.PRIMARY, mongoDS.getReadPreference(0));
assertEquals(DocumentReadPreference.PRIMARY,
- mongoDS.getReadPreference((int) (replicationLag - 100)));
+ mongoDS.getReadPreference((int) (replica.getLag() - 100)));
//For Integer.MAX_VALUE it should be secondary as caller intends that value is stable
assertEquals(DocumentReadPreference.PREFER_SECONDARY,
@@ -77,38 +101,75 @@ public class ReadPreferenceIT extends Ab
assertEquals(DocumentReadPreference.PREFER_SECONDARY_IF_OLD_ENOUGH,
mongoDS.getReadPreference(-1));
assertEquals(DocumentReadPreference.PREFER_SECONDARY_IF_OLD_ENOUGH,
- mongoDS.getReadPreference((int) (replicationLag + 100)));
+ mongoDS.getReadPreference((int) (replica.getLag() + 100)));
}
@Test
public void testMongoReadPreferencesDefault() throws Exception{
assertEquals(ReadPreference.primary(),
- mongoDS.getMongoReadPreference(NODES,"foo", DocumentReadPreference.PRIMARY));
+ mongoDS.getMongoReadPreference(NODES,"foo", null, DocumentReadPreference.PRIMARY));
assertEquals(ReadPreference.primaryPreferred(),
- mongoDS.getMongoReadPreference(NODES,"foo", DocumentReadPreference.PREFER_PRIMARY));
+ mongoDS.getMongoReadPreference(NODES,"foo", null, DocumentReadPreference.PREFER_PRIMARY));
//By default Mongo read preference is primary
assertEquals(ReadPreference.primary(),
- mongoDS.getMongoReadPreference(NODES,"foo", DocumentReadPreference.PREFER_SECONDARY));
+ mongoDS.getMongoReadPreference(NODES,"foo", null, DocumentReadPreference.PREFER_SECONDARY));
//Change the default and assert again
mongoDS.getDBCollection(NODES).getDB().setReadPreference(ReadPreference.secondary());
assertEquals(ReadPreference.secondary(),
- mongoDS.getMongoReadPreference(NODES,"foo", DocumentReadPreference.PREFER_SECONDARY));
+ mongoDS.getMongoReadPreference(NODES,"foo", null, DocumentReadPreference.PREFER_SECONDARY));
//for case where parent age cannot be determined the preference should be primary
assertEquals(ReadPreference.primary(),
- mongoDS.getMongoReadPreference(NODES,"foo", DocumentReadPreference.PREFER_SECONDARY_IF_OLD_ENOUGH));
+ mongoDS.getMongoReadPreference(NODES,"foo", null, DocumentReadPreference.PREFER_SECONDARY_IF_OLD_ENOUGH));
//For collection other than NODES always primary
assertEquals(ReadPreference.primary(),
- mongoDS.getMongoReadPreference(SETTINGS,"foo", DocumentReadPreference.PREFER_SECONDARY_IF_OLD_ENOUGH));
+ mongoDS.getMongoReadPreference(SETTINGS,"foo", null, DocumentReadPreference.PREFER_SECONDARY_IF_OLD_ENOUGH));
}
@Test
- public void testMongoReadPreferencesWithAge() throws Exception{
+ public void testMongoReadPreferences() throws Exception {
+ ReadPreference testPref = ReadPreference.secondary();
+ mongoDS.getDBCollection(NODES).getDB().setReadPreference(testPref);
+
+ NodeStore extNodeStore = mk2.getNodeStore();
+ NodeBuilder b1 = extNodeStore.getRoot().builder();
+ b1.child("x").child("y").setProperty("xyz", "123");
+ extNodeStore.merge(b1, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+ // wait until the change is visible
+ NodeStore nodeStore = mk.getNodeStore();
+ while (true) {
+ if (nodeStore.getRoot().hasChildNode("x")) {
+ break;
+ } else {
+ Thread.sleep(100);
+ }
+ }
+
+ // the change hasn't been replicated yet, primary must be used
+ assertEquals(ReadPreference.primary(),
+ mongoDS.getMongoReadPreference(NODES, null, "/x/y", DocumentReadPreference.PREFER_SECONDARY_IF_OLD_ENOUGH));
+
+ // make the secondary up-to-date
+ DocumentNodeState ns = (DocumentNodeState) nodeStore.getRoot().getChildNode("x").getChildNode("y");
+ RevisionVector lastSeenRev = ns.getLastRevision().update(new Revision(Revision.getCurrentTimestamp(), 0, 1)); // add revision for the local cluster node
+
+ primary.set(lastSeenRev);
+ secondary.set(lastSeenRev);
+ replica.updateRevisions();
+
+ // change has been replicated by now, it's fine to use secondary
+ assertEquals(testPref,
+ mongoDS.getMongoReadPreference(NODES, null, "/x/y", DocumentReadPreference.PREFER_SECONDARY_IF_OLD_ENOUGH));
+ }
+
+ @Test
+ public void testMongoReadPreferencesForLocalChanges() throws Exception {
//Change the default
ReadPreference testPref = ReadPreference.secondary();
mongoDS.getDBCollection(NODES).getDB().setReadPreference(testPref);
@@ -118,21 +179,21 @@ public class ReadPreferenceIT extends Ab
b1.child("x").child("y");
nodeStore.merge(b1, EmptyHook.INSTANCE, CommitInfo.EMPTY);
- String id = Utils.getIdFromPath("/x/y");
- String parentId = Utils.getParentId(id);
- mongoDS.invalidateCache(NODES,id);
+ mongoDS.invalidateCache();
- //For modifiedTime < replicationLag primary must be used
+ // the local change hasn't been replicated yet, primary must be used
assertEquals(ReadPreference.primary(),
- mongoDS.getMongoReadPreference(NODES,parentId, DocumentReadPreference.PREFER_SECONDARY_IF_OLD_ENOUGH));
+ mongoDS.getMongoReadPreference(NODES, null, "/x/y", DocumentReadPreference.PREFER_SECONDARY_IF_OLD_ENOUGH));
- //Going into future to make parent /x old enough
- clock.waitUntil(Revision.getCurrentTimestamp() + replicationLag);
- mongoDS.setClock(clock);
+ // make the secondary up-to-date
+ long now = Revision.getCurrentTimestamp();
+ primary.addRevision(now, 0, 1, false);
+ secondary.addRevision(now, 0, 1, false);
+ replica.updateRevisions();
- //For old modified nodes secondaries should be preferred
+ // local change has been replicated by now, it's fine to use secondary
assertEquals(testPref,
- mongoDS.getMongoReadPreference(NODES, parentId, DocumentReadPreference.PREFER_SECONDARY_IF_OLD_ENOUGH));
+ mongoDS.getMongoReadPreference(NODES, null, "/x/y", DocumentReadPreference.PREFER_SECONDARY_IF_OLD_ENOUGH));
}
@Test
Added: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/LocalChangesTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/LocalChangesTest.java?rev=1751441&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/LocalChangesTest.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/LocalChangesTest.java Tue Jul 5 11:11:13 2016
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document.mongo.replica;
+
+import static java.util.Collections.singleton;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.jackrabbit.oak.plugins.document.Revision;
+import org.apache.jackrabbit.oak.plugins.document.RevisionVector;
+import org.junit.Test;
+
+public class LocalChangesTest {
+
+ @Test
+ public void testReplicaInactive() {
+ LocalChanges l = new LocalChanges(0);
+ assertTrue(l.mayContain("2:/xyz/123"));
+ assertTrue(l.mayContainChildrenOf("2:/xyz/123"));
+
+ l.add("2:/xyz/123", 2); // don't remember the path, only the timestamp (2)
+ l.gotRootRevisions(revsV(1, 1, 1));
+ assertTrue(l.mayContain("2:/xyz/123"));
+ assertTrue(l.mayContain("2:/abc/567")); // we only remembered the timestamp, not the path
+
+ l.gotRootRevisions(revsV(3, 3, 3)); // the new revision >= the remembered 2
+ assertFalse(l.mayContain("2:/xyz/123"));
+ assertFalse(l.mayContain("2:/abc/567"));
+ }
+
+ @Test
+ public void testMayContain() {
+ LocalChanges l = new LocalChanges(0);
+ l.add("2:/xyz/123", 2);
+
+ l.gotRootRevisions(revsV(1, 1, 1));
+ assertTrue(l.mayContain("2:/xyz/123"));
+ assertTrue(l.mayContainChildrenOf("1:/xyz"));
+
+ l.gotRootRevisions(revsV(2, 2, 2));
+ assertFalse(l.mayContain("2:/xyz/123"));
+ assertFalse(l.mayContainChildrenOf("1:/xyz"));
+ }
+
+ @Test
+ public void testGotRootRevisions() {
+ LocalChanges l = new LocalChanges(2); // only consider the last timestamp in the revision vector
+ l.add("2:/xyz/123", 4);
+
+ l.gotRootRevisions(revsV(1, 1, 1));
+ assertTrue(l.mayContain("2:/xyz/123"));
+
+ l.gotRootRevisions(revsV(2, 2, 2));
+ assertTrue(l.mayContain("2:/xyz/123"));
+
+ l.gotRootRevisions(revsV(2, 3, 3));
+ assertTrue(l.mayContain("2:/xyz/123"));
+
+ l.gotRootRevisions(revsV(2, 3, 4));
+ assertFalse(l.mayContain("2:/xyz/123"));
+ }
+
+ @Test
+ public void testLimit() {
+ LocalChanges l = new LocalChanges(0);
+ l.gotRootRevisions(revsV(1)); // make the class active
+
+ for (int i = 1; i <= 99; i++) {
+ l.add("2:/xyz/" + i, i + 100);
+ assertTrue(l.mayContain("2:/xyz/" + i));
+ assertFalse(l.mayContain("2:/abc/" + i));
+ }
+ l.add("2:/xyz/100", 200); // the list should be cleared right now
+ l.add("2:/abc/123", 300); // this is added to the new list
+ l.add("2:/abc/456", 100); // this shouldn't be added to the new list (as it's old)
+
+ // now the list should be cleared and we should get true for all documents
+ assertTrue(l.mayContain("2:/abc/999"));
+ assertEquals(singleton("2:/abc/123"), l.localChanges.keySet());
+
+ l.gotRootRevisions(revsV(200)); // invalidate
+ assertFalse(l.mayContain("2:/xyz/0"));
+ assertFalse(l.mayContain("2:/xyz/99"));
+
+ assertTrue(l.mayContain("2:/abc/123"));
+ assertFalse(l.mayContain("2:/abc/456"));
+ }
+
+ @Test
+ public void dontAddOldRevisions() {
+ LocalChanges l = new LocalChanges(0);
+ l.gotRootRevisions(revsV(10));
+ l.add("2:/xyz/1", 5);
+ assertFalse(l.mayContain("2:/xyz/1"));
+ }
+
+ private Collection<Revision> revs(int... timestamps) {
+ List<Revision> revs = new ArrayList<Revision>();
+ for (int i = 0; i < timestamps.length; i++) {
+ revs.add(new Revision(timestamps[i], 0, i, false));
+ }
+ return revs;
+ }
+
+ private RevisionVector revsV(int... timestamps) {
+ return new RevisionVector(revs(timestamps));
+ }
+}