You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by th...@apache.org on 2013/05/07 16:39:46 UTC
svn commit: r1479926 - in /jackrabbit/oak/trunk/oak-mongomk/src:
main/java/org/apache/jackrabbit/mongomk/
main/java/org/apache/jackrabbit/mongomk/util/
test/java/org/apache/jackrabbit/mongomk/
Author: thomasm
Date: Tue May 7 14:39:45 2013
New Revision: 1479926
URL: http://svn.apache.org/r1479926
Log:
OAK-762 MongoMK: automatic unique cluster id / revision order depends on the cluster id (WIP)
Modified:
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Branch.java
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Collision.java
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Commit.java
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/DocumentStore.java
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/MemoryDocumentStore.java
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/MongoDocumentStore.java
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/MongoMK.java
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Revision.java
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/UnmergedBranches.java
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/util/LoggingDocumentStoreWrapper.java
jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/ClusterTest.java
jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/ConcurrentConflictTest.java
jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/RandomizedClusterTest.java
jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/RevisionTest.java
jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/SimpleTest.java
Modified: jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Branch.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Branch.java?rev=1479926&r1=1479925&r2=1479926&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Branch.java (original)
+++ jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Branch.java Tue May 7 14:39:45 2013
@@ -62,7 +62,7 @@ class Branch {
return commits.contains(r);
}
- synchronized public void removeCommit(@Nonnull Revision rev) {
+ public synchronized void removeCommit(@Nonnull Revision rev) {
commits.remove(rev);
}
}
Modified: jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Collision.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Collision.java?rev=1479926&r1=1479925&r2=1479926&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Collision.java (original)
+++ jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Collision.java Tue May 7 14:39:45 2013
@@ -33,7 +33,7 @@ import static com.google.common.base.Pre
*/
class Collision {
- private static final Logger log = LoggerFactory.getLogger(Collision.class);
+ private static final Logger LOG = LoggerFactory.getLogger(Collision.class);
private final Map<String, Object> document;
private final String theirRev;
@@ -87,7 +87,7 @@ class Collision {
// TODO: detect concurrent commit of previously un-merged changes
// TODO: check _commitRoot for revision is not 'true'
store.createOrUpdate(DocumentStore.Collection.NODES, op);
- log.debug("Marked collision on: {} for {} ({})",
+ LOG.debug("Marked collision on: {} for {} ({})",
new Object[]{commitRootPath, p, revision});
return true;
}
Modified: jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Commit.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Commit.java?rev=1479926&r1=1479925&r2=1479926&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Commit.java (original)
+++ jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Commit.java Tue May 7 14:39:45 2013
@@ -159,7 +159,9 @@ public class Commit {
}
ArrayList<UpdateOp> newNodes = new ArrayList<UpdateOp>();
ArrayList<UpdateOp> changedNodes = new ArrayList<UpdateOp>();
- ArrayList<UpdateOp> done = new ArrayList<UpdateOp>();
+ // operations are added to this list before they are executed,
+ // so that all operations can be rolled back if there is a conflict
+ ArrayList<UpdateOp> opLog = new ArrayList<UpdateOp>();
for (String p : operations.keySet()) {
markChanged(p);
if (commitRootPath == null) {
@@ -220,7 +222,7 @@ public class Commit {
for (UpdateOp op : changedNodes) {
// set commit root on changed nodes
op.setMapEntry(UpdateOp.COMMIT_ROOT, revision.toString(), commitRootDepth);
- done.add(op);
+ opLog.add(op);
createOrUpdateNode(store, op);
}
// finally write the commit root, unless it was already written
@@ -229,12 +231,12 @@ public class Commit {
// the revision, with the revision property set)
if (changedNodes.size() > 0 || !commitRoot.isNew) {
commitRoot.setMapEntry(UpdateOp.REVISIONS, revision.toString(), commitValue);
- done.add(commitRoot);
+ opLog.add(commitRoot);
createOrUpdateNode(store, commitRoot);
operations.put(commitRootPath, commitRoot);
}
} catch (MicroKernelException e) {
- rollback(newNodes, done);
+ rollback(newNodes, opLog);
String msg = "Exception committing " + diff.toString();
LOG.error(msg, e);
throw new MicroKernelException(msg, e);
@@ -273,26 +275,32 @@ public class Commit {
collisions.get().add(uncommitted);
}
});
+ MicroKernelException conflict = null;
if (newestRev == null) {
if (op.isDelete || !op.isNew) {
- throw new MicroKernelException("The node " +
+ conflict = new MicroKernelException("The node " +
op.path + " does not exist or is already deleted " +
"before " + revision + "; document " + map);
}
} else {
if (op.isNew) {
- throw new MicroKernelException("The node " +
+ conflict = new MicroKernelException("The node " +
op.path + " was already added in revision " +
newestRev + "; before " + revision + "; document " + map);
- }
- if (mk.isRevisionNewer(newestRev, baseRevision)
+ } else if (mk.isRevisionNewer(newestRev, baseRevision)
&& (op.isDelete || isConflicting(map, op))) {
- throw new MicroKernelException("The node " +
+ conflict = new MicroKernelException("The node " +
op.path + " was changed in revision " + newestRev +
", which was applied after the base revision " +
baseRevision + "; before " + revision + "; document " + map);
}
}
+ if (conflict != null) {
+ if (newestRev != null) {
+ mk.publishRevision(newestRev);
+ }
+ throw conflict;
+ }
// if we get here the modification was successful
// -> check for collisions and conflict (concurrent updates
// on a node are possible if property updates do not overlap)
@@ -486,10 +494,6 @@ public class Commit {
diff.tag('*').key(sourcePath).value(targetPath);
}
- public JsopWriter getDiff() {
- return diff;
- }
-
private void markChanged(String path) {
if (!PathUtils.denotesRoot(path) && !PathUtils.isAbsolute(path)) {
throw new IllegalArgumentException("path: " + path);
Modified: jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/DocumentStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/DocumentStore.java?rev=1479926&r1=1479925&r2=1479926&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/DocumentStore.java (original)
+++ jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/DocumentStore.java Tue May 7 14:39:45 2013
@@ -156,6 +156,14 @@ public interface DocumentStore {
void invalidateCache();
/**
+ * Invalidate the document cache for the given key.
+ *
+ * @param collection the collection
+ * @param key the key
+ */
+ void invalidateCache(Collection collection, String key);
+
+ /**
* Dispose this instance.
*/
void dispose();
Modified: jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/MemoryDocumentStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/MemoryDocumentStore.java?rev=1479926&r1=1479925&r2=1479926&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/MemoryDocumentStore.java (original)
+++ jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/MemoryDocumentStore.java Tue May 7 14:39:45 2013
@@ -301,4 +301,9 @@ public class MemoryDocumentStore impleme
return false;
}
+ @Override
+ public void invalidateCache(Collection collection, String key) {
+ // ignore
+ }
+
}
\ No newline at end of file
Modified: jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/MongoDocumentStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/MongoDocumentStore.java?rev=1479926&r1=1479925&r2=1479926&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/MongoDocumentStore.java (original)
+++ jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/MongoDocumentStore.java Tue May 7 14:39:45 2013
@@ -111,6 +111,13 @@ public class MongoDocumentStore implemen
public void invalidateCache() {
nodesCache.invalidateAll();
}
+
+ @Override
+ public void invalidateCache(Collection collection, String key) {
+ if (collection == Collection.NODES) {
+ nodesCache.invalidate(key);
+ }
+ }
public Map<String, Object> find(Collection collection, String key) {
return find(collection, key, Integer.MAX_VALUE);
@@ -123,6 +130,9 @@ public class MongoDocumentStore implemen
}
try {
CachedDocument doc;
+ if (maxCacheAge == 0) {
+ nodesCache.invalidate(key);
+ }
while (true) {
doc = nodesCache.get(key, new Callable<CachedDocument>() {
@Override
@@ -131,7 +141,7 @@ public class MongoDocumentStore implemen
return new CachedDocument(map);
}
});
- if (maxCacheAge == Integer.MAX_VALUE) {
+ if (maxCacheAge == 0 || maxCacheAge == Integer.MAX_VALUE) {
break;
}
if (System.currentTimeMillis() - doc.time < maxCacheAge) {
Modified: jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/MongoMK.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/MongoMK.java?rev=1479926&r1=1479925&r2=1479926&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/MongoMK.java (original)
+++ jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/MongoMK.java Tue May 7 14:39:45 2013
@@ -67,24 +67,34 @@ public class MongoMK implements MicroKer
/**
* The number of child node list entries to cache.
*/
- private static final int CACHE_CHILDREN = Integer.getInteger("oak.mongoMK.cacheChildren", 1024);
+ private static final int CACHE_CHILDREN =
+ Integer.getInteger("oak.mongoMK.cacheChildren", 1024);
/**
* The number of nodes to cache.
*/
- private static final int CACHE_NODES = Integer.getInteger("oak.mongoMK.cacheNodes", 1024);
+ private static final int CACHE_NODES =
+ Integer.getInteger("oak.mongoMK.cacheNodes", 1024);
/**
- * When trying to access revisions that are older than this many milliseconds, a warning is logged.
+ * When trying to access revisions that are older than this many
+ * milliseconds, a warning is logged. The default is one minute.
*/
- private static final int WARN_REVISION_AGE = Integer.getInteger("oak.mongoMK.revisionAge", 10000);
+ private static final int WARN_REVISION_AGE =
+ Integer.getInteger("oak.mongoMK.revisionAge", 60 * 1000);
/**
* Enable background operations
*/
private static final boolean ENABLE_BACKGROUND_OPS = Boolean.parseBoolean(
System.getProperty("oak.mongoMK.backgroundOps", "true"));
-
+
+ /**
+ * How long to remember the relative order of old revision of all cluster
+ * nodes, in milliseconds. The default is one hour.
+ */
+ private static final int REMEMBER_REVISION_ORDER_MILLIS = 60 * 60 * 1000;
+
/**
* The delay for asynchronous operations (delayed commit propagation and
* cache update).
@@ -170,7 +180,9 @@ public class MongoMK implements MicroKer
/**
* The comparator for revisions.
*/
- private final RevisionComparator revisionComparator = new RevisionComparator();
+ private final RevisionComparator revisionComparator;
+
+ private boolean stopBackground;
MongoMK(Builder builder) {
this.store = builder.getDocumentStore();
@@ -179,11 +191,16 @@ public class MongoMK implements MicroKer
cid = Integer.getInteger("oak.mongoMK.clusterId", cid);
if (cid == 0) {
clusterNodeInfo = ClusterNodeInfo.getInstance(store);
+ // TODO we should ensure revisions generated from now on
+ // are never "older" than revisions already in the repository for
+ // this cluster id
cid = clusterNodeInfo.getId();
} else {
clusterNodeInfo = null;
}
this.clusterId = cid;
+
+ this.revisionComparator = new RevisionComparator(clusterId);
this.asyncDelay = builder.getAsyncDelay();
//TODO Use size based weigher
@@ -196,6 +213,10 @@ public class MongoMK implements MicroKer
.build();
init();
+ // initial reading of the revisions of other cluster nodes
+ backgroundRead();
+ revisionComparator.add(headRevision, Revision.getCurrentTimestamp() + 1);
+ headRevision = newRevision();
LOG.info("Initialized MongoMK with clusterNodeId: {}", clusterId);
}
@@ -250,7 +271,7 @@ public class MongoMK implements MicroKer
// only when using timestamp
return;
}
- if (!ENABLE_BACKGROUND_OPS) {
+ if (!ENABLE_BACKGROUND_OPS || stopBackground) {
return;
}
synchronized (this) {
@@ -279,6 +300,8 @@ public class MongoMK implements MicroKer
@SuppressWarnings("unchecked")
Map<String, String> lastRevMap = (Map<String, String>) map.get(UpdateOp.LAST_REV);
+ boolean hasNewRevisions = false;
+ long timestamp = Revision.getCurrentTimestamp();
for (Entry<String, String> e : lastRevMap.entrySet()) {
int machineId = Integer.parseInt(e.getKey());
if (machineId == clusterId) {
@@ -286,19 +309,58 @@ public class MongoMK implements MicroKer
}
Revision r = Revision.fromString(e.getValue());
Revision last = lastKnownRevision.get(machineId);
-
if (last == null || r.compareRevisionTime(last) > 0) {
- // TODO invalidating the whole cache is not really needed,
- // instead only those children that are cached could be checked
-
- store.invalidateCache();
lastKnownRevision.put(machineId, r);
- // add a new revision, so that changes are visible
- headRevision = Revision.newRevision(clusterId);
+ hasNewRevisions = true;
+ revisionComparator.add(r, timestamp);
}
}
+ if (hasNewRevisions) {
+ // TODO invalidating the whole cache is not really needed,
+ // instead only those children that are cached could be checked
+ store.invalidateCache();
+ // add a new revision, so that changes are visible
+ Revision r = Revision.newRevision(clusterId);
+ // the latest revisions of the current cluster node
+ // happened before the latest revisions of other cluster nodes
+ revisionComparator.add(r, timestamp - 1);
+ // the head revision is after other revisions
+ headRevision = Revision.newRevision(clusterId);
+ }
+ revisionComparator.purge(timestamp - REMEMBER_REVISION_ORDER_MILLIS);
}
+ /**
+ * Ensure the revision visible from now on, possibly by updating the head
+ * revision, so that the changes that occurred are visible.
+ *
+ * @param revision the revision
+ */
+ void publishRevision(Revision revision) {
+ if (revisionComparator.compare(headRevision, revision) >= 0) {
+ // already visible
+ return;
+ }
+ int clusterNodeId = revision.getClusterId();
+ if (clusterNodeId == this.clusterId) {
+ return;
+ }
+ long timestamp = Revision.getCurrentTimestamp();
+ revisionComparator.add(revision, timestamp);
+ // TODO invalidating the whole cache is not really needed,
+ // but how to ensure we invalidate the right part of the cache?
+ // possibly simply wait for the background thread to pick
+ // up the changes, but this depends on how often this method is called
+ store.invalidateCache();
+ // add a new revision, so that changes are visible
+ headRevision = Revision.newRevision(clusterId);
+ // the latest revisions of the current cluster node
+ // happened before the latest revisions of other cluster nodes
+ revisionComparator.add(headRevision, timestamp - 1);
+ // the head revision is after other revisions
+ headRevision = Revision.newRevision(clusterId);
+ }
+
void backgroundWrite() {
if (unsavedLastRevisions.size() == 0) {
return;
@@ -318,6 +380,7 @@ public class MongoMK implements MicroKer
}
});
+
long now = Revision.getCurrentTimestamp();
for (String p : paths) {
Revision r = unsavedLastRevisions.get(p);
@@ -330,7 +393,6 @@ public class MongoMK implements MicroKer
if (Revision.getTimestampDifference(now, r.getTimestamp()) < asyncDelay) {
continue;
}
-
Commit commit = new Commit(this, null, r);
commit.touchNode(p);
store.createOrUpdate(DocumentStore.Collection.NODES, commit.getUpdateOperationForNode(p));
@@ -339,6 +401,11 @@ public class MongoMK implements MicroKer
}
public void dispose() {
+ // force background write (with asyncDelay > 0, the root wouldn't be written)
+ // TODO make this more obvious / explicit
+ // TODO tests should also work if this is not done
+ asyncDelay = 0;
+ runBackgroundOperations();
if (!isDisposed.getAndSet(true)) {
synchronized (isDisposed) {
isDisposed.notifyAll();
@@ -395,11 +462,9 @@ public class MongoMK implements MicroKer
// in same branch, include if the same revision or
// requestRevision is newer
return x.equals(requestRevision) || isRevisionNewer(requestRevision, x);
- } else {
- // not part of branch identified by requestedRevision
- return false;
}
-
+ // not part of branch identified by requestedRevision
+ return false;
}
// assert: x is not a branch commit
b = branches.getBranch(requestRevision);
@@ -409,14 +474,7 @@ public class MongoMK implements MicroKer
// was created
requestRevision = b.getBase();
}
- if (x.getClusterId() == this.clusterId &&
- requestRevision.getClusterId() == this.clusterId) {
- // both revisions were created by this cluster instance:
- // compare timestamps and counters
- return requestRevision.compareRevisionTime(x) >= 0;
- }
- // TODO currently we only compare the timestamps
- return requestRevision.compareRevisionTime(x) >= 0;
+ return revisionComparator.compare(requestRevision, x) >= 0;
}
/**
@@ -427,7 +485,6 @@ public class MongoMK implements MicroKer
* @return true if x is newer
*/
boolean isRevisionNewer(@Nonnull Revision x, @Nonnull Revision previous) {
- // TODO currently we only compare the timestamps
return revisionComparator.compare(x, previous) > 0;
}
@@ -763,6 +820,9 @@ public class MongoMK implements MicroKer
@Override
public boolean nodeExists(String path, String revisionId)
throws MicroKernelException {
+ if (!PathUtils.isAbsolute(path)) {
+ throw new MicroKernelException("Path is not absolute: " + path);
+ }
revisionId = revisionId != null ? revisionId : headRevision.toString();
Revision rev = Revision.fromString(stripBranchRevMarker(revisionId));
Node n = getNode(path, rev);
@@ -855,10 +915,8 @@ public class MongoMK implements MicroKer
String value;
if (t.matches(JsopReader.NULL)) {
value = null;
- commit.getDiff().tag('^').key(path).value(null);
} else {
value = t.readRawValue().trim();
- commit.getDiff().tag('^').key(path).value(value);
}
String p = PathUtils.getParentPath(path);
String propertyName = PathUtils.getName(path);
@@ -875,8 +933,7 @@ public class MongoMK implements MicroKer
}
if (!nodeExists(sourcePath, baseRevId)) {
throw new MicroKernelException("Node not found: " + sourcePath + " in revision " + baseRevId);
- }
- if (nodeExists(targetPath, baseRevId)) {
+ } else if (nodeExists(targetPath, baseRevId)) {
throw new MicroKernelException("Node already exists: " + targetPath + " in revision " + baseRevId);
}
commit.moveNode(sourcePath, targetPath);
@@ -893,6 +950,8 @@ public class MongoMK implements MicroKer
}
if (!nodeExists(sourcePath, baseRevId)) {
throw new MicroKernelException("Node not found: " + sourcePath + " in revision " + baseRevId);
+ } else if (nodeExists(targetPath, baseRevId)) {
+ throw new MicroKernelException("Node already exists: " + targetPath + " in revision " + baseRevId);
}
commit.copyNode(sourcePath, targetPath);
copyNode(sourcePath, targetPath, baseRev, commit);
@@ -1002,9 +1061,9 @@ public class MongoMK implements MicroKer
}
/**
- * Get the latest revision where the node was alive at or before the
- * provided revision.
- *
+ * Get the earliest (oldest) revision where the node was alive at or before
+ * the provided revision, if the node was alive at the given revision.
+ *
* @param nodeMap the node map
* @param maxRev the maximum revision to return
* @return the earliest revision, or null if the node is deleted at the
@@ -1016,52 +1075,76 @@ public class MongoMK implements MicroKer
}
/**
- * Get the latest revision where the node was alive at or before the
- * provided revision.
- *
- * @param nodeMap the node map
- * @param maxRev the maximum revision to return
- * @param validRevisions the set of revisions already checked against
- * maxRev and considered valid.
- * @return the earliest revision, or null if the node is deleted at the
- * given revision
- */
+ * Get the earliest (oldest) revision where the node was alive at or before
+ * the provided revision, if the node was alive at the given revision.
+ *
+ * @param nodeMap the node map
+ * @param maxRev the maximum revision to return
+ * @param validRevisions the set of revisions already checked against maxRev
+ * and considered valid.
+ * @return the earliest revision, or null if the node is deleted at the
+ * given revision
+ */
private Revision getLiveRevision(Map<String, Object> nodeMap,
Revision maxRev, Set<Revision> validRevisions) {
@SuppressWarnings("unchecked")
Map<String, String> valueMap = (Map<String, String>) nodeMap
.get(UpdateOp.DELETED);
- Revision firstRev = null;
- String value = null;
if (valueMap == null) {
return null;
}
+ // first, search the newest deleted revision
+ Revision deletedRev = null;
if (valueMap instanceof TreeMap) {
// use descending keys (newest first) if map is sorted
valueMap = ((TreeMap<String, String>) valueMap).descendingMap();
}
for (String r : valueMap.keySet()) {
+ String value = valueMap.get(r);
+ if (!"true".equals(value)) {
+ // only look at deleted revisions now
+ continue;
+ }
Revision propRev = Revision.fromString(r);
if (isRevisionNewer(propRev, maxRev)
|| !isValidRevision(propRev, maxRev, nodeMap, validRevisions)) {
continue;
}
- if (firstRev == null || isRevisionNewer(propRev, firstRev)) {
- firstRev = propRev;
- value = valueMap.get(r);
+ if (deletedRev == null || isRevisionNewer(propRev, deletedRev)) {
+ deletedRev = propRev;
}
}
- if ("true".equals(value)) {
- return null;
+ // now search the oldest non-deleted revision that is newer than the
+ // newest deleted revision
+ Revision liveRev = null;
+ for (String r : valueMap.keySet()) {
+ String value = valueMap.get(r);
+ if ("true".equals(value)) {
+ // ignore deleted revisions
+ continue;
+ }
+ Revision propRev = Revision.fromString(r);
+ if (deletedRev != null && isRevisionNewer(deletedRev, propRev)) {
+ // the node was deleted later on
+ continue;
+ }
+ if (isRevisionNewer(propRev, maxRev)
+ || !isValidRevision(propRev, maxRev, nodeMap, validRevisions)) {
+ continue;
+ }
+ if (liveRev == null || isRevisionNewer(liveRev, propRev)) {
+ liveRev = propRev;
+ }
}
- return firstRev;
+ return liveRev;
}
/**
* Get the revision of the latest change made to this node.
*
* @param nodeMap the document
- * @param before the returned value is guaranteed to be older than this revision
+ * @param readRevision the returned value is guaranteed to _not_ match this revision,
+ * but it might be in this branch
* @param onlyCommitted whether only committed changes should be considered
* @param handler the conflict handler, which is called for un-committed revisions
* preceding <code>before</code>.
@@ -1069,11 +1152,12 @@ public class MongoMK implements MicroKer
*/
@SuppressWarnings("unchecked")
@Nullable Revision getNewestRevision(Map<String, Object> nodeMap,
- Revision before, boolean onlyCommitted,
+ Revision except, boolean onlyCommitted,
CollisionHandler handler) {
if (nodeMap == null) {
return null;
}
+ // TODO remove "except"
SortedSet<String> revisions = new TreeSet<String>(Collections.reverseOrder());
if (nodeMap.containsKey(UpdateOp.REVISIONS)) {
revisions.addAll(((Map<String, String>) nodeMap.get(UpdateOp.REVISIONS)).keySet());
@@ -1090,9 +1174,9 @@ public class MongoMK implements MicroKer
for (String r : revisions) {
Revision propRev = Revision.fromString(r);
if (newestRev == null || isRevisionNewer(propRev, newestRev)) {
- if (isRevisionNewer(before, propRev)) {
+ if (!propRev.equals(except)) {
if (onlyCommitted && !isValidRevision(
- propRev, before, nodeMap, new HashSet<Revision>())) {
+ propRev, except, nodeMap, new HashSet<Revision>())) {
handler.uncommittedModification(propRev);
} else {
newestRev = propRev;
@@ -1288,7 +1372,7 @@ public class MongoMK implements MicroKer
this.isDisposed = isDisposed;
}
public void run() {
- while (!isDisposed.get()) {
+ while (delay != 0 && !isDisposed.get()) {
synchronized (isDisposed) {
try {
isDisposed.wait(delay);
@@ -1418,4 +1502,8 @@ public class MongoMK implements MicroKer
public boolean isCached(String path) {
return store.isCached(Collection.NODES, Utils.getIdFromPath(path));
}
+ public void stopBackground() {
+ stopBackground = true;
+ }
+
}
Modified: jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Revision.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Revision.java?rev=1479926&r1=1479925&r2=1479926&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Revision.java (original)
+++ jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Revision.java Tue May 7 14:39:45 2013
@@ -222,7 +222,19 @@ public class Revision {
* When comparing revisions that occurred before, the timestamp is ignored.
*/
private long oldestTimestamp;
+
+ /**
+ * The cluster node id of the current cluster node. Revisions
+ * from this cluster node that are newer than the newest range
+ * (new local revisions)
+ * are considered to be the newest revisions overall.
+ */
+ private final int currentClusterNodeId;
+ RevisionComparator(int currentClusterNodId) {
+ this.currentClusterNodeId = currentClusterNodId;
+ }
+
/**
* Forget the order of older revisions. After calling this method, when comparing
* revisions that happened before the given value, the timestamp order is used
@@ -287,7 +299,11 @@ public class Revision {
} else {
RevisionRange last = list.get(list.size() - 1);
if (last.timestamp == timestamp) {
- last.revision = r;
+ // replace existing
+ if (r.compareRevisionTime(last.revision) > 0) {
+ // but only if newer
+ last.revision = r;
+ }
return;
}
newList = new ArrayList<RevisionRange>(list);
@@ -313,13 +329,25 @@ public class Revision {
if (o1.getClusterId() == o2.getClusterId()) {
return o1.compareRevisionTime(o2);
}
- RevisionRange range1 = getRevisionRange(o1);
- RevisionRange range2 = getRevisionRange(o2);
- if (range1 == null || range2 == null) {
+ long range1 = getRevisionRangeTimestamp(o1);
+ long range2 = getRevisionRangeTimestamp(o2);
+ if (range1 == 0 || range2 == 0) {
return o1.compareRevisionTime(o2);
}
- if (range1.timestamp != range2.timestamp) {
- return range1.timestamp < range2.timestamp ? -1 : 1;
+ if (range1 != range2) {
+ return range1 < range2 ? -1 : 1;
+ }
+ if (range1 == Long.MAX_VALUE) {
+ // in this case, both must be Long.MAX_VALUE, otherwise
+ // the previous check would have been true; and additionally
+ // the revisions are from different cluster nodes
+ if (o1.getClusterId() == currentClusterNodeId) {
+ return 1;
+ } else if (o2.getClusterId() == currentClusterNodeId) {
+ return -1;
+ }
+ // both revisions are new revisions of other cluster nodes
+ // (in reality this doesn't actually happen I believe)
}
int result = o1.compareRevisionTime(o2);
if (result != 0) {
@@ -328,31 +356,59 @@ public class Revision {
return o1.getClusterId() < o2.getClusterId() ? -1 : 1;
}
- private RevisionRange getRevisionRange(Revision r) {
+ /**
+ * Get the timestamp from the revision range, if found. If no range was
+ * found for this cluster instance, or if the revision is older than the
+ * earliest range, then 0 is returned. If the revision is newer than the
+ * newest range for this cluster instance, then Long.MAX_VALUE is
+ * returned.
+ *
+ * @param r the revision
+ * @return the timestamp, 0 if not found,
+ * the timestamp plus 1 second for new local revisions;
+ * Long.MAX_VALUE for new non-local revisions (meaning 'in the future')
+ */
+ private long getRevisionRangeTimestamp(Revision r) {
List<RevisionRange> list = map.get(r.getClusterId());
if (list == null) {
- return null;
+ return 0;
}
// search from latest backward
// (binary search could be used, but we expect most queries
// at the end of the list)
+ long result = 0;
for (int i = list.size() - 1; i >= 0; i--) {
RevisionRange range = list.get(i);
- if (r.compareRevisionTime(range.revision) >= 0) {
- return range;
+ int compare = r.compareRevisionTime(range.revision);
+ if (compare > 0) {
+ if (i == list.size() - 1) {
+ // newer than the newest range
+ if (r.getClusterId() == currentClusterNodeId) {
+ // newer than all
+ return range.timestamp + 1000;
+ }
+ // happenes in the future (not visible yet)
+ return Long.MAX_VALUE;
+ }
+ break;
}
+ result = range.timestamp;
}
- return null;
+ return result;
}
public String toString() {
StringBuilder buff = new StringBuilder();
for (int clusterId : new TreeSet<Integer>(map.keySet())) {
- buff.append(clusterId).append(':');
+ int i = 0;
+ buff.append(clusterId).append(":");
for (RevisionRange r : map.get(clusterId)) {
- buff.append(' ').append(r);
+ if (i++ % 4 == 0) {
+ buff.append('\n');
+ }
+ buff.append(" ").append(r);
}
- buff.append("; ");
+ buff.append("\n");
}
return buff.toString();
}
Modified: jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/UnmergedBranches.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/UnmergedBranches.java?rev=1479926&r1=1479925&r2=1479926&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/UnmergedBranches.java (original)
+++ jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/UnmergedBranches.java Tue May 7 14:39:45 2013
@@ -51,7 +51,8 @@ class UnmergedBranches {
/**
* The revision comparator.
*/
- private final Revision.RevisionComparator comparator = new Revision.RevisionComparator();
+ // TODO use the same comparator as MongoMK?
+ private final Revision.RevisionComparator comparator = new Revision.RevisionComparator(0);
/**
* Initialize with un-merged branches from <code>store</code> for this
Modified: jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/util/LoggingDocumentStoreWrapper.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/util/LoggingDocumentStoreWrapper.java?rev=1479926&r1=1479925&r2=1479926&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/util/LoggingDocumentStoreWrapper.java (original)
+++ jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/util/LoggingDocumentStoreWrapper.java Tue May 7 14:39:45 2013
@@ -141,6 +141,17 @@ public class LoggingDocumentStoreWrapper
throw convert(e);
}
}
+
+ @Override
+ public void invalidateCache(Collection collection, String key) {
+ try {
+ logMethod("invalidateCache", collection, key);
+ store.invalidateCache(collection, key);
+ } catch (Exception e) {
+ logException(e);
+ throw convert(e);
+ }
+ }
@Override
public void dispose() {
@@ -209,4 +220,5 @@ public class LoggingDocumentStoreWrapper
}
LOG.info(message);
}
+
}
Modified: jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/ClusterTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/ClusterTest.java?rev=1479926&r1=1479925&r2=1479926&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/ClusterTest.java (original)
+++ jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/ClusterTest.java Tue May 7 14:39:45 2013
@@ -23,6 +23,7 @@ import org.apache.jackrabbit.mk.api.Micr
import org.apache.jackrabbit.mk.blobs.MemoryBlobStore;
import org.junit.After;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Test;
import com.mongodb.DB;
@@ -53,6 +54,29 @@ public class ClusterTest {
}
@Test
+ @Ignore
+ public void openCloseOpen() {
+ MemoryDocumentStore ds = new MemoryDocumentStore();
+ MemoryBlobStore bs = new MemoryBlobStore();
+ MongoMK.Builder builder;
+
+ builder = new MongoMK.Builder();
+ builder.setDocumentStore(ds).setBlobStore(bs);
+ MongoMK mk1 = builder.setClusterId(1).open();
+ mk1.commit("/", "+\"a\": {}", null, null);
+ mk1.commit("/", "-\"a\"", null, null);
+
+ builder = new MongoMK.Builder();
+ builder.setDocumentStore(ds).setBlobStore(bs);
+ MongoMK mk2 = builder.setClusterId(2).open();
+ mk2.commit("/", "+\"a\": {}", null, null);
+ mk2.commit("/", "-\"a\"", null, null);
+
+ mk1.dispose();
+ mk2.dispose();
+ }
+
+ @Test
public void clusterNodeId() {
MongoMK mk1 = createMK(0);
MongoMK mk2 = createMK(0);
@@ -132,6 +156,11 @@ public class ClusterTest {
} catch (MicroKernelException e) {
// expected
}
+ // now, after the conflict, both cluster nodes see the node
+ // (before the conflict, this isn't necessarily the case for mk2)
+ String n1 = mk1.getNodes("/", mk1.getHeadRevision(), 0, 0, 10, null);
+ String n2 = mk2.getNodes("/", mk2.getHeadRevision(), 0, 0, 10, null);
+ assertEquals(n1, n2);
mk1.dispose();
mk2.dispose();
@@ -145,13 +174,12 @@ public class ClusterTest {
String m2h;
m2h = mk2.getNodes("/", mk2.getHeadRevision(), 0, 0, 2, null);
assertEquals("{\":childNodeCount\":0}", m2h);
+ String oldHead = mk2.getHeadRevision();
mk1.commit("/", "+\"test\":{}", null, null);
String m1h = mk1.getNodes("/", mk1.getHeadRevision(), 0, 0, 1, null);
assertEquals("{\"test\":{},\":childNodeCount\":1}", m1h);
- m2h = mk2.getNodes("/", mk2.getHeadRevision(), 0, 0, 2, null);
-
// not available yet...
assertEquals("{\":childNodeCount\":0}", m2h);
m2h = mk2.getNodes("/test", mk2.getHeadRevision(), 0, 0, 2, null);
@@ -162,7 +190,7 @@ public class ClusterTest {
if (mk1.getPendingWriteCount() > 0) {
continue;
}
- if (mk2.isCached("/")) {
+ if (mk2.getHeadRevision().equals(oldHead)) {
continue;
}
break;
Modified: jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/ConcurrentConflictTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/ConcurrentConflictTest.java?rev=1479926&r1=1479925&r2=1479926&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/ConcurrentConflictTest.java (original)
+++ jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/ConcurrentConflictTest.java Tue May 7 14:39:45 2013
@@ -43,7 +43,7 @@ import static org.junit.Assert.assertEqu
public class ConcurrentConflictTest extends BaseMongoMKTest {
private static final boolean USE_LOGGER = true;
- private static final Logger log = LoggerFactory.getLogger(ConcurrentConflictTest.class);
+ private static final Logger LOG = LoggerFactory.getLogger(ConcurrentConflictTest.class);
private static final int NUM_WRITERS = 3;
private static final int NUM_NODES = 10;
private static final int NUM_TRANSFERS_PER_THREAD = 10;
@@ -81,7 +81,7 @@ public class ConcurrentConflictTest exte
}
private void concurrentUpdates(final boolean useBranch) throws Exception {
- log.info("====== Start test =======");
+ LOG.info("====== Start test =======");
final AtomicInteger conflicts = new AtomicInteger();
final List<Exception> exceptions = Collections.synchronizedList(
new ArrayList<Exception>());
@@ -144,7 +144,7 @@ public class ConcurrentConflictTest exte
long value = (Long) entry.getValue().get("value");
jsop.append("^\"/node-").append(entry.getKey());
jsop.append("/value\":");
- if (value >= 20 && ! withdrawn) {
+ if (value >= 20 && !withdrawn) {
jsop.append(value - 20);
withdrawn = true;
} else {
@@ -189,9 +189,9 @@ public class ConcurrentConflictTest exte
}
}
- private void log(String msg) {
+ void log(String msg) {
if (USE_LOGGER) {
- log.info(msg);
+ LOG.info(msg);
} else {
synchronized (logBuffer) {
logBuffer.append(msg).append("\n");
Modified: jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/RandomizedClusterTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/RandomizedClusterTest.java?rev=1479926&r1=1479925&r2=1479926&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/RandomizedClusterTest.java (original)
+++ jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/RandomizedClusterTest.java Tue May 7 14:39:45 2013
@@ -21,16 +21,17 @@ import static org.junit.Assert.assertFal
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import java.util.Arrays;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
import java.util.Random;
import org.apache.jackrabbit.mk.api.MicroKernelException;
import org.apache.jackrabbit.mk.blobs.MemoryBlobStore;
-import org.apache.jackrabbit.mk.core.MicroKernelImpl;
import org.apache.jackrabbit.mk.json.JsonObject;
import org.apache.jackrabbit.mk.json.JsopBuilder;
import org.apache.jackrabbit.mk.json.JsopTokenizer;
-import org.junit.Ignore;
import org.junit.Test;
import com.mongodb.DB;
@@ -49,43 +50,48 @@ public class RandomizedClusterTest {
private MemoryBlobStore bs;
private MongoMK[] mkList = new MongoMK[MK_COUNT];
- private MicroKernelImpl[] mkListGold = new MicroKernelImpl[MK_COUNT];
private String[] revList = new String[MK_COUNT];
- private String[] revListGold = new String[MK_COUNT];
+ @SuppressWarnings({ "unchecked", "cast" })
+ private HashSet<Integer>[] unseenChanges = (HashSet<Integer>[]) new HashSet[MK_COUNT];
+ private HashMap<Integer, List<Op>> changes = new HashMap<Integer, List<Op>>();
private int opId;
private int mkId;
private StringBuilder log;
+
+ /**
+ * The map of changes. Key: node name; value: the last operation that
+ * changed the node.
+ */
+ private HashMap<String, Integer> nodeChange = new HashMap<String, Integer>();
@Test
- @Ignore
public void addRemoveSet() throws Exception {
- MicroKernelImpl mkG = new MicroKernelImpl();
for (int i = 0; i < MK_COUNT; i++) {
+ unseenChanges[i] = new HashSet<Integer>();
mkList[i] = createMK(i);
revList[i] = mkList[i].getHeadRevision();
- mkListGold[i] = mkG;
- revListGold[i] = mkListGold[i].getHeadRevision();
}
HashMap<Integer, ClusterRev> revs =
new HashMap<Integer, ClusterRev>();
Random r = new Random(1);
int operations = 1000, nodeCount = 10;
- int propertyCount = 5, valueCount = 10;
+ int valueCount = 10;
int maxBackRev = 20;
log = new StringBuilder();
try {
int maskOk = 0, maskFail = 0;
int opCount = 6;
+ nodeChange.clear();
for (int i = 0; i < operations; i++) {
opId = i;
mkId = r.nextInt(mkList.length);
String node = "t" + r.nextInt(nodeCount);
String node2 = "t" + r.nextInt(nodeCount);
- String property = "p" + r.nextInt(propertyCount);
+ String property = "x";
String value = "" + r.nextInt(valueCount);
String diff;
int op = r.nextInt(opCount);
@@ -94,65 +100,131 @@ public class RandomizedClusterTest {
// there are enough nodes to operate on
op = 0;
}
- boolean result;
+ String result;
+ boolean conflictExpected;
switch(op) {
case 0:
diff = "+ \"" + node + "\": { \"" + property + "\": " + value + "}";
log(diff);
- result = commit(diff);
+ if (exists(node)) {
+ log("already exists");
+ result = null;
+ } else {
+ conflictExpected = isConflict(node);
+ result = commit(diff, conflictExpected);
+ if (result != null) {
+ changes.put(i, Arrays.asList(new Op(mkId, node, value)));
+ nodeChange.put(node, i);
+ }
+ }
break;
case 1:
diff = "- \"" + node + "\"";
log(diff);
- result = commit(diff);
+ if (exists(node)) {
+ conflictExpected = isConflict(node);
+ result = commit(diff, conflictExpected);
+ if (result != null) {
+ changes.put(i, Arrays.asList(new Op(mkId, node, null)));
+ nodeChange.put(node, i);
+ }
+ } else {
+ log("doesn't exist");
+ result = null;
+ }
break;
case 2:
diff = "^ \"" + node + "/" + property + "\": " + value;
log(diff);
- result = commit(diff);
+ if (exists(node)) {
+ conflictExpected = isConflict(node);
+ result = commit(diff, conflictExpected);
+ if (result != null) {
+ changes.put(i, Arrays.asList(new Op(mkId, node, value)));
+ nodeChange.put(node, i);
+ }
+ } else {
+ log("doesn't exist");
+ result = null;
+ }
break;
case 3:
diff = "> \"" + node + "\": \"" + node2 + "\"";
log(diff);
- result = commit(diff);
+ if (exists(node) && !exists(node2)) {
+ conflictExpected = isConflict(node) | isConflict(node2);
+ result = commit(diff, conflictExpected);
+ if (result != null) {
+ value = getValue(mkId, i, node);
+ changes.put(i, Arrays.asList(
+ new Op(mkId, node, null), new Op(mkId, node2, value)));
+ nodeChange.put(node, i);
+ nodeChange.put(node2, i);
+ }
+ } else {
+ log("source doesn't exist or target exists");
+ result = null;
+ }
break;
case 4:
- diff = "* \"" + node + "\": \"" + node2 + "\"";
- log(diff);
- result = commit(diff);
+ if (isConflict(node)) {
+ // the MicroKernelImpl would report a conflict
+ result = null;
+ } else {
+ diff = "* \"" + node + "\": \"" + node2 + "\"";
+ log(diff);
+ if (exists(node) && !exists(node2)) {
+ conflictExpected = isConflict(node2);
+ result = commit(diff, conflictExpected);
+ if (result != null) {
+ value = getValue(mkId, i, node);
+ changes.put(i, Arrays.asList(new Op(mkId, node2, value)));
+ nodeChange.put(node2, i);
+ }
+ } else {
+ log("source doesn't exist or target exists");
+ result = null;
+ }
+ }
break;
case 5:
- revList[mkId] = mkList[mkId].getHeadRevision();
- revListGold[mkId] = mkListGold[mkId].getHeadRevision();
+ log("sync/refresh");
+ syncAndRefreshAllClusterNodes();
+ // go to head revision
+ result = revList[mkId] = mkList[mkId].getHeadRevision();
// fake failure
- result = i % 2 == 0;
+ maskFail |= 1 << op;
break;
default:
fail();
- result = false;
+ result = null;
}
- if (result) {
- maskOk |= 1 << op;
- } else {
+ if (result == null) {
maskFail |= 1 << op;
+ log(" -> fail " + Integer.toBinaryString(maskFail));
+ } else {
+ maskOk |= 1 << op;
+ log(" -> " + result);
+ // all other cluster nodes didn't see this particular change yet
+ for (int j = 0; j < unseenChanges.length; j++) {
+ if (j != mkId) {
+ unseenChanges[j].add(i);
+ }
+ }
}
log("get " + node);
- get(node);
+ boolean x = get(i, node);
+ log("get " + node + " returns " + x);
log("get " + node2);
- get(node2);
+ x = get(i, node2);
+ log("get " + node2 + " returns " + x);
MongoMK mk = mkList[mkId];
- MicroKernelImpl mkGold = mkListGold[mkId];
ClusterRev cr = new ClusterRev();
cr.mkId = mkId;
cr.rev = mk.getHeadRevision();
- cr.revGold = mkGold.getHeadRevision();
revs.put(i, cr);
revs.remove(i - maxBackRev);
- int revId = i - r.nextInt(maxBackRev);
- cr = revs.get(revId);
- if (cr != null) {
- get(node, cr.revGold, cr.rev);
- }
+ log.append('\n');
}
if (Integer.bitCount(maskOk) != opCount) {
fail("Not all operations were at least once successful: " + Integer.toBinaryString(maskOk));
@@ -167,45 +239,91 @@ public class RandomizedClusterTest {
}
for (int i = 0; i < MK_COUNT; i++) {
mkList[i].dispose();
- mkListGold[i].dispose();
}
// System.out.println(log);
// System.out.println();
}
+ private String getValue(int clusterId, int maxOp, String nodeName) {
+ for (int i = maxOp; i >= 0; i--) {
+ List<Op> ops = changes.get(i);
+ if (ops != null) {
+ for (Op o : ops) {
+ if (o.clusterId != clusterId && unseenChanges[clusterId].contains(i)) {
+ continue;
+ }
+ if (o.nodeName.equals(nodeName)) {
+ return o.value;
+ }
+ }
+ }
+ }
+ return null;
+ }
+
+ private boolean isConflict(String node) {
+ Integer change = nodeChange.get(node);
+ if (change == null || !unseenChanges[mkId].contains(change)) {
+ return false;
+ }
+ return true;
+ }
+
private void log(String msg) {
msg = opId + ": [" + mkId + "] " + msg + "\n";
log.append(msg);
}
- private void get(String node) {
- String headGold = mkListGold[mkId].getHeadRevision();
+ private void syncClusterNode() {
for (int i = 0; i < mkList.length; i++) {
MongoMK mk = mkList[i];
mk.backgroundWrite();
}
MongoMK mk = mkList[mkId];
mk.backgroundRead();
- String head = mk.getHeadRevision();
- get(node, headGold, head);
+ }
+
+ private void syncAndRefreshAllClusterNodes() {
+ syncClusterNode();
+ for (int i = 0; i < mkList.length; i++) {
+ MongoMK mk = mkList[i];
+ mk.backgroundRead();
+ revList[i] = mk.getHeadRevision();
+ unseenChanges[i].clear();
+ }
+ log("sync");
+ }
+
+ private boolean get(int maxOp, String node) {
+ String head = revList[mkId];
+ return get(maxOp, node, head);
+ }
+
+ private boolean exists(String node) {
+ String head = revList[mkId];
+ MongoMK mk = mkList[mkId];
+ return mk.nodeExists("/" + node, head);
}
- private void get(String node, String headGold, String head) {
+ private boolean get(int maxOp, String node, String head) {
String p = "/" + node;
- MicroKernelImpl mkGold = mkListGold[mkId];
MongoMK mk = mkList[mkId];
- if (!mkGold.nodeExists(p, headGold)) {
- assertFalse(mk.nodeExists(p, head));
- return;
+ String value = getValue(mkId, maxOp, node);
+ if (value == null) {
+ assertFalse("path: " + p + " is supposed to not exist",
+ mk.nodeExists(p, head));
+ return false;
}
if (!mk.nodeExists(p, head)) {
- assertTrue("path: " + p, mk.nodeExists(p, head));
+ assertTrue("path: " + p + " is supposed to exist",
+ mk.nodeExists(p, head));
}
- String resultGold = mkGold.getNodes(p, headGold, 0, 0, Integer.MAX_VALUE, null);
+ String expected = "{\":childNodeCount\":0,\"x\":" + value + "}";
String result = mk.getNodes(p, head, 0, 0, Integer.MAX_VALUE, null);
- resultGold = normalize(resultGold);
+ expected = normalize(expected);
result = normalize(result);
- assertEquals(resultGold, result);
+ assertEquals(expected, result);
+ return true;
}
private static String normalize(String json) {
@@ -217,32 +335,45 @@ public class RandomizedClusterTest {
return w.toString();
}
- private boolean commit(String diff) {
+ private String commit(String diff, boolean conflictExpected) {
boolean ok = false;
- MicroKernelImpl mkGold = mkListGold[mkId];
- String revGold = revListGold[mkId];
MongoMK mk = mkList[mkId];
String rev = revList[mkId];
- try {
- mkGold.commit("/", diff, revGold, null);
+ String result = null;
+ String ex = null;
+ if (conflictExpected) {
+ ok = false;
+ ex = "conflict expected";
+ // afterwards, this cluster node should synchronize
+ unseenChanges[mkId].clear();
+ } else {
ok = true;
- } catch (MicroKernelException e) {
+ }
+ if (ok) {
+ result = mk.commit("/", diff, rev, null);
+ revList[mkId] = result;
+ } else {
// System.out.println("--> fail " + e.toString());
try {
mk.commit("/", diff, rev, null);
- fail("Should fail: " + diff + " with exception " + e);
+ fail("Should fail: " + diff + " with " + ex);
} catch (MicroKernelException e2) {
// expected
+ revList[mkId] = mk.getHeadRevision();
+ // it might have been not a conflict with another cluster node
+ // TODO test two cases: conflict with other cluster node
+ // (this should auto-synchronize until the given conflict)
+ // and conflict with a previous change that was already seen,
+ // which shouldn't synchronize
+ syncAndRefreshAllClusterNodes();
}
}
- if (ok) {
- mk.commit("/", diff, rev, null);
- }
- return ok;
+ return result;
}
private MongoMK createMK(int clusterId) {
MongoMK.Builder builder = new MongoMK.Builder();
+ builder.setAsyncDelay(0);
if (MONGO_DB) {
DB db = MongoUtils.getConnection().getDB();
MongoUtils.dropCollections(db);
@@ -264,7 +395,21 @@ public class RandomizedClusterTest {
*/
static class ClusterRev {
int mkId;
- String rev, revGold;
+ String rev;
+ }
+
+ /**
+ * An operation.
+ */
+ static class Op {
+ final int clusterId;
+ final String nodeName;
+ final String value;
+ public Op(int clusterId, String nodeName, String value) {
+ this.clusterId = clusterId;
+ this.nodeName = nodeName;
+ this.value = value;
+ }
}
}
Modified: jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/RevisionTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/RevisionTest.java?rev=1479926&r1=1479925&r2=1479926&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/RevisionTest.java (original)
+++ jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/RevisionTest.java Tue May 7 14:39:45 2013
@@ -108,7 +108,7 @@ public class RevisionTest {
@Test
public void revisionComparatorSimple() {
- RevisionComparator comp = new RevisionComparator();
+ RevisionComparator comp = new RevisionComparator(0);
Revision r1 = Revision.newRevision(0);
Revision r2 = Revision.newRevision(0);
assertEquals(r1.compareRevisionTime(r2), comp.compare(r1, r2));
@@ -119,7 +119,7 @@ public class RevisionTest {
@Test
public void revisionComparatorCluster() {
- RevisionComparator comp = new RevisionComparator();
+ RevisionComparator comp = new RevisionComparator(0);
Revision r1c1 = new Revision(0x110, 0, 1);
Revision r2c1 = new Revision(0x120, 0, 1);
@@ -138,12 +138,13 @@ public class RevisionTest {
comp.add(r2c2, 10);
assertEquals(
- "1: r120-0-1:20; " +
- "2: r200-0-2:10; ", comp.toString());
+ "1:\n r120-0-1:20\n" +
+ "2:\n r200-0-2:10\n", comp.toString());
assertEquals(1, comp.compare(r1c1, r1c2));
assertEquals(1, comp.compare(r2c1, r2c2));
- assertEquals(1, comp.compare(r3c1, r3c2));
+ // r3c2 is still "in the future"
+ assertEquals(-1, comp.compare(r3c1, r3c2));
// now we declare r3 of c1 to be before r3 of c2
// (with the same range timestamp,
@@ -152,8 +153,8 @@ public class RevisionTest {
comp.add(r3c2, 30);
assertEquals(
- "1: r120-0-1:20 r130-0-1:30; " +
- "2: r200-0-2:10 r300-0-2:30; ", comp.toString());
+ "1:\n r120-0-1:20 r130-0-1:30\n" +
+ "2:\n r200-0-2:10 r300-0-2:30\n", comp.toString());
assertEquals(1, comp.compare(r1c1, r1c2));
assertEquals(1, comp.compare(r2c1, r2c2));
@@ -166,18 +167,18 @@ public class RevisionTest {
// get rid of old timestamps
comp.purge(10);
assertEquals(
- "1: r120-0-1:20 r130-0-1:30; " +
- "2: r300-0-2:30; ", comp.toString());
+ "1:\n r120-0-1:20 r130-0-1:30\n" +
+ "2:\n r300-0-2:30\n", comp.toString());
comp.purge(20);
assertEquals(
- "1: r130-0-1:30; " +
- "2: r300-0-2:30; ", comp.toString());
+ "1:\n r130-0-1:30\n" +
+ "2:\n r300-0-2:30\n", comp.toString());
// update an entry
comp.add(new Revision(0x301, 1, 2), 30);
assertEquals(
- "1: r130-0-1:30; " +
- "2: r301-1-2:30; ", comp.toString());
+ "1:\n r130-0-1:30\n" +
+ "2:\n r301-1-2:30\n", comp.toString());
comp.purge(30);
assertEquals("", comp.toString());
Modified: jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/SimpleTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/SimpleTest.java?rev=1479926&r1=1479925&r2=1479926&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/SimpleTest.java (original)
+++ jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/SimpleTest.java Tue May 7 14:39:45 2013
@@ -72,7 +72,7 @@ public class SimpleTest {
@Test
public void addNodeGetNode() {
MongoMK mk = new MongoMK.Builder().open();
- Revision rev = mk.newRevision();
+ Revision rev = Revision.fromString(mk.getHeadRevision());
Node n = new Node("/test", rev);
n.setProperty("name", "Hello");
UpdateOp op = n.asOperation(true);