You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by to...@apache.org on 2016/06/17 08:53:11 UTC
svn commit: r1748812 - in /jackrabbit/oak/trunk/oak-core/src:
main/java/org/apache/jackrabbit/oak/plugins/document/
main/java/org/apache/jackrabbit/oak/plugins/document/mongo/
main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/
main/jav...
Author: tomekr
Date: Fri Jun 17 08:53:10 2016
New Revision: 1748812
URL: http://svn.apache.org/viewvc?rev=1748812&view=rev
Log:
OAK-3865: New strategy to optimize secondary reads
Added:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/GetRootRevisionsCallable.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/LocalChanges.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/NodeCollectionProvider.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/ReplicaSetInfo.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/ReplicaSetInfoListener.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/Timestamped.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/LocalChangesTest.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/ReplicaSetInfoTest.java
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoBlobStore.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/ReadPreferenceIT.java
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java?rev=1748812&r1=1748811&r2=1748812&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java Fri Jun 17 08:53:10 2016
@@ -499,6 +499,7 @@ public class DocumentMK {
public static final int DEFAULT_CACHE_STACK_MOVE_DISTANCE = 16;
private DocumentNodeStore nodeStore;
private DocumentStore documentStore;
+ private String mongoUri;
private DiffCache diffCache;
private BlobStore blobStore;
private int clusterId = Integer.getInteger("oak.documentMK.clusterId", 0);
@@ -554,6 +555,8 @@ public class DocumentMK {
@Nonnull String name,
int blobCacheSizeMB)
throws UnknownHostException {
+ this.mongoUri = uri;
+
DB db = new MongoConnection(uri).getDB(name);
if (!MongoConnection.hasWriteConcern(uri)) {
db.setWriteConcern(MongoConnection.getDefaultWriteConcern(db));
@@ -601,6 +604,16 @@ public class DocumentMK {
}
/**
+ * Returns the Mongo URI used in the {@link #setMongoDB(String, String, int)} method.
+ *
+ * @return the Mongo URI or null if the {@link #setMongoDB(String, String, int)} method hasn't
+ * been called.
+ */
+ public String getMongoUri() {
+ return mongoUri;
+ }
+
+ /**
* Sets a {@link DataSource} to use for the RDB document and blob
* stores.
*
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java?rev=1748812&r1=1748811&r2=1748812&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java Fri Jun 17 08:53:10 2016
@@ -1326,10 +1326,17 @@ public final class NodeDocument extends
}
NodeDocument getPreviousDocument(String prevId){
- //Use the maxAge variant such that in case of Mongo call for
- //previous doc are directed towards replicas first
LOG.trace("get previous document {}", prevId);
- return store.find(Collection.NODES, prevId, Integer.MAX_VALUE);
+ NodeDocument doc = store.find(Collection.NODES, prevId);
+ if (doc == null) {
+ // In case secondary read preference is used and node is not found
+ // then check with primary again as it might happen that node document has not been
+ // replicated. We know that document with such an id must exist but possibly dut to
+ // replication lag it has not reached to secondary. So in that case read again
+ // from primary
+ doc = store.find(Collection.NODES, prevId, 0);
+ }
+ return doc;
}
@Nonnull
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoBlobStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoBlobStore.java?rev=1748812&r1=1748811&r2=1748812&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoBlobStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoBlobStore.java Fri Jun 17 08:53:10 2016
@@ -23,6 +23,8 @@ import java.util.concurrent.TimeUnit;
import org.apache.jackrabbit.oak.commons.StringUtils;
import org.apache.jackrabbit.oak.plugins.blob.CachingBlobStore;
+import org.apache.jackrabbit.oak.plugins.document.Document;
+import org.apache.jackrabbit.oak.plugins.document.NodeDocument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java?rev=1748812&r1=1748811&r2=1748812&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java Fri Jun 17 08:53:10 2016
@@ -61,6 +61,7 @@ import org.apache.jackrabbit.oak.plugins
import org.apache.jackrabbit.oak.plugins.document.JournalEntry;
import org.apache.jackrabbit.oak.plugins.document.NodeDocument;
import org.apache.jackrabbit.oak.plugins.document.Revision;
+import org.apache.jackrabbit.oak.plugins.document.RevisionVector;
import org.apache.jackrabbit.oak.plugins.document.StableRevisionComparator;
import org.apache.jackrabbit.oak.plugins.document.UpdateOp;
import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Condition;
@@ -70,6 +71,8 @@ import org.apache.jackrabbit.oak.plugins
import org.apache.jackrabbit.oak.plugins.document.cache.CacheChangesTracker;
import org.apache.jackrabbit.oak.plugins.document.cache.CacheInvalidationStats;
import org.apache.jackrabbit.oak.plugins.document.cache.NodeDocumentCache;
+import org.apache.jackrabbit.oak.plugins.document.mongo.replica.LocalChanges;
+import org.apache.jackrabbit.oak.plugins.document.mongo.replica.ReplicaSetInfo;
import org.apache.jackrabbit.oak.plugins.document.locks.NodeDocumentLocks;
import org.apache.jackrabbit.oak.plugins.document.locks.StripedNodeDocumentLocks;
import org.apache.jackrabbit.oak.plugins.document.util.Utils;
@@ -140,6 +143,12 @@ public class MongoDocumentStore implemen
private Clock clock = Clock.SIMPLE;
+ private final ReplicaSetInfo replicaInfo;
+
+ private RevisionVector mostRecentAccessedRevisions;
+
+ private LocalChanges localChanges;
+
private final long maxReplicationLagMillis;
/**
@@ -171,6 +180,24 @@ public class MongoDocumentStore implemen
Long.getLong("oak.mongo.maxQueryTimeMS", TimeUnit.MINUTES.toMillis(1));
/**
+ * How often in milliseconds the MongoDocumentStore should estimate the
+ * replication lag.
+ * <p>
+ * Default is 60'000 (one minute).
+ */
+ private long estimationPullFrequencyMS =
+ Long.getLong("oak.mongo.estimationPullFrequencyMS", TimeUnit.SECONDS.toMillis(5));
+
+ /**
+ * Fallback to the old secondary-routing strategy. Setting this to true
+ * disables the optimisation introduced in the OAK-3865.
+ * <p>
+ * Default is false.
+ */
+ private boolean fallbackSecondaryStrategy =
+ Boolean.getBoolean("oak.mongo.fallbackSecondaryStrategy");
+
+ /**
* The number of documents to put into one bulk update.
* <p>
* Default is 30.
@@ -211,6 +238,18 @@ public class MongoDocumentStore implemen
maxReplicationLagMillis = builder.getMaxReplicationLagMillis();
+ if (fallbackSecondaryStrategy) {
+ replicaInfo = null;
+ localChanges = null;
+ } else {
+ replicaInfo = new ReplicaSetInfo(clock, db, builder.getMongoUri(), estimationPullFrequencyMS, maxReplicationLagMillis, builder.getExecutor());
+ Thread replicaInfoThread = new Thread(replicaInfo, "MongoDocumentStore replica set info provider (" + builder.getClusterId() + ")");
+ replicaInfoThread.setDaemon(true);
+ replicaInfoThread.start();
+ localChanges = new LocalChanges();
+ replicaInfo.addListener(localChanges);
+ }
+
// indexes:
// the _id field is the primary key, so we don't need to define it
@@ -471,7 +510,7 @@ public class MongoDocumentStore implemen
boolean isSlaveOk = false;
boolean docFound = true;
try {
- ReadPreference readPreference = getMongoReadPreference(collection, Utils.getParentId(key), docReadPref);
+ ReadPreference readPreference = getMongoReadPreference(collection, null, key, docReadPref);
if(readPreference.isSlaveOk()){
LOG.trace("Routing call to secondary for fetching [{}]", key);
@@ -480,17 +519,6 @@ public class MongoDocumentStore implemen
DBObject obj = dbCollection.findOne(getByKeyQuery(key).get(), null, null, readPreference);
- if (obj == null
- && readPreference.isSlaveOk()) {
- //In case secondary read preference is used and node is not found
- //then check with primary again as it might happen that node document has not been
- //replicated. This is required for case like SplitDocument where the SplitDoc is fetched with
- //maxCacheAge == Integer.MAX_VALUE which results in readPreference of secondary.
- //In such a case we know that document with such an id must exist
- //but possibly dut to replication lag it has not reached to secondary. So in that case read again
- //from primary
- obj = dbCollection.findOne(getByKeyQuery(key).get(), null, null, ReadPreference.primary());
- }
if(obj == null){
docFound = false;
return null;
@@ -499,6 +527,7 @@ public class MongoDocumentStore implemen
if (doc != null) {
doc.seal();
}
+ updateLatestAccessedRevs(doc);
return doc;
} finally {
stats.doneFindUncached(watch.elapsed(TimeUnit.NANOSECONDS), collection, key, docFound, isSlaveOk);
@@ -583,7 +612,7 @@ public class MongoDocumentStore implemen
cursor.maxTime(maxQueryTime, TimeUnit.MILLISECONDS);
}
ReadPreference readPreference =
- getMongoReadPreference(collection, parentId, getDefaultReadPreference(collection));
+ getMongoReadPreference(collection, parentId, null, getDefaultReadPreference(collection));
if(readPreference.isSlaveOk()){
isSlaveOk = true;
@@ -598,6 +627,7 @@ public class MongoDocumentStore implemen
for (int i = 0; i < limit && cursor.hasNext(); i++) {
DBObject o = cursor.next();
T doc = convertFromDBObject(collection, o);
+ updateLatestAccessedRevs(doc);
list.add(doc);
}
resultSize = list.size();
@@ -770,6 +800,7 @@ public class MongoDocumentStore implemen
if (collection == Collection.NODES) {
NodeDocument newDoc = (NodeDocument) applyChanges(collection, oldDoc, updateOp);
nodesCache.put(newDoc);
+ updateLocalChanges(newDoc);
}
oldDoc.seal();
} else if (upsert) {
@@ -777,6 +808,7 @@ public class MongoDocumentStore implemen
NodeDocument doc = (NodeDocument) collection.newDocument(this);
UpdateUtils.applyChanges(doc, updateOp);
nodesCache.putIfAbsent(doc);
+ updateLocalChanges(doc);
}
} else {
// updateOp without conditions and not an upsert
@@ -939,6 +971,11 @@ public class MongoDocumentStore implemen
docsToCache.add(newDoc);
}
}
+
+ for (NodeDocument doc : docsToCache) {
+ updateLocalChanges(doc);
+ }
+
nodesCache.putNonConflictingDocs(tracker, docsToCache);
}
oldDocs.keySet().removeAll(bulkResult.failedUpdates);
@@ -1104,6 +1141,7 @@ public class MongoDocumentStore implemen
if (collection == Collection.NODES) {
for (T doc : docs) {
nodesCache.putIfAbsent((NodeDocument) doc);
+ updateLocalChanges((NodeDocument) doc);
}
}
insertSuccess = true;
@@ -1208,7 +1246,8 @@ public class MongoDocumentStore implemen
}
DocumentReadPreference getReadPreference(int maxCacheAge){
- if(maxCacheAge >= 0 && maxCacheAge < maxReplicationLagMillis) {
+ long lag = fallbackSecondaryStrategy ? maxReplicationLagMillis : replicaInfo.getLag();
+ if(maxCacheAge >= 0 && maxCacheAge < lag) {
return DocumentReadPreference.PRIMARY;
} else if(maxCacheAge == Integer.MAX_VALUE){
return DocumentReadPreference.PREFER_SECONDARY;
@@ -1221,9 +1260,10 @@ public class MongoDocumentStore implemen
return col == Collection.NODES ? DocumentReadPreference.PREFER_SECONDARY_IF_OLD_ENOUGH : DocumentReadPreference.PRIMARY;
}
- <T extends Document> ReadPreference getMongoReadPreference(Collection<T> collection,
- String parentId,
- DocumentReadPreference preference) {
+ <T extends Document> ReadPreference getMongoReadPreference(@Nonnull Collection<T> collection,
+ @Nullable String parentId,
+ @Nullable String documentId,
+ @Nonnull DocumentReadPreference preference) {
switch(preference){
case PRIMARY:
return ReadPreference.primary();
@@ -1236,23 +1276,37 @@ public class MongoDocumentStore implemen
return ReadPreference.primary();
}
- // read from primary unless parent has not been modified
- // within replication lag period
- ReadPreference readPreference = ReadPreference.primary();
- if (parentId != null) {
- long replicationSafeLimit = getTime() - maxReplicationLagMillis;
- NodeDocument cachedDoc = nodesCache.getIfPresent(parentId);
- // FIXME: this is not quite accurate, because ancestors
+ boolean secondarySafe;
+ if (fallbackSecondaryStrategy) {
+ // This is not quite accurate, because ancestors
// are updated in a background thread (_lastRev). We
// will need to revise this for low maxReplicationLagMillis
// values
- if (cachedDoc != null && !cachedDoc.hasBeenModifiedSince(replicationSafeLimit)) {
+ long replicationSafeLimit = getTime() - maxReplicationLagMillis;
+ if (parentId == null) {
+ secondarySafe = false;
+ } else {
//If parent has been modified loooong time back then there children
//would also have not be modified. In that case we can read from secondary
- readPreference = getConfiguredReadPreference(collection);
+ NodeDocument cachedDoc = nodesCache.getIfPresent(parentId);
+ secondarySafe = cachedDoc != null && !cachedDoc.hasBeenModifiedSince(replicationSafeLimit);
}
+ } else {
+ secondarySafe = true;
+ secondarySafe &= collection == Collection.NODES;
+ secondarySafe &= documentId == null || !localChanges.mayContain(documentId);
+ secondarySafe &= parentId == null || !localChanges.mayContainChildrenOf(parentId);
+ secondarySafe &= mostRecentAccessedRevisions == null || replicaInfo.isMoreRecentThan(mostRecentAccessedRevisions);
}
+
+ ReadPreference readPreference;
+ if (secondarySafe) {
+ readPreference = getConfiguredReadPreference(collection);
+ } else {
+ readPreference = ReadPreference.primary();
+ }
+
return readPreference;
default:
throw new IllegalArgumentException("Unsupported usage " + preference);
@@ -1327,6 +1381,9 @@ public class MongoDocumentStore implemen
@Override
public void dispose() {
+ if (replicaInfo != null) {
+ replicaInfo.stop();
+ }
nodes.getDB().getMongo().close();
try {
nodesCache.close();
@@ -1560,6 +1617,27 @@ public class MongoDocumentStore implemen
return diff;
}
+ private synchronized <T extends Document> void updateLatestAccessedRevs(T doc) {
+ if (doc instanceof NodeDocument) {
+ RevisionVector accessedRevs = new RevisionVector(((NodeDocument) doc).getLastRev().values());
+ RevisionVector previousValue = mostRecentAccessedRevisions;
+ if (mostRecentAccessedRevisions == null) {
+ mostRecentAccessedRevisions = accessedRevs;
+ } else {
+ mostRecentAccessedRevisions = mostRecentAccessedRevisions.pmax(accessedRevs);
+ }
+ if (LOG.isDebugEnabled() && !mostRecentAccessedRevisions.equals(previousValue)) {
+ LOG.debug("Most recent accessed revisions: {}", mostRecentAccessedRevisions);
+ }
+ }
+ }
+
+ private void updateLocalChanges(NodeDocument doc) {
+ if (localChanges != null) {
+ localChanges.add(doc.getId(), doc.getLastRev().values());
+ }
+ }
+
private static class BulkUpdateResult {
private final Set<String> failedUpdates;
Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/GetRootRevisionsCallable.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/GetRootRevisionsCallable.java?rev=1748812&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/GetRootRevisionsCallable.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/GetRootRevisionsCallable.java Fri Jun 17 08:53:10 2016
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document.mongo.replica;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+
+import org.apache.jackrabbit.oak.plugins.document.Document;
+import org.apache.jackrabbit.oak.plugins.document.Revision;
+import org.apache.jackrabbit.oak.plugins.document.RevisionVector;
+import org.apache.jackrabbit.oak.stats.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.mongodb.BasicDBObject;
+import com.mongodb.DBCollection;
+import com.mongodb.DBObject;
+
+public class GetRootRevisionsCallable implements Callable<Timestamped<RevisionVector>> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(GetRootRevisionsCallable.class);
+
+ private final String hostName;
+
+ private final NodeCollectionProvider nodeCollections;
+
+ private final Clock clock;
+
+ public GetRootRevisionsCallable(Clock clock, String hostName, NodeCollectionProvider nodeCollections) {
+ this.hostName = hostName;
+ this.nodeCollections = nodeCollections;
+ this.clock = clock;
+ }
+
+ @Override
+ public Timestamped<RevisionVector> call() throws Exception {
+ List<Revision> revisions = new ArrayList<Revision>();
+ DBCollection collection = nodeCollections.get(hostName);
+
+ long start = clock.getTime();
+ DBObject root = collection.findOne(new BasicDBObject(Document.ID, "0:/"));
+ long end = clock.getTime();
+ long mid = (start + end) / 2;
+
+ if (root == null) {
+ LOG.warn("Can't get the root document on {}", hostName);
+ return null;
+ }
+
+ DBObject lastRev = (DBObject) root.get("_lastRev");
+ for (String clusterId : lastRev.keySet()) {
+ String rev = (String) lastRev.get(clusterId);
+ revisions.add(Revision.fromString(rev));
+ }
+ LOG.debug("Got /_lastRev from {}: {}", hostName, lastRev);
+ return new Timestamped<RevisionVector>(new RevisionVector(revisions), mid);
+ }
+}
\ No newline at end of file
Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/LocalChanges.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/LocalChanges.java?rev=1748812&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/LocalChanges.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/LocalChanges.java Fri Jun 17 08:53:10 2016
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document.mongo.replica;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.jackrabbit.oak.plugins.document.Revision;
+import org.apache.jackrabbit.oak.plugins.document.RevisionVector;
+import org.apache.jackrabbit.oak.plugins.document.util.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.jackrabbit.oak.plugins.document.util.Utils.isGreaterOrEquals;
+
+/**
+ * This class maintains a list of local changes (paths+revisions), which
+ * shouldn't be read from the secondary Mongo, as we are not sure if they have
+ * been already replicated from primary. Once we get this confidence, the entry
+ * will be removed from the map.
+ */
+public class LocalChanges implements ReplicaSetInfoListener {
+
+ private static final Logger LOG = LoggerFactory.getLogger(LocalChanges.class);
+
+ /**
+ * How many paths should be stored in the {@link #localChanges} map. If
+ * there's more paths added (and not removed in the
+ * {@link #gotRootRevisions(RevisionVector)}), only the latest changed
+ * revision will be remembered.
+ */
+ private static final int SIZE_LIMIT = 100;
+
+ /**
+ * This map contains document paths and revisions in which they have been
+ * changed. Paths in this collection hasn't been replicated to secondary
+ * instances yet.
+ */
+ final Map<String, RevisionVector> localChanges = new HashMap<String, RevisionVector>();
+
+ /**
+ * If there's more than {@link #SIZE_LIMIT} paths in the
+ * {@link #localChanges}, the class will clear the above map and update this
+ * variable with the latest changed revision. {@code true} will be returned
+ * for all {@link #mayContainChildrenOf(String)} and {@link #mayContain(String)}
+ * invocations until this revision is replicated to all secondary instances.
+ * <p>
+ * This is a safety mechanism, so the {@link #localChanges} won't grow too much.
+ */
+ private volatile RevisionVector latestChange;
+
+ /**
+ * True if the current Mongo installation is an working replica. Otherwise
+ * there's no need to store the local changes.
+ */
+ private volatile boolean replicaActive;
+
+ private volatile RevisionVector rootRevision;
+
+ public void add(String id, Collection<Revision> revs) {
+ RevisionVector revsV = new RevisionVector(revs);
+ RevisionVector localRootRev = rootRevision;
+ if (localRootRev != null && isGreaterOrEquals(localRootRev, revsV)) {
+ return;
+ }
+
+ synchronized (this) {
+ if (latestChange != null && isGreaterOrEquals(latestChange, revsV)) {
+ return;
+ }
+
+ if (replicaActive) {
+ localChanges.put(id, revsV);
+ if (localChanges.size() >= SIZE_LIMIT) {
+ localChanges.clear();
+ latestChange = revsV;
+ LOG.debug(
+ "The local changes count == {}. Clearing the list and switching to the 'latest change' mode: {}",
+ SIZE_LIMIT, latestChange);
+ }
+ } else {
+ latestChange = revsV;
+ }
+ }
+ }
+
+ /**
+ * Check if it's possible that the given document hasn't been replicated to
+ * the secondary yet.
+ *
+ * @param documentId
+ * @return {@code true} if it's possible that the document is still in the
+ * Mongo replication queue
+ */
+ public boolean mayContain(String documentId) {
+ if (!replicaActive || latestChange != null) {
+ return true;
+ }
+
+ synchronized (this) {
+ return localChanges.containsKey(documentId);
+ }
+ }
+
+ /**
+ * Check if it's possible that the children of the given document hasn't
+ * been replicated to the secondary yet.
+ *
+ * @param documentId
+ * @return {@code true} if it's possible that the children of given document
+ * are still in the Mongo replication queue
+ */
+ public boolean mayContainChildrenOf(String parentId) {
+ if (!replicaActive || latestChange != null) {
+ return true;
+ }
+
+ synchronized (this) {
+ for (String key : localChanges.keySet()) {
+ if (parentId.equals(Utils.getParentId(key))) {
+ return true;
+ }
+ }
+ return false;
+ }
+ }
+
+ @Override
+ public void gotRootRevisions(RevisionVector rootRevision) {
+ if (rootRevision == null) {
+ return;
+ }
+
+ this.rootRevision = rootRevision;
+
+ if (!replicaActive) {
+ replicaActive = true;
+ LOG.info("Replica set became active");
+ }
+
+ synchronized (this) {
+ if (latestChange != null && latestChange.compareTo(rootRevision) <= 0) {
+ latestChange = null;
+ }
+
+ Iterator<RevisionVector> it = localChanges.values().iterator();
+ while (it.hasNext()) {
+ if (it.next().compareTo(rootRevision) <= 0) {
+ it.remove();
+ }
+ }
+ }
+ }
+}
Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/NodeCollectionProvider.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/NodeCollectionProvider.java?rev=1748812&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/NodeCollectionProvider.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/NodeCollectionProvider.java Fri Jun 17 08:53:10 2016
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document.mongo.replica;
+
+import static com.google.common.collect.Sets.difference;
+
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.mongodb.MongoCredential;
+import com.mongodb.ServerAddress;
+import org.apache.jackrabbit.oak.plugins.document.Collection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.mongodb.DB;
+import com.mongodb.DBCollection;
+import com.mongodb.MongoClient;
+import com.mongodb.MongoClientException;
+import com.mongodb.MongoClientURI;
+
+/**
+ * This class connects to Mongo instances and returns the NODES collection.
+ * Connections are cached.
+ */
+public class NodeCollectionProvider {
+
+ private static final Logger LOG = LoggerFactory.getLogger(NodeCollectionProvider.class);
+
+ private final Map<String, DBCollection> collections = new ConcurrentHashMap<String, DBCollection>();
+
+ private final String originalMongoUri;
+
+ private final String dbName;
+
+ public NodeCollectionProvider(String originalMongoUri, String dbName) {
+ this.originalMongoUri = originalMongoUri;
+ this.dbName = dbName;
+ }
+
+ public void retain(Set<String> hostNames) {
+ close(difference(collections.keySet(), hostNames));
+ }
+
+ public void close() {
+ close(collections.keySet());
+ }
+
+ private void close(Set<String> hostNames) {
+ Iterator<Entry<String, DBCollection>> it = collections.entrySet().iterator();
+ while (it.hasNext()) {
+ Entry<String, DBCollection> entry = it.next();
+ if (hostNames.contains(entry.getKey())) {
+ try {
+ entry.getValue().getDB().getMongo().close();
+ it.remove();
+ } catch (MongoClientException e) {
+ LOG.error("Can't close Mongo client", e);
+ }
+ }
+ }
+ }
+
+ @SuppressWarnings("deprecation")
+ public DBCollection get(String hostname) throws UnknownHostException {
+ if (collections.containsKey(hostname)) {
+ return collections.get(hostname);
+ }
+
+ MongoClient client;
+ if (originalMongoUri == null) {
+ MongoClientURI uri = new MongoClientURI("mongodb://" + hostname);
+ client = new MongoClient(uri);
+ } else {
+ client = prepareClientForHostname(hostname);
+ }
+
+ DB db = client.getDB(dbName);
+ db.getMongo().slaveOk();
+ DBCollection collection = db.getCollection(Collection.NODES.toString());
+ collections.put(hostname, collection);
+ return collection;
+ }
+
+ private MongoClient prepareClientForHostname(String hostname) throws UnknownHostException {
+ ServerAddress address;
+ if (hostname.contains(":")) {
+ String[] hostSplit = hostname.split(":");
+ if (hostSplit.length != 2) {
+ throw new IllegalArgumentException("Not a valid hostname: " + hostname);
+ }
+ address = new ServerAddress(hostSplit[0], Integer.parseInt(hostSplit[1]));
+ } else {
+ address = new ServerAddress(hostname);
+ }
+
+ MongoClientURI originalUri = new MongoClientURI(originalMongoUri);
+ List<MongoCredential> credentialList = new ArrayList<MongoCredential>(1);
+ if (originalUri.getCredentials() != null) {
+ credentialList.add(originalUri.getCredentials());
+ }
+ return new MongoClient(address, credentialList, originalUri.getOptions());
+ }
+}
\ No newline at end of file
Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/ReplicaSetInfo.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/ReplicaSetInfo.java?rev=1748812&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/ReplicaSetInfo.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/ReplicaSetInfo.java Fri Jun 17 08:53:10 2016
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document.mongo.replica;
+
+import static com.google.common.base.Predicates.in;
+import static com.google.common.collect.ImmutableSet.of;
+import static com.google.common.collect.Iterables.isEmpty;
+import static com.google.common.collect.Iterables.transform;
+import static com.google.common.collect.Maps.filterKeys;
+import static com.google.common.collect.Sets.union;
+import static org.apache.jackrabbit.oak.plugins.document.util.Utils.isGreaterOrEquals;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Future;
+import java.util.Set;
+import java.util.concurrent.FutureTask;
+
+import javax.annotation.Nullable;
+
+import org.apache.jackrabbit.oak.plugins.document.Revision;
+import org.apache.jackrabbit.oak.plugins.document.RevisionVector;
+import org.apache.jackrabbit.oak.stats.Clock;
+import org.bson.BasicBSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.mongodb.BasicDBObject;
+import com.mongodb.DB;
+import com.mongodb.MongoException;
+import com.mongodb.ReadPreference;
+
+/**
+ * This class analyses the replica set info provided by MongoDB to find out two
+ * what's the current synchronization state of secondary instances in terms of
+ * revision values and timestamp.
+ */
+public class ReplicaSetInfo implements Runnable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ReplicaSetInfo.class);
+
+ private final DB adminDb;
+
+ private final long pullFrequencyMillis;
+
+ private final long maxReplicationLagMillis;
+
+ private final Executor executor;
+
+ private final NodeCollectionProvider nodeCollections;
+
+ private final Clock clock;
+
+ private final Object stopMonitor = new Object();
+
+ private final List<ReplicaSetInfoListener> listeners = new CopyOnWriteArrayList<ReplicaSetInfoListener>();
+
+ private volatile RevisionVector rootRevisions;
+
+ volatile long secondariesSafeTimestamp;
+
+ List<String> hiddenMembers;
+
+ private volatile boolean stop;
+
+ public ReplicaSetInfo(Clock clock, DB db, String originalMongoUri, long pullFrequencyMillis, long maxReplicationLagMillis, Executor executor) {
+ this.executor = executor;
+ this.clock = clock;
+ this.adminDb = db.getSisterDB("admin");
+ this.pullFrequencyMillis = pullFrequencyMillis;
+ this.maxReplicationLagMillis = maxReplicationLagMillis;
+ this.nodeCollections = new NodeCollectionProvider(originalMongoUri, db.getName());
+ }
+
+ public void addListener(ReplicaSetInfoListener listener) {
+ listeners.add(listener);
+ }
+
+ public boolean isMoreRecentThan(RevisionVector revisions) {
+ RevisionVector localRootRevisions = rootRevisions;
+ if (localRootRevisions == null) {
+ return false;
+ } else {
+ return isGreaterOrEquals(localRootRevisions, revisions);
+ }
+ }
+
+ public long getLag() {
+ long localTS = secondariesSafeTimestamp;
+ if (localTS == 0) {
+ return maxReplicationLagMillis;
+ } else {
+ return clock.getTime() - localTS;
+ }
+ }
+
+ @Nullable
+ public RevisionVector getMinimumRootRevisions() {
+ return rootRevisions;
+ }
+
+ public void stop() {
+ synchronized (stopMonitor) {
+ stop = true;
+ stopMonitor.notify();
+ }
+ }
+
+ @Override
+ public void run() {
+ try {
+ updateLoop();
+ } catch (Exception e) {
+ LOG.error("Exception in the ReplicaSetInfo thread", e);
+ }
+ }
+
+ private void updateLoop() {
+ while (!stop) {
+ if (hiddenMembers == null) {
+ hiddenMembers = getHiddenMembers();
+ } else {
+ updateReplicaStatus();
+
+ for (ReplicaSetInfoListener listener : listeners) {
+ listener.gotRootRevisions(rootRevisions);
+ }
+ }
+
+ synchronized (stopMonitor) {
+ try {
+ if (!stop) {
+ stopMonitor.wait(pullFrequencyMillis);
+ }
+ } catch (InterruptedException e) {
+ break;
+ }
+ }
+ }
+ LOG.debug("Stopping the replica set info");
+ nodeCollections.close();
+ }
+
+ void updateReplicaStatus() {
+ BasicDBObject result;
+ try {
+ result = getReplicaStatus();
+ } catch (MongoException e) {
+ LOG.error("Can't get replica status", e);
+ rootRevisions = null;
+ secondariesSafeTimestamp = 0;
+ return;
+ }
+
+ @SuppressWarnings("unchecked")
+ Iterable<BasicBSONObject> members = (Iterable<BasicBSONObject>) result.get("members");
+ if (members == null) {
+ members = Collections.emptyList();
+ }
+ updateRevisions(members);
+ }
+
+ List<String> getHiddenMembers() {
+ BasicDBObject result;
+ try {
+ result = getReplicaConfig();
+ } catch (MongoException e) {
+ LOG.error("Can't get replica configuration", e);
+ return null;
+ }
+
+ @SuppressWarnings("unchecked")
+ Iterable<BasicBSONObject> members = (Iterable<BasicBSONObject>) result.get("members");
+ if (members == null) {
+ members = Collections.emptyList();
+ }
+
+ List<String> hiddenMembers = new ArrayList<String>();
+ for (BasicBSONObject member : members) {
+ if (member.getBoolean("hidden")) {
+ hiddenMembers.add(member.getString("host"));
+ }
+ }
+ return hiddenMembers;
+ }
+
+ protected BasicDBObject getReplicaConfig() {
+ return adminDb.command("replSetGetConfig", ReadPreference.primary());
+ }
+
+ protected BasicDBObject getReplicaStatus() {
+ return adminDb.command("replSetGetStatus", ReadPreference.primary());
+ }
+
+ private void updateRevisions(Iterable<BasicBSONObject> members) {
+ Set<String> secondaries = new HashSet<String>();
+ boolean unknownState = false;
+ String primary = null;
+
+ for (BasicBSONObject member : members) {
+ MemberState state;
+ try {
+ state = MemberState.valueOf(member.getString("stateStr"));
+ } catch (IllegalArgumentException e) {
+ state = MemberState.UNKNOWN;
+ }
+ String name = member.getString("name");
+ if (hiddenMembers.contains(name)) {
+ continue;
+ }
+
+ switch (state) {
+ case PRIMARY:
+ primary = name;
+ continue;
+
+ case SECONDARY:
+ secondaries.add(name);
+ break;
+
+ case ARBITER:
+ continue;
+
+ default:
+ LOG.debug("Invalid state {} for instance {}", state, name);
+ unknownState = true;
+ break;
+ }
+ }
+
+ if (secondaries.isEmpty()) {
+ LOG.debug("No secondaries found: {}", members);
+ unknownState = true;
+ }
+
+ if (primary == null) {
+ LOG.debug("No primary found: {}", members);
+ unknownState = true;
+ }
+
+ Map<String, Timestamped<RevisionVector>> vectors = null;
+ if (!unknownState) {
+ vectors = getRootRevisions(union(secondaries, of(primary)));
+ if (vectors.containsValue(null)) {
+ unknownState = true;
+ }
+ }
+
+ if (unknownState) {
+ rootRevisions = null;
+ secondariesSafeTimestamp = 0;
+ } else {
+ Timestamped<RevisionVector> primaryRevision = vectors.get(primary);
+ Iterable<Timestamped<RevisionVector>> secondaryRevisions = filterKeys(vectors, in(secondaries)).values();
+
+ rootRevisions = pmin(transform(secondaryRevisions, Timestamped.<RevisionVector>getExtractFunction()));
+ if (rootRevisions == null || primaryRevision == null || isEmpty(secondaryRevisions)) {
+ secondariesSafeTimestamp = 0;
+ } else {
+ secondariesSafeTimestamp = getSecondariesSafeTimestamp(primaryRevision, secondaryRevisions);
+ }
+ }
+
+ LOG.debug("Minimum root revisions: {}. Current lag: {}", rootRevisions, getLag());
+ nodeCollections.retain(secondaries);
+ }
+
+ /**
+ * Find the oldest revision which hasn't been replicated from primary to
+ * secondary yet and return its timestamp. If all revisions has been already
+ * replicated, return the date of the measurement.
+ *
+ * @return the point in time to which the secondary instances has been synchronized
+ */
+ private long getSecondariesSafeTimestamp(Timestamped<RevisionVector> primary, Iterable<Timestamped<RevisionVector>> secondaries) {
+ final RevisionVector priRev = primary.getValue();
+ Long oldestNotReplicated = null;
+ for (Timestamped<RevisionVector> v : secondaries) {
+ RevisionVector secRev = v.getValue();
+ if (secRev.equals(priRev)) {
+ continue;
+ }
+
+ for (Revision pr : priRev) {
+ Revision sr = secRev.getRevision(pr.getClusterId());
+ if (pr.equals(sr)) {
+ continue;
+ }
+ if (oldestNotReplicated == null || oldestNotReplicated > pr.getTimestamp()) {
+ oldestNotReplicated = pr.getTimestamp();
+ }
+ }
+ }
+
+ if (oldestNotReplicated == null) {
+ long minOpTimestamp = primary.getOperationTimestamp();
+ for (Timestamped<RevisionVector> v : secondaries) {
+ if (v.getOperationTimestamp() < minOpTimestamp) {
+ minOpTimestamp = v.getOperationTimestamp();
+ }
+ }
+ return minOpTimestamp;
+ } else {
+ return oldestNotReplicated;
+ }
+ }
+
+ protected Map<String, Timestamped<RevisionVector>> getRootRevisions(Iterable<String> hosts) {
+ Map<String, Future<Timestamped<RevisionVector>>> futures = new HashMap<String, Future<Timestamped<RevisionVector>>>();
+ for (final String hostName : hosts) {
+ Callable<Timestamped<RevisionVector>> callable = new GetRootRevisionsCallable(clock, hostName, nodeCollections);
+ FutureTask<Timestamped<RevisionVector>> futureTask = new FutureTask<Timestamped<RevisionVector>>(callable);
+ futures.put(hostName, futureTask);
+ executor.execute(futureTask);
+ }
+
+ Map<String, Timestamped<RevisionVector>> result = new HashMap<String, Timestamped<RevisionVector>>();
+ for (Entry<String, Future<Timestamped<RevisionVector>>> entry : futures.entrySet()) {
+ try {
+ result.put(entry.getKey(), entry.getValue().get());
+ } catch (Exception e) {
+ LOG.error("Can't connect to the Mongo instance", e);
+ }
+ }
+ return result;
+ }
+
+ private static RevisionVector pmin(Iterable<RevisionVector> vectors) {
+ RevisionVector minimum = null;
+ for (RevisionVector v : vectors) {
+ if (v == null) {
+ return null;
+ } else if (minimum == null) {
+ minimum = v;
+ } else {
+ minimum = minimum.pmin(v);
+ }
+ }
+ return minimum;
+ }
+
+ enum MemberState {
+ STARTUP, PRIMARY, SECONDARY, RECOVERING, STARTUP2, UNKNOWN, ARBITER, DOWN, ROLLBACK, REMOVED
+ }
+}
Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/ReplicaSetInfoListener.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/ReplicaSetInfoListener.java?rev=1748812&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/ReplicaSetInfoListener.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/ReplicaSetInfoListener.java Fri Jun 17 08:53:10 2016
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document.mongo.replica;
+
+import org.apache.jackrabbit.oak.plugins.document.RevisionVector;
+
+/**
+ * Classes implementing this interface will be informed about the current root
+ * revision states on secondary instances by {@link ReplicaSetInfo}.
+ */
+public interface ReplicaSetInfoListener {
+
+ void gotRootRevisions(RevisionVector rootRevision);
+
+}
\ No newline at end of file
Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/Timestamped.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/Timestamped.java?rev=1748812&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/Timestamped.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/Timestamped.java Fri Jun 17 08:53:10 2016
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document.mongo.replica;
+
+import com.google.common.base.Function;
+
+/**
+ * A value with a timestamp.
+ *
+ * @param <T> the value type
+ */
+public class Timestamped<T> {
+
+ private final T value;
+
+ private final long operationTimestamp;
+
+ public Timestamped(T value, long operationTimestamp) {
+ this.value = value;
+ this.operationTimestamp = operationTimestamp;
+ }
+
+ public T getValue() {
+ return value;
+ }
+
+ public long getOperationTimestamp() {
+ return operationTimestamp;
+ }
+
+ public static <T> Function<Timestamped<T>, T> getExtractFunction() {
+ return new Function<Timestamped<T>, T>() {
+ @Override
+ public T apply(Timestamped<T> input) {
+ if (input == null) {
+ return null;
+ } else {
+ return input.value;
+ }
+ }
+ };
+ }
+
+ @Override
+ public String toString() {
+ return new StringBuilder("Timestamped[").append(value).append('(').append(operationTimestamp).append(")]")
+ .toString();
+ }
+}
\ No newline at end of file
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java?rev=1748812&r1=1748811&r2=1748812&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java Fri Jun 17 08:53:10 2016
@@ -759,6 +759,21 @@ public class Utils {
}
/**
+ * Returns true if all the revisions in the {@code a} greater or equals
+ * to their counterparts in {@code b}. If {@code b} contains revisions
+ * for cluster nodes that are not present in {@code a}, return false.
+ *
+ * @param a
+ * @param b
+ * @return true if all the revisions in the {@code a} are at least
+ * as recent as their counterparts in the {@code b}
+ */
+ public static boolean isGreaterOrEquals(@Nonnull RevisionVector a,
+ @Nonnull RevisionVector b) {
+ return a.pmax(b).equals(a);
+ }
+
+ /**
* Wraps the given iterable and aborts iteration over elements when the
* predicate on an element evaluates to {@code false}.
*
Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/ReadPreferenceIT.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/ReadPreferenceIT.java?rev=1748812&r1=1748811&r2=1748812&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/ReadPreferenceIT.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/ReadPreferenceIT.java Fri Jun 17 08:53:10 2016
@@ -54,7 +54,6 @@ public class ReadPreferenceIT extends Ab
replicationLag = TimeUnit.SECONDS.toMillis(10);
mongoConnection = connectionFactory.getConnection();
mk = new DocumentMK.Builder()
- .setMaxReplicationLag(replicationLag, TimeUnit.MILLISECONDS)
.setMongoDB(mongoConnection.getDB())
.setClusterId(1)
.setLeaseCheck(false)
@@ -83,27 +82,27 @@ public class ReadPreferenceIT extends Ab
@Test
public void testMongoReadPreferencesDefault() throws Exception{
assertEquals(ReadPreference.primary(),
- mongoDS.getMongoReadPreference(NODES,"foo", DocumentReadPreference.PRIMARY));
+ mongoDS.getMongoReadPreference(NODES,"foo", null, DocumentReadPreference.PRIMARY));
assertEquals(ReadPreference.primaryPreferred(),
- mongoDS.getMongoReadPreference(NODES,"foo", DocumentReadPreference.PREFER_PRIMARY));
+ mongoDS.getMongoReadPreference(NODES,"foo", null, DocumentReadPreference.PREFER_PRIMARY));
//By default Mongo read preference is primary
assertEquals(ReadPreference.primary(),
- mongoDS.getMongoReadPreference(NODES,"foo", DocumentReadPreference.PREFER_SECONDARY));
+ mongoDS.getMongoReadPreference(NODES,"foo", null, DocumentReadPreference.PREFER_SECONDARY));
//Change the default and assert again
mongoDS.getDBCollection(NODES).getDB().setReadPreference(ReadPreference.secondary());
assertEquals(ReadPreference.secondary(),
- mongoDS.getMongoReadPreference(NODES,"foo", DocumentReadPreference.PREFER_SECONDARY));
+ mongoDS.getMongoReadPreference(NODES,"foo", null, DocumentReadPreference.PREFER_SECONDARY));
//for case where parent age cannot be determined the preference should be primary
assertEquals(ReadPreference.primary(),
- mongoDS.getMongoReadPreference(NODES,"foo", DocumentReadPreference.PREFER_SECONDARY_IF_OLD_ENOUGH));
+ mongoDS.getMongoReadPreference(NODES,"foo", null, DocumentReadPreference.PREFER_SECONDARY_IF_OLD_ENOUGH));
//For collection other than NODES always primary
assertEquals(ReadPreference.primary(),
- mongoDS.getMongoReadPreference(SETTINGS,"foo", DocumentReadPreference.PREFER_SECONDARY_IF_OLD_ENOUGH));
+ mongoDS.getMongoReadPreference(SETTINGS,"foo", null, DocumentReadPreference.PREFER_SECONDARY_IF_OLD_ENOUGH));
}
@@ -124,7 +123,7 @@ public class ReadPreferenceIT extends Ab
//For modifiedTime < replicationLag primary must be used
assertEquals(ReadPreference.primary(),
- mongoDS.getMongoReadPreference(NODES,parentId, DocumentReadPreference.PREFER_SECONDARY_IF_OLD_ENOUGH));
+ mongoDS.getMongoReadPreference(NODES,parentId, null, DocumentReadPreference.PREFER_SECONDARY_IF_OLD_ENOUGH));
//Going into future to make parent /x old enough
clock.waitUntil(Revision.getCurrentTimestamp() + replicationLag);
@@ -132,7 +131,7 @@ public class ReadPreferenceIT extends Ab
//For old modified nodes secondaries should be preferred
assertEquals(testPref,
- mongoDS.getMongoReadPreference(NODES, parentId, DocumentReadPreference.PREFER_SECONDARY_IF_OLD_ENOUGH));
+ mongoDS.getMongoReadPreference(NODES, parentId, null, DocumentReadPreference.PREFER_SECONDARY_IF_OLD_ENOUGH));
}
@Test
Added: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/LocalChangesTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/LocalChangesTest.java?rev=1748812&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/LocalChangesTest.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/LocalChangesTest.java Fri Jun 17 08:53:10 2016
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document.mongo.replica;
+
+import static java.util.Collections.singleton;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.jackrabbit.oak.plugins.document.Revision;
+import org.apache.jackrabbit.oak.plugins.document.RevisionVector;
+import org.junit.Test;
+
+public class LocalChangesTest {
+
+ @Test
+ public void testReplicaInactive() {
+ LocalChanges l = new LocalChanges();
+ assertTrue(l.mayContain("2:/xyz/123"));
+ assertTrue(l.mayContainChildrenOf("2:/xyz/123"));
+
+ l.add("2:/xyz/123", revs(2, 2, 2)); // don't remember the path, only the (2,2,2) revision
+ l.gotRootRevisions(revsV(1, 1, 1));
+ assertTrue(l.mayContain("2:/xyz/123"));
+ assertTrue(l.mayContain("2:/abc/567")); // we only remembered the timestamp, not the path
+
+ l.gotRootRevisions(revsV(3, 3, 3)); // the new revision >= the remembered (2,2,2)
+ assertFalse(l.mayContain("2:/xyz/123"));
+ assertFalse(l.mayContain("2:/abc/567"));
+ }
+
+ @Test
+ public void testMayContain() {
+ LocalChanges l = new LocalChanges();
+ l.add("2:/xyz/123", revs(2, 2, 2));
+
+ l.gotRootRevisions(revsV(1, 1, 1));
+ assertTrue(l.mayContain("2:/xyz/123"));
+ assertTrue(l.mayContainChildrenOf("1:/xyz"));
+
+ l.gotRootRevisions(revsV(2, 2, 2));
+ assertFalse(l.mayContain("2:/xyz/123"));
+ assertFalse(l.mayContainChildrenOf("1:/xyz"));
+ }
+
+ @Test
+ public void testGotRootRevisions() {
+ LocalChanges l = new LocalChanges();
+ l.add("2:/xyz/123", revs(2, 3, 4));
+
+ l.gotRootRevisions(revsV(1, 1, 1));
+ assertTrue(l.mayContain("2:/xyz/123"));
+
+ l.gotRootRevisions(revsV(2, 2, 2));
+ assertTrue(l.mayContain("2:/xyz/123"));
+
+ l.gotRootRevisions(revsV(2, 3, 3));
+ assertTrue(l.mayContain("2:/xyz/123"));
+
+ l.gotRootRevisions(revsV(2, 3, 4));
+ assertFalse(l.mayContain("2:/xyz/123"));
+ }
+
+ @Test
+ public void testLimit() {
+ LocalChanges l = new LocalChanges();
+ l.gotRootRevisions(revsV(1)); // make the class active
+
+ for (int i = 1; i <= 99; i++) {
+ l.add("2:/xyz/" + i, revs(i + 100));
+ assertTrue(l.mayContain("2:/xyz/" + i));
+ assertFalse(l.mayContain("2:/abc/" + i));
+ }
+ l.add("2:/xyz/100", revs(200)); // the list should be cleared right now
+ l.add("2:/abc/123", revs(300)); // this is added to the new list
+ l.add("2:/abc/456", revs(100)); // this shouldn't be added to the new list (as it's old)
+
+ // now the list should be cleared and we should got true for all documents
+ assertTrue(l.mayContain("2:/abc/999"));
+ assertEquals(singleton("2:/abc/123"), l.localChanges.keySet());
+
+ l.gotRootRevisions(revsV(200)); // invalidate
+ assertFalse(l.mayContain("2:/xyz/0"));
+ assertFalse(l.mayContain("2:/xyz/99"));
+
+ assertTrue(l.mayContain("2:/abc/123"));
+ assertFalse(l.mayContain("2:/abc/456"));
+ }
+
+ @Test
+ public void dontAddOldRevisions() {
+ LocalChanges l = new LocalChanges();
+ l.gotRootRevisions(revsV(10));
+ l.add("2:/xyz/1", revs(5));
+ assertFalse(l.mayContain("2:/xyz/1"));
+ }
+
+ private Collection<Revision> revs(int... timestamps) {
+ List<Revision> revs = new ArrayList<Revision>();
+ for (int i = 0; i < timestamps.length; i++) {
+ revs.add(new Revision(timestamps[i], 0, i, false));
+ }
+ return revs;
+ }
+
+ private RevisionVector revsV(int... timestamps) {
+ return new RevisionVector(revs(timestamps));
+ }
+}
Added: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/ReplicaSetInfoTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/ReplicaSetInfoTest.java?rev=1748812&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/ReplicaSetInfoTest.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/ReplicaSetInfoTest.java Fri Jun 17 08:53:10 2016
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document.mongo.replica;
+
+import static com.google.common.collect.Maps.transformValues;
+import static org.apache.jackrabbit.oak.plugins.document.mongo.replica.ReplicaSetInfo.MemberState.PRIMARY;
+import static org.apache.jackrabbit.oak.plugins.document.mongo.replica.ReplicaSetInfo.MemberState.RECOVERING;
+import static org.apache.jackrabbit.oak.plugins.document.mongo.replica.ReplicaSetInfo.MemberState.SECONDARY;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.jackrabbit.oak.plugins.document.Revision;
+import org.apache.jackrabbit.oak.plugins.document.RevisionVector;
+import org.apache.jackrabbit.oak.plugins.document.mongo.replica.ReplicaSetInfo.MemberState;
+import org.apache.jackrabbit.oak.stats.Clock;
+import org.bson.BasicBSONObject;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import com.google.common.base.Function;
+import com.mongodb.BasicDBObject;
+import com.mongodb.DB;
+
+public class ReplicaSetInfoTest {
+
+ private ReplicaSetInfo replica;
+
+ private ReplicationSetStatusMock replicationSet;
+
+ private Clock.Virtual clock;
+
+ private Clock.Virtual mongoClock;
+
+ @Before
+ public void resetEstimator() {
+ clock = mongoClock = new Clock.Virtual();
+
+ DB db = mock(DB.class);
+ when(db.getName()).thenReturn("oak-db");
+ when(db.getSisterDB(Mockito.anyString())).thenReturn(db);
+ replica = new ReplicaSetInfo(clock, db, null, 0l, 0l, null) {
+ @Override
+ protected BasicDBObject getReplicaStatus() {
+ BasicDBObject obj = new BasicDBObject();
+ obj.put("date", mongoClock.getDate());
+ obj.put("members", replicationSet.members);
+ return obj;
+ }
+
+ @Override
+ protected Map<String, Timestamped<RevisionVector>> getRootRevisions(Iterable<String> hosts) {
+ return transformValues(replicationSet.memberRevisions,
+ new Function<RevisionBuilder, Timestamped<RevisionVector>>() {
+ @Override
+ public Timestamped<RevisionVector> apply(RevisionBuilder input) {
+ return new Timestamped<RevisionVector>(input.revs, clock.getTime());
+ }
+ });
+ }
+ };
+ replica.hiddenMembers = Collections.emptyList();
+ }
+
+ @Test
+ public void testMinimumRevision() {
+ addInstance(PRIMARY, "mp").addRevisions(20, 18, 19);
+ addInstance(SECONDARY, "m1").addRevisions(20, 18, 3);
+ addInstance(SECONDARY, "m2").addRevisions(20, 1, 17);
+ updateRevisions();
+
+ assertEquals(20, replica.getMinimumRootRevisions().getRevision(0).getTimestamp());
+ assertEquals( 1, replica.getMinimumRootRevisions().getRevision(1).getTimestamp());
+ assertEquals( 3, replica.getMinimumRootRevisions().getRevision(2).getTimestamp());
+ }
+
+ @Test
+ public void testIsMoreRecentThan() {
+ addInstance(PRIMARY, "mp").addRevisions(15, 21, 22);
+ addInstance(SECONDARY, "m1").addRevisions(10, 21, 11);
+ addInstance(SECONDARY, "m2").addRevisions(15, 14, 13);
+ addInstance(SECONDARY, "m3").addRevisions(14, 13, 22);
+ updateRevisions();
+
+ assertTrue(replica.isMoreRecentThan(lastRev(9, 13, 10)));
+ assertFalse(replica.isMoreRecentThan(lastRev(11, 14, 10)));
+ }
+
+ @Test
+ public void testUnknownStateIsNotSafe() {
+ addInstance(PRIMARY, "mp");
+ addInstance(SECONDARY, "m1").addRevisions(10, 21, 11);
+ addInstance(RECOVERING, "m2");
+ updateRevisions();
+
+ assertNull(replica.getMinimumRootRevisions());
+ assertFalse(replica.isMoreRecentThan(lastRev(1, 1, 1)));
+ }
+
+ @Test
+ public void testEmptyIsNotSafe() {
+ addInstance(PRIMARY, "m1");
+ updateRevisions();
+
+ assertNull(replica.getMinimumRootRevisions());
+ assertFalse(replica.isMoreRecentThan(lastRev(1, 1, 1)));
+ }
+
+ @Test
+ public void testOldestNotReplicated() {
+ addInstance(PRIMARY, "mp").addRevisions(10, 30, 30);
+ addInstance(SECONDARY, "m1").addRevisions(10, 5, 30);
+ addInstance(SECONDARY, "m2").addRevisions(2, 30, 30);
+ updateRevisions();
+
+ assertEquals(10, replica.secondariesSafeTimestamp);
+ }
+
+ @Test
+ public void testAllSecondariesUpToDate() {
+ addInstance(PRIMARY, "mp").addRevisions(10, 30, 30);
+ addInstance(SECONDARY, "m1").addRevisions(10, 30, 30);
+ addInstance(SECONDARY, "m2").addRevisions(10, 30, 30);
+
+ long before = clock.getTime();
+ updateRevisions();
+ long after = clock.getTime();
+
+ assertBetween(before, after, replica.secondariesSafeTimestamp);
+ }
+
+ @Test
+ public void testAllSecondariesUpToDateWithTimediff() {
+ addInstance(PRIMARY, "mp").addRevisions(10, 30, 30);
+ addInstance(SECONDARY, "m1").addRevisions(10, 30, 30);
+ addInstance(SECONDARY, "m2").addRevisions(10, 30, 30);
+
+ mongoClock = new Clock.Virtual();
+ mongoClock.waitUntil(100);
+
+ long before = clock.getTime();
+ updateRevisions();
+ long after = clock.getTime();
+
+ assertBetween(before, after, replica.secondariesSafeTimestamp);
+ }
+
+ private RevisionBuilder addInstance(MemberState state, String name) {
+ if (replicationSet == null) {
+ replicationSet = new ReplicationSetStatusMock();
+ }
+ return replicationSet.addInstance(state, name);
+ }
+
+ private void updateRevisions() {
+ replica.updateReplicaStatus();
+ replicationSet = null;
+ }
+
+ private static RevisionVector lastRev(int... timestamps) {
+ return new RevisionBuilder().addRevisions(timestamps).revs;
+ }
+
+ private static void assertBetween(long from, long to, long actual) {
+ final String msg = String.format("%d <= %d <= %d", from, actual, to);
+ assertTrue(msg, from <= actual);
+ assertTrue(msg, actual <= to);
+ }
+
+ private class ReplicationSetStatusMock {
+
+ private List<BasicBSONObject> members = new ArrayList<BasicBSONObject>();
+
+ private Map<String, RevisionBuilder> memberRevisions = new HashMap<String, RevisionBuilder>();
+
+ private RevisionBuilder addInstance(MemberState state, String name) {
+ BasicBSONObject member = new BasicBSONObject();
+ member.put("stateStr", state.name());
+ member.put("name", name);
+ members.add(member);
+
+ RevisionBuilder builder = new RevisionBuilder();
+ memberRevisions.put(name, builder);
+ return builder;
+ }
+ }
+
+ private static class RevisionBuilder {
+
+ private RevisionVector revs = new RevisionVector();
+
+ private RevisionBuilder addRevisions(int... timestamps) {
+ for (int i = 0; i < timestamps.length; i++) {
+ addRevision(timestamps[i], 0, i, false);
+ }
+ return this;
+ }
+
+ private RevisionBuilder addRevision(int timestamp, int counter, int clusterId, boolean branch) {
+ Revision rev = new Revision(timestamp, counter, clusterId, branch);
+ revs = revs.update(rev);
+ return this;
+ }
+ }
+}
\ No newline at end of file