You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by th...@apache.org on 2013/05/07 16:39:46 UTC

svn commit: r1479926 - in /jackrabbit/oak/trunk/oak-mongomk/src: main/java/org/apache/jackrabbit/mongomk/ main/java/org/apache/jackrabbit/mongomk/util/ test/java/org/apache/jackrabbit/mongomk/

Author: thomasm
Date: Tue May  7 14:39:45 2013
New Revision: 1479926

URL: http://svn.apache.org/r1479926
Log:
OAK-762 MongoMK: automatic unique cluster id / revision order depends on the cluster id (WIP)

Modified:
    jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Branch.java
    jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Collision.java
    jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Commit.java
    jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/DocumentStore.java
    jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/MemoryDocumentStore.java
    jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/MongoDocumentStore.java
    jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/MongoMK.java
    jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Revision.java
    jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/UnmergedBranches.java
    jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/util/LoggingDocumentStoreWrapper.java
    jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/ClusterTest.java
    jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/ConcurrentConflictTest.java
    jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/RandomizedClusterTest.java
    jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/RevisionTest.java
    jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/SimpleTest.java

Modified: jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Branch.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Branch.java?rev=1479926&r1=1479925&r2=1479926&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Branch.java (original)
+++ jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Branch.java Tue May  7 14:39:45 2013
@@ -62,7 +62,7 @@ class Branch {
         return commits.contains(r);
     }
 
-    synchronized public void removeCommit(@Nonnull Revision rev) {
+    public synchronized void removeCommit(@Nonnull Revision rev) {
         commits.remove(rev);
     }
 }

Modified: jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Collision.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Collision.java?rev=1479926&r1=1479925&r2=1479926&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Collision.java (original)
+++ jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Collision.java Tue May  7 14:39:45 2013
@@ -33,7 +33,7 @@ import static com.google.common.base.Pre
  */
 class Collision {
 
-    private static final Logger log = LoggerFactory.getLogger(Collision.class);
+    private static final Logger LOG = LoggerFactory.getLogger(Collision.class);
 
     private final Map<String, Object> document;
     private final String theirRev;
@@ -87,7 +87,7 @@ class Collision {
                 // TODO: detect concurrent commit of previously un-merged changes
                 // TODO: check _commitRoot for revision is not 'true'
                 store.createOrUpdate(DocumentStore.Collection.NODES, op);
-                log.debug("Marked collision on: {} for {} ({})",
+                LOG.debug("Marked collision on: {} for {} ({})",
                         new Object[]{commitRootPath, p, revision});
                 return true;
             }

Modified: jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Commit.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Commit.java?rev=1479926&r1=1479925&r2=1479926&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Commit.java (original)
+++ jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Commit.java Tue May  7 14:39:45 2013
@@ -159,7 +159,9 @@ public class Commit {
         }
         ArrayList<UpdateOp> newNodes = new ArrayList<UpdateOp>();
         ArrayList<UpdateOp> changedNodes = new ArrayList<UpdateOp>();
-        ArrayList<UpdateOp> done = new ArrayList<UpdateOp>();
+        // operations are added to this list before they are executed,
+        // so that all operations can be rolled back if there is a conflict
+        ArrayList<UpdateOp> opLog = new ArrayList<UpdateOp>();
         for (String p : operations.keySet()) {
             markChanged(p);
             if (commitRootPath == null) {
@@ -220,7 +222,7 @@ public class Commit {
             for (UpdateOp op : changedNodes) {
                 // set commit root on changed nodes
                 op.setMapEntry(UpdateOp.COMMIT_ROOT, revision.toString(), commitRootDepth);
-                done.add(op);
+                opLog.add(op);
                 createOrUpdateNode(store, op);
             }
             // finally write the commit root, unless it was already written
@@ -229,12 +231,12 @@ public class Commit {
             // the revision, with the revision property set)
             if (changedNodes.size() > 0 || !commitRoot.isNew) {
                 commitRoot.setMapEntry(UpdateOp.REVISIONS, revision.toString(), commitValue);
-                done.add(commitRoot);
+                opLog.add(commitRoot);
                 createOrUpdateNode(store, commitRoot);
                 operations.put(commitRootPath, commitRoot);
             }
         } catch (MicroKernelException e) {
-            rollback(newNodes, done);
+            rollback(newNodes, opLog);
             String msg = "Exception committing " + diff.toString();
             LOG.error(msg, e);
             throw new MicroKernelException(msg, e);
@@ -273,26 +275,32 @@ public class Commit {
                     collisions.get().add(uncommitted);
                 }
             });
+            MicroKernelException conflict = null;
             if (newestRev == null) {
                 if (op.isDelete || !op.isNew) {
-                    throw new MicroKernelException("The node " + 
+                    conflict = new MicroKernelException("The node " + 
                             op.path + " does not exist or is already deleted " + 
                             "before " + revision + "; document " + map);
                 }
             } else {
                 if (op.isNew) {
-                    throw new MicroKernelException("The node " + 
+                    conflict = new MicroKernelException("The node " + 
                             op.path + " was already added in revision " + 
                             newestRev + "; before " + revision + "; document " + map);
-                }
-                if (mk.isRevisionNewer(newestRev, baseRevision)
+                } else if (mk.isRevisionNewer(newestRev, baseRevision)
                         && (op.isDelete || isConflicting(map, op))) {
-                    throw new MicroKernelException("The node " + 
+                    conflict = new MicroKernelException("The node " + 
                             op.path + " was changed in revision " + newestRev +
                             ", which was applied after the base revision " + 
                             baseRevision + "; before " + revision + "; document " + map);
                 }
             }
+            if (conflict != null) {
+                if (newestRev != null) {
+                    mk.publishRevision(newestRev);
+                }
+                throw conflict;
+            }
             // if we get here the modification was successful
             // -> check for collisions and conflict (concurrent updates
             // on a node are possible if property updates do not overlap)
@@ -486,10 +494,6 @@ public class Commit {
         diff.tag('*').key(sourcePath).value(targetPath);
     }
 
-    public JsopWriter getDiff() {
-        return diff;
-    }
-
     private void markChanged(String path) {
         if (!PathUtils.denotesRoot(path) && !PathUtils.isAbsolute(path)) {
             throw new IllegalArgumentException("path: " + path);

Modified: jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/DocumentStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/DocumentStore.java?rev=1479926&r1=1479925&r2=1479926&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/DocumentStore.java (original)
+++ jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/DocumentStore.java Tue May  7 14:39:45 2013
@@ -156,6 +156,14 @@ public interface DocumentStore {
     void invalidateCache();
 
     /**
+     * Invalidate the document cache for the given key.
+     * 
+     * @param collection the collection
+     * @param key the key
+     */
+    void invalidateCache(Collection collection, String key);
+
+    /**
      * Dispose this instance.
      */
     void dispose();

Modified: jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/MemoryDocumentStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/MemoryDocumentStore.java?rev=1479926&r1=1479925&r2=1479926&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/MemoryDocumentStore.java (original)
+++ jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/MemoryDocumentStore.java Tue May  7 14:39:45 2013
@@ -301,4 +301,9 @@ public class MemoryDocumentStore impleme
         return false;
     }
 
+    @Override
+    public void invalidateCache(Collection collection, String key) {
+        // ignore
+    }
+
 }
\ No newline at end of file

Modified: jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/MongoDocumentStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/MongoDocumentStore.java?rev=1479926&r1=1479925&r2=1479926&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/MongoDocumentStore.java (original)
+++ jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/MongoDocumentStore.java Tue May  7 14:39:45 2013
@@ -111,6 +111,13 @@ public class MongoDocumentStore implemen
     public void invalidateCache() {
         nodesCache.invalidateAll();
     }
+    
+    @Override
+    public void invalidateCache(Collection collection, String key) {
+        if (collection == Collection.NODES) {
+            nodesCache.invalidate(key);
+        }
+    }
 
     public Map<String, Object> find(Collection collection, String key) {
         return find(collection, key, Integer.MAX_VALUE);
@@ -123,6 +130,9 @@ public class MongoDocumentStore implemen
         }
         try {
             CachedDocument doc;
+            if (maxCacheAge == 0) {
+                nodesCache.invalidate(key);
+            }
             while (true) {
                 doc = nodesCache.get(key, new Callable<CachedDocument>() {
                     @Override
@@ -131,7 +141,7 @@ public class MongoDocumentStore implemen
                         return new CachedDocument(map);
                     }
                 });
-                if (maxCacheAge == Integer.MAX_VALUE) {
+                if (maxCacheAge == 0 || maxCacheAge == Integer.MAX_VALUE) {
                     break;
                 }
                 if (System.currentTimeMillis() - doc.time < maxCacheAge) {

Modified: jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/MongoMK.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/MongoMK.java?rev=1479926&r1=1479925&r2=1479926&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/MongoMK.java (original)
+++ jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/MongoMK.java Tue May  7 14:39:45 2013
@@ -67,24 +67,34 @@ public class MongoMK implements MicroKer
     /**
      * The number of child node list entries to cache.
      */
-    private static final int CACHE_CHILDREN = Integer.getInteger("oak.mongoMK.cacheChildren", 1024);
+    private static final int CACHE_CHILDREN = 
+            Integer.getInteger("oak.mongoMK.cacheChildren", 1024);
     
     /**
      * The number of nodes to cache.
      */
-    private static final int CACHE_NODES = Integer.getInteger("oak.mongoMK.cacheNodes", 1024);
+    private static final int CACHE_NODES = 
+            Integer.getInteger("oak.mongoMK.cacheNodes", 1024);
     
     /**
-     * When trying to access revisions that are older than this many milliseconds, a warning is logged.
+     * When trying to access revisions that are older than this many
+     * milliseconds, a warning is logged. The default is one minute.
      */
-    private static final int WARN_REVISION_AGE = Integer.getInteger("oak.mongoMK.revisionAge", 10000);
+    private static final int WARN_REVISION_AGE = 
+            Integer.getInteger("oak.mongoMK.revisionAge", 60 * 1000);
 
     /**
      * Enable background operations
      */
     private static final boolean ENABLE_BACKGROUND_OPS = Boolean.parseBoolean(
             System.getProperty("oak.mongoMK.backgroundOps", "true"));
-    
+
+    /**
+     * How long to remember the relative order of old revisions of all cluster
+     * nodes, in milliseconds. The default is one hour.
+     */
+    private static final int REMEMBER_REVISION_ORDER_MILLIS = 60 * 60 * 1000;
+
     /**
      * The delay for asynchronous operations (delayed commit propagation and
      * cache update).
@@ -170,7 +180,9 @@ public class MongoMK implements MicroKer
     /**
      * The comparator for revisions.
      */
-    private final RevisionComparator revisionComparator = new RevisionComparator();
+    private final RevisionComparator revisionComparator;
+    
+    private boolean stopBackground;
     
     MongoMK(Builder builder) {
         this.store = builder.getDocumentStore();
@@ -179,11 +191,16 @@ public class MongoMK implements MicroKer
         cid = Integer.getInteger("oak.mongoMK.clusterId", cid);
         if (cid == 0) {
             clusterNodeInfo = ClusterNodeInfo.getInstance(store);
+            // TODO we should ensure revisions generated from now on
+            // are never "older" than revisions already in the repository for
+            // this cluster id
             cid = clusterNodeInfo.getId();
         } else {
             clusterNodeInfo = null;
         }
         this.clusterId = cid;
+        
+        this.revisionComparator = new RevisionComparator(clusterId);
         this.asyncDelay = builder.getAsyncDelay();
 
         //TODO Use size based weigher
@@ -196,6 +213,10 @@ public class MongoMK implements MicroKer
                         .build();
         
         init();
+        // initial reading of the revisions of other cluster nodes
+        backgroundRead();
+        revisionComparator.add(headRevision, Revision.getCurrentTimestamp() + 1);
+        headRevision = newRevision();
         LOG.info("Initialized MongoMK with clusterNodeId: {}", clusterId);
     }
     
@@ -250,7 +271,7 @@ public class MongoMK implements MicroKer
             // only when using timestamp
             return;
         }
-        if (!ENABLE_BACKGROUND_OPS) {
+        if (!ENABLE_BACKGROUND_OPS || stopBackground) {
             return;
         }
         synchronized (this) {
@@ -279,6 +300,8 @@ public class MongoMK implements MicroKer
         @SuppressWarnings("unchecked")
         Map<String, String> lastRevMap = (Map<String, String>) map.get(UpdateOp.LAST_REV);
         
+        boolean hasNewRevisions = false;
+        long timestamp = Revision.getCurrentTimestamp();
         for (Entry<String, String> e : lastRevMap.entrySet()) {
             int machineId = Integer.parseInt(e.getKey());
             if (machineId == clusterId) {
@@ -286,19 +309,58 @@ public class MongoMK implements MicroKer
             }
             Revision r = Revision.fromString(e.getValue());
             Revision last = lastKnownRevision.get(machineId);
-            
             if (last == null || r.compareRevisionTime(last) > 0) {
-                // TODO invalidating the whole cache is not really needed,
-                // instead only those children that are cached could be checked
-                
-                store.invalidateCache();
                 lastKnownRevision.put(machineId, r);
-                // add a new revision, so that changes are visible
-                headRevision = Revision.newRevision(clusterId);
+                hasNewRevisions = true;
+                revisionComparator.add(r, timestamp);
             }
         }
+        if (hasNewRevisions) {
+            // TODO invalidating the whole cache is not really needed,
+            // instead only those children that are cached could be checked
+            store.invalidateCache();
+            // add a new revision, so that changes are visible
+            Revision r = Revision.newRevision(clusterId);
+            // the latest revisions of the current cluster node
+            // happened before the latest revisions of other cluster nodes
+            revisionComparator.add(r, timestamp - 1);
+            // the head revision is after other revisions
+            headRevision = Revision.newRevision(clusterId);
+        }
+        revisionComparator.purge(timestamp - REMEMBER_REVISION_ORDER_MILLIS);
     }
     
+    /**
+     * Ensure the revision is visible from now on, possibly by updating the head
+     * revision, so that the changes that occurred are visible.
+     * 
+     * @param revision the revision
+     */
+    void publishRevision(Revision revision) {  
+        if (revisionComparator.compare(headRevision, revision) >= 0) {
+            // already visible
+            return;
+        }
+        int clusterNodeId = revision.getClusterId();
+        if (clusterNodeId == this.clusterId) {
+            return;
+        }
+        long timestamp = Revision.getCurrentTimestamp();
+        revisionComparator.add(revision, timestamp);
+        // TODO invalidating the whole cache is not really needed,
+        // but how to ensure we invalidate the right part of the cache?
+        // possibly simply wait for the background thread to pick
+        // up the changes, but this depends on how often this method is called
+        store.invalidateCache();
+        // add a new revision, so that changes are visible
+        headRevision = Revision.newRevision(clusterId);
+        // the latest revisions of the current cluster node
+        // happened before the latest revisions of other cluster nodes
+        revisionComparator.add(headRevision, timestamp - 1);
+        // the head revision is after other revisions
+        headRevision = Revision.newRevision(clusterId);
+    }
+
     void backgroundWrite() {
         if (unsavedLastRevisions.size() == 0) {
             return;
@@ -318,6 +380,7 @@ public class MongoMK implements MicroKer
             }
 
         });
+        
         long now = Revision.getCurrentTimestamp();
         for (String p : paths) {
             Revision r = unsavedLastRevisions.get(p);
@@ -330,7 +393,6 @@ public class MongoMK implements MicroKer
             if (Revision.getTimestampDifference(now, r.getTimestamp()) < asyncDelay) {
                 continue;
             }
-            
             Commit commit = new Commit(this, null, r);
             commit.touchNode(p);
             store.createOrUpdate(DocumentStore.Collection.NODES, commit.getUpdateOperationForNode(p));
@@ -339,6 +401,11 @@ public class MongoMK implements MicroKer
     }
     
     public void dispose() {
+        // force background write (with asyncDelay > 0, the root wouldn't be written)
+        // TODO make this more obvious / explicit
+        // TODO tests should also work if this is not done
+        asyncDelay = 0;
+        runBackgroundOperations();
         if (!isDisposed.getAndSet(true)) {
             synchronized (isDisposed) {
                 isDisposed.notifyAll();
@@ -395,11 +462,9 @@ public class MongoMK implements MicroKer
                 // in same branch, include if the same revision or
                 // requestRevision is newer
                 return x.equals(requestRevision) || isRevisionNewer(requestRevision, x);
-            } else {
-                // not part of branch identified by requestedRevision
-                return false;
             }
-
+            // not part of branch identified by requestedRevision
+            return false;
         }
         // assert: x is not a branch commit
         b = branches.getBranch(requestRevision);
@@ -409,14 +474,7 @@ public class MongoMK implements MicroKer
             // was created
             requestRevision = b.getBase();
         }
-        if (x.getClusterId() == this.clusterId && 
-                requestRevision.getClusterId() == this.clusterId) {
-            // both revisions were created by this cluster instance: 
-            // compare timestamps and counters
-            return requestRevision.compareRevisionTime(x) >= 0;
-        }
-        // TODO currently we only compare the timestamps
-        return requestRevision.compareRevisionTime(x) >= 0;
+        return revisionComparator.compare(requestRevision, x) >= 0;
     }
     
     /**
@@ -427,7 +485,6 @@ public class MongoMK implements MicroKer
      * @return true if x is newer
      */
     boolean isRevisionNewer(@Nonnull Revision x, @Nonnull Revision previous) {
-        // TODO currently we only compare the timestamps
         return revisionComparator.compare(x, previous) > 0;
     }
 
@@ -763,6 +820,9 @@ public class MongoMK implements MicroKer
     @Override
     public boolean nodeExists(String path, String revisionId)
             throws MicroKernelException {
+        if (!PathUtils.isAbsolute(path)) {
+            throw new MicroKernelException("Path is not absolute: " + path);
+        }
         revisionId = revisionId != null ? revisionId : headRevision.toString();
         Revision rev = Revision.fromString(stripBranchRevMarker(revisionId));
         Node n = getNode(path, rev);
@@ -855,10 +915,8 @@ public class MongoMK implements MicroKer
                 String value;
                 if (t.matches(JsopReader.NULL)) {
                     value = null;
-                    commit.getDiff().tag('^').key(path).value(null);
                 } else {
                     value = t.readRawValue().trim();
-                    commit.getDiff().tag('^').key(path).value(value);
                 }
                 String p = PathUtils.getParentPath(path);
                 String propertyName = PathUtils.getName(path);
@@ -875,8 +933,7 @@ public class MongoMK implements MicroKer
                 }
                 if (!nodeExists(sourcePath, baseRevId)) {
                     throw new MicroKernelException("Node not found: " + sourcePath + " in revision " + baseRevId);
-                }
-                if (nodeExists(targetPath, baseRevId)) {
+                } else if (nodeExists(targetPath, baseRevId)) {
                     throw new MicroKernelException("Node already exists: " + targetPath + " in revision " + baseRevId);
                 }
                 commit.moveNode(sourcePath, targetPath);
@@ -893,6 +950,8 @@ public class MongoMK implements MicroKer
                 }
                 if (!nodeExists(sourcePath, baseRevId)) {
                     throw new MicroKernelException("Node not found: " + sourcePath + " in revision " + baseRevId);
+                } else if (nodeExists(targetPath, baseRevId)) {
+                    throw new MicroKernelException("Node already exists: " + targetPath + " in revision " + baseRevId);
                 }
                 commit.copyNode(sourcePath, targetPath);
                 copyNode(sourcePath, targetPath, baseRev, commit);
@@ -1002,9 +1061,9 @@ public class MongoMK implements MicroKer
     }
 
     /**
-     * Get the latest revision where the node was alive at or before the
-     * provided revision.
-     *
+     * Get the earliest (oldest) revision where the node was alive at or before
+     * the provided revision, if the node was alive at the given revision.
+     * 
      * @param nodeMap the node map
      * @param maxRev the maximum revision to return
      * @return the earliest revision, or null if the node is deleted at the
@@ -1016,52 +1075,76 @@ public class MongoMK implements MicroKer
     }
 
     /**
-    * Get the latest revision where the node was alive at or before the
-    * provided revision.
-    *
-    * @param nodeMap the node map
-    * @param maxRev the maximum revision to return
-    * @param validRevisions the set of revisions already checked against
-     *                      maxRev and considered valid.
-    * @return the earliest revision, or null if the node is deleted at the
-    *         given revision
-    */
+     * Get the earliest (oldest) revision where the node was alive at or before
+     * the provided revision, if the node was alive at the given revision.
+     * 
+     * @param nodeMap the node map
+     * @param maxRev the maximum revision to return
+     * @param validRevisions the set of revisions already checked against maxRev
+     *            and considered valid.
+     * @return the earliest revision, or null if the node is deleted at the
+     *         given revision
+     */
     private Revision getLiveRevision(Map<String, Object> nodeMap,
             Revision maxRev, Set<Revision> validRevisions) {
         @SuppressWarnings("unchecked")
         Map<String, String> valueMap = (Map<String, String>) nodeMap
                 .get(UpdateOp.DELETED);
-        Revision firstRev = null;
-        String value = null;
         if (valueMap == null) {
             return null;
         }
+        // first, search the newest deleted revision
+        Revision deletedRev = null;
         if (valueMap instanceof TreeMap) {
             // use descending keys (newest first) if map is sorted
             valueMap = ((TreeMap<String, String>) valueMap).descendingMap();
         }
         for (String r : valueMap.keySet()) {
+            String value = valueMap.get(r);
+            if (!"true".equals(value)) {
+                // only look at deleted revisions now
+                continue;
+            }
             Revision propRev = Revision.fromString(r);
             if (isRevisionNewer(propRev, maxRev)
                     || !isValidRevision(propRev, maxRev, nodeMap, validRevisions)) {
                 continue;
             }
-            if (firstRev == null || isRevisionNewer(propRev, firstRev)) {
-                firstRev = propRev;
-                value = valueMap.get(r);
+            if (deletedRev == null || isRevisionNewer(propRev, deletedRev)) {
+                deletedRev = propRev;
             }
         }
-        if ("true".equals(value)) {
-            return null;
+        // now search the oldest non-deleted revision that is newer than the
+        // newest deleted revision
+        Revision liveRev = null;
+        for (String r : valueMap.keySet()) {
+            String value = valueMap.get(r);
+            if ("true".equals(value)) {
+                // ignore deleted revisions
+                continue;
+            }
+            Revision propRev = Revision.fromString(r);
+            if (deletedRev != null && isRevisionNewer(deletedRev, propRev)) {
+                // the node was deleted later on
+                continue;
+            }
+            if (isRevisionNewer(propRev, maxRev)
+                    || !isValidRevision(propRev, maxRev, nodeMap, validRevisions)) {
+                continue;
+            }
+            if (liveRev == null || isRevisionNewer(liveRev, propRev)) {
+                liveRev = propRev;
+            }
         }
-        return firstRev;
+        return liveRev;
     }
     
     /**
      * Get the revision of the latest change made to this node.
      * 
      * @param nodeMap the document
-     * @param before the returned value is guaranteed to be older than this revision
+     * @param except the returned value is guaranteed to _not_ match this revision,
+     *              but it might be in this branch
      * @param onlyCommitted whether only committed changes should be considered
      * @param handler the conflict handler, which is called for un-committed revisions
      *                preceding <code>before</code>.
@@ -1069,11 +1152,12 @@ public class MongoMK implements MicroKer
      */
     @SuppressWarnings("unchecked")
     @Nullable Revision getNewestRevision(Map<String, Object> nodeMap,
-                                         Revision before, boolean onlyCommitted,
+                                         Revision except, boolean onlyCommitted,
                                          CollisionHandler handler) {
         if (nodeMap == null) {
             return null;
         }
+        // TODO remove "except"
         SortedSet<String> revisions = new TreeSet<String>(Collections.reverseOrder());
         if (nodeMap.containsKey(UpdateOp.REVISIONS)) {
             revisions.addAll(((Map<String, String>) nodeMap.get(UpdateOp.REVISIONS)).keySet());
@@ -1090,9 +1174,9 @@ public class MongoMK implements MicroKer
         for (String r : revisions) {
             Revision propRev = Revision.fromString(r);
             if (newestRev == null || isRevisionNewer(propRev, newestRev)) {
-                if (isRevisionNewer(before, propRev)) {
+                if (!propRev.equals(except)) {
                     if (onlyCommitted && !isValidRevision(
-                            propRev, before, nodeMap, new HashSet<Revision>())) {
+                            propRev, except, nodeMap, new HashSet<Revision>())) {
                         handler.uncommittedModification(propRev);
                     } else {
                         newestRev = propRev;
@@ -1288,7 +1372,7 @@ public class MongoMK implements MicroKer
             this.isDisposed = isDisposed;
         }
         public void run() {
-            while (!isDisposed.get()) {
+            while (delay != 0 && !isDisposed.get()) {
                 synchronized (isDisposed) {
                     try {
                         isDisposed.wait(delay);
@@ -1418,4 +1502,8 @@ public class MongoMK implements MicroKer
     public boolean isCached(String path) {
         return store.isCached(Collection.NODES, Utils.getIdFromPath(path));
     }
+    public void stopBackground() {
+        stopBackground = true;
+    }
+
 }

Modified: jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Revision.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Revision.java?rev=1479926&r1=1479925&r2=1479926&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Revision.java (original)
+++ jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Revision.java Tue May  7 14:39:45 2013
@@ -222,7 +222,19 @@ public class Revision {
          * When comparing revisions that occurred before, the timestamp is ignored.
          */
         private long oldestTimestamp;
+        
+        /**
+         * The cluster node id of the current cluster node. Revisions 
+         * from this cluster node that are newer than the newest range
+         * (new local revisions) 
+         * are considered to be the newest revisions overall.
+         */
+        private final int currentClusterNodeId;
 
+        RevisionComparator(int currentClusterNodId) {
+            this.currentClusterNodeId = currentClusterNodId;
+        }
+        
         /**
          * Forget the order of older revisions. After calling this method, when comparing
          * revisions that happened before the given value, the timestamp order is used
@@ -287,7 +299,11 @@ public class Revision {
                 } else {
                     RevisionRange last = list.get(list.size() - 1);
                     if (last.timestamp == timestamp) {
-                        last.revision = r;
+                        // replace existing
+                        if (r.compareRevisionTime(last.revision) > 0) {
+                            // but only if newer
+                            last.revision = r;
+                        }
                         return;
                     }
                     newList = new ArrayList<RevisionRange>(list);
@@ -313,13 +329,25 @@ public class Revision {
             if (o1.getClusterId() == o2.getClusterId()) {
                 return o1.compareRevisionTime(o2);
             }
-            RevisionRange range1 = getRevisionRange(o1);
-            RevisionRange range2 = getRevisionRange(o2);
-            if (range1 == null || range2 == null) {
+            long range1 = getRevisionRangeTimestamp(o1);
+            long range2 = getRevisionRangeTimestamp(o2);
+            if (range1 == 0 || range2 == 0) {
                 return o1.compareRevisionTime(o2);
             }
-            if (range1.timestamp != range2.timestamp) {
-                return range1.timestamp < range2.timestamp ? -1 : 1;
+            if (range1 != range2) {
+                return range1 < range2 ? -1 : 1;
+            }
+            if (range1 == Long.MAX_VALUE) {
+                // in this case, both must be Long.MAX_VALUE, otherwise
+                // the previous check would have been true; and additionally
+                // the revisions are from different cluster nodes
+                if (o1.getClusterId() == currentClusterNodeId) {
+                    return 1;
+                } else if (o2.getClusterId() == currentClusterNodeId) {
+                    return -1;
+                }
+                // both revisions are new revisions of other cluster nodes
+                // (in reality this doesn't actually happen I believe)
             }
             int result = o1.compareRevisionTime(o2);
             if (result != 0) {
@@ -328,31 +356,59 @@ public class Revision {
             return o1.getClusterId() < o2.getClusterId() ? -1 : 1;
         }
         
-        private RevisionRange getRevisionRange(Revision r) {
+        /**
+         * Get the timestamp from the revision range, if found. If no range was
+         * found for this cluster instance, or if the revision is older than the
+         * earliest range, then 0 is returned. If the revision is newer than the
+         * newest range, then the range timestamp plus 1 second (for the current
+         * cluster node) or Long.MAX_VALUE (for other cluster nodes) is returned.
+         * 
+         * @param r the revision
+         * @return the timestamp, 0 if not found, 
+         *      the timestamp plus 1 second for new local revisions;
+         *      Long.MAX_VALUE for new non-local revisions (meaning 'in the future')
+         */
+        private long getRevisionRangeTimestamp(Revision r) {
             List<RevisionRange> list = map.get(r.getClusterId());
             if (list == null) {
-                return null;
+                return 0;
             }
             // search from latest backward
             // (binary search could be used, but we expect most queries
             // at the end of the list)
+            long result = 0;
             for (int i = list.size() - 1; i >= 0; i--) {
                 RevisionRange range = list.get(i);
-                if (r.compareRevisionTime(range.revision) >= 0) {
-                    return range;
+                int compare = r.compareRevisionTime(range.revision);
+                if (compare > 0) {
+                    if (i == list.size() - 1) {
+                        // newer than the newest range
+                        if (r.getClusterId() == currentClusterNodeId) {
+                            // newer than all 
+                            return range.timestamp + 1000;
+                        }
+                        // happens in the future (not visible yet)
+                        return Long.MAX_VALUE;
+                    }
+                    break;
                 }
+                result = range.timestamp;
             }
-            return null;
+            return result;
         }
         
         public String toString() {
             StringBuilder buff = new StringBuilder();
             for (int clusterId : new TreeSet<Integer>(map.keySet())) {
-                buff.append(clusterId).append(':');
+                int i = 0;
+                buff.append(clusterId).append(":");
                 for (RevisionRange r : map.get(clusterId)) {
-                    buff.append(' ').append(r);
+                    if (i++ % 4 == 0) {
+                        buff.append('\n');
+                    }
+                    buff.append(" ").append(r);
                 }
-                buff.append("; ");
+                buff.append("\n");
             }
             return buff.toString();
         }

Modified: jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/UnmergedBranches.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/UnmergedBranches.java?rev=1479926&r1=1479925&r2=1479926&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/UnmergedBranches.java (original)
+++ jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/UnmergedBranches.java Tue May  7 14:39:45 2013
@@ -51,7 +51,8 @@ class UnmergedBranches {
     /**
      * The revision comparator.
      */
-    private final Revision.RevisionComparator comparator = new Revision.RevisionComparator();
+    // TODO use the same comparator as MongoMK?
+    private final Revision.RevisionComparator comparator = new Revision.RevisionComparator(0);
 
     /**
      * Initialize with un-merged branches from <code>store</code> for this

Modified: jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/util/LoggingDocumentStoreWrapper.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/util/LoggingDocumentStoreWrapper.java?rev=1479926&r1=1479925&r2=1479926&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/util/LoggingDocumentStoreWrapper.java (original)
+++ jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/util/LoggingDocumentStoreWrapper.java Tue May  7 14:39:45 2013
@@ -141,6 +141,17 @@ public class LoggingDocumentStoreWrapper
             throw convert(e);
         }
     }
+    
+    @Override
+    public void invalidateCache(Collection collection, String key) {
+        try {
+            logMethod("invalidateCache", collection, key);
+            store.invalidateCache(collection, key);
+        } catch (Exception e) {
+            logException(e);
+            throw convert(e);
+        }
+    }
 
     @Override
     public void dispose() {
@@ -209,4 +220,5 @@ public class LoggingDocumentStoreWrapper
         }
         LOG.info(message);
     }
+
 }

Modified: jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/ClusterTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/ClusterTest.java?rev=1479926&r1=1479925&r2=1479926&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/ClusterTest.java (original)
+++ jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/ClusterTest.java Tue May  7 14:39:45 2013
@@ -23,6 +23,7 @@ import org.apache.jackrabbit.mk.api.Micr
 import org.apache.jackrabbit.mk.blobs.MemoryBlobStore;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import com.mongodb.DB;
@@ -53,6 +54,29 @@ public class ClusterTest {
     }
     
     @Test
+    @Ignore
+    public void openCloseOpen() {
+        MemoryDocumentStore ds = new MemoryDocumentStore();
+        MemoryBlobStore bs = new MemoryBlobStore();
+        MongoMK.Builder builder;
+        
+        builder = new MongoMK.Builder();
+        builder.setDocumentStore(ds).setBlobStore(bs);
+        MongoMK mk1 = builder.setClusterId(1).open();
+        mk1.commit("/", "+\"a\": {}", null, null);
+        mk1.commit("/", "-\"a\"", null, null);
+        
+        builder = new MongoMK.Builder();
+        builder.setDocumentStore(ds).setBlobStore(bs);
+        MongoMK mk2 = builder.setClusterId(2).open();
+        mk2.commit("/", "+\"a\": {}", null, null);
+        mk2.commit("/", "-\"a\"", null, null);
+        
+        mk1.dispose();
+        mk2.dispose();
+    }    
+    
+    @Test
     public void clusterNodeId() {
         MongoMK mk1 = createMK(0);
         MongoMK mk2 = createMK(0);
@@ -132,6 +156,11 @@ public class ClusterTest {
         } catch (MicroKernelException e) {
             // expected
         }
+        // now, after the conflict, both cluster nodes see the node
+        // (before the conflict, this isn't necessarily the case for mk2)
+        String n1 = mk1.getNodes("/", mk1.getHeadRevision(), 0, 0, 10, null);
+        String n2 = mk2.getNodes("/", mk2.getHeadRevision(), 0, 0, 10, null);
+        assertEquals(n1, n2);
         
         mk1.dispose();
         mk2.dispose();
@@ -145,13 +174,12 @@ public class ClusterTest {
         String m2h;
         m2h = mk2.getNodes("/", mk2.getHeadRevision(), 0, 0, 2, null);
         assertEquals("{\":childNodeCount\":0}", m2h);
+        String oldHead = mk2.getHeadRevision();
         
         mk1.commit("/", "+\"test\":{}", null, null);
         String m1h = mk1.getNodes("/", mk1.getHeadRevision(), 0, 0, 1, null);
         assertEquals("{\"test\":{},\":childNodeCount\":1}", m1h);
         
-        m2h = mk2.getNodes("/", mk2.getHeadRevision(), 0, 0, 2, null);
-        
         // not available yet...
         assertEquals("{\":childNodeCount\":0}", m2h);
         m2h = mk2.getNodes("/test", mk2.getHeadRevision(), 0, 0, 2, null);
@@ -162,7 +190,7 @@ public class ClusterTest {
             if (mk1.getPendingWriteCount() > 0) {
                 continue;
             }
-            if (mk2.isCached("/")) {
+            if (mk2.getHeadRevision().equals(oldHead)) {
                 continue;
             }
             break;

Modified: jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/ConcurrentConflictTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/ConcurrentConflictTest.java?rev=1479926&r1=1479925&r2=1479926&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/ConcurrentConflictTest.java (original)
+++ jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/ConcurrentConflictTest.java Tue May  7 14:39:45 2013
@@ -43,7 +43,7 @@ import static org.junit.Assert.assertEqu
 public class ConcurrentConflictTest extends BaseMongoMKTest {
 
     private static final boolean USE_LOGGER = true;
-    private static final Logger log = LoggerFactory.getLogger(ConcurrentConflictTest.class);
+    private static final Logger LOG = LoggerFactory.getLogger(ConcurrentConflictTest.class);
     private static final int NUM_WRITERS = 3;
     private static final int NUM_NODES = 10;
     private static final int NUM_TRANSFERS_PER_THREAD = 10;
@@ -81,7 +81,7 @@ public class ConcurrentConflictTest exte
     }
 
     private void concurrentUpdates(final boolean useBranch) throws Exception {
-        log.info("====== Start test =======");
+        LOG.info("====== Start test =======");
         final AtomicInteger conflicts = new AtomicInteger();
         final List<Exception> exceptions = Collections.synchronizedList(
                 new ArrayList<Exception>());
@@ -144,7 +144,7 @@ public class ConcurrentConflictTest exte
                         long value = (Long) entry.getValue().get("value");
                         jsop.append("^\"/node-").append(entry.getKey());
                         jsop.append("/value\":");
-                        if (value >= 20 && ! withdrawn) {
+                        if (value >= 20 && !withdrawn) {
                             jsop.append(value - 20);
                             withdrawn = true;
                         } else {
@@ -189,9 +189,9 @@ public class ConcurrentConflictTest exte
         }
     }
 
-    private void log(String msg) {
+    void log(String msg) {
         if (USE_LOGGER) {
-            log.info(msg);
+            LOG.info(msg);
         } else {
             synchronized (logBuffer) {
                 logBuffer.append(msg).append("\n");

Modified: jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/RandomizedClusterTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/RandomizedClusterTest.java?rev=1479926&r1=1479925&r2=1479926&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/RandomizedClusterTest.java (original)
+++ jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/RandomizedClusterTest.java Tue May  7 14:39:45 2013
@@ -21,16 +21,17 @@ import static org.junit.Assert.assertFal
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Random;
 
 import org.apache.jackrabbit.mk.api.MicroKernelException;
 import org.apache.jackrabbit.mk.blobs.MemoryBlobStore;
-import org.apache.jackrabbit.mk.core.MicroKernelImpl;
 import org.apache.jackrabbit.mk.json.JsonObject;
 import org.apache.jackrabbit.mk.json.JsopBuilder;
 import org.apache.jackrabbit.mk.json.JsopTokenizer;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import com.mongodb.DB;
@@ -49,43 +50,48 @@ public class RandomizedClusterTest {
     private MemoryBlobStore bs;
     
     private MongoMK[] mkList = new MongoMK[MK_COUNT];
-    private MicroKernelImpl[] mkListGold = new MicroKernelImpl[MK_COUNT];
     private String[] revList = new String[MK_COUNT];
-    private String[] revListGold = new String[MK_COUNT];
+    @SuppressWarnings({ "unchecked", "cast" })
+    private HashSet<Integer>[] unseenChanges = (HashSet<Integer>[]) new HashSet[MK_COUNT];
+    private HashMap<Integer, List<Op>> changes = new HashMap<Integer, List<Op>>();
 
     private int opId;
     
     private int mkId;
     
     private StringBuilder log;
+    
+    /**
+     * The map of changes. Key: node name; value: the last operation that
+     * changed the node.
+     */
+    private HashMap<String, Integer> nodeChange = new HashMap<String, Integer>();
 
     @Test
-    @Ignore
     public void addRemoveSet() throws Exception {
-        MicroKernelImpl mkG = new MicroKernelImpl();
         for (int i = 0; i < MK_COUNT; i++) {
+            unseenChanges[i] = new HashSet<Integer>();
             mkList[i] = createMK(i);
             revList[i] = mkList[i].getHeadRevision();
-            mkListGold[i] = mkG;
-            revListGold[i] = mkListGold[i].getHeadRevision();
         }
         HashMap<Integer, ClusterRev> revs = 
                 new HashMap<Integer, ClusterRev>();
         
         Random r = new Random(1);
         int operations = 1000, nodeCount = 10;
-        int propertyCount = 5, valueCount = 10;
+        int valueCount = 10;
         int maxBackRev = 20;
         log = new StringBuilder();
         try {
             int maskOk = 0, maskFail = 0;
             int opCount = 6;
+            nodeChange.clear();
             for (int i = 0; i < operations; i++) {
                 opId = i;
                 mkId = r.nextInt(mkList.length);
                 String node = "t" + r.nextInt(nodeCount);
                 String node2 = "t" + r.nextInt(nodeCount);
-                String property = "p" + r.nextInt(propertyCount);
+                String property = "x";
                 String value = "" + r.nextInt(valueCount);
                 String diff;
                 int op = r.nextInt(opCount);
@@ -94,65 +100,131 @@ public class RandomizedClusterTest {
                     // there are enough nodes to operate on
                     op = 0;
                 }
-                boolean result;
+                String result;
+                boolean conflictExpected;
                 switch(op) {
                 case 0:
                     diff = "+ \"" + node + "\": { \"" + property + "\": " + value + "}";
                     log(diff);
-                    result = commit(diff);
+                    if (exists(node)) {
+                        log("already exists");
+                        result = null;
+                    } else {
+                        conflictExpected = isConflict(node);
+                        result = commit(diff, conflictExpected);
+                        if (result != null) {
+                            changes.put(i, Arrays.asList(new Op(mkId, node, value)));
+                            nodeChange.put(node, i);
+                        }
+                    }
                     break;
                 case 1:
                     diff = "- \"" + node + "\"";
                     log(diff);
-                    result = commit(diff);
+                    if (exists(node)) {
+                        conflictExpected = isConflict(node);
+                        result = commit(diff, conflictExpected);
+                        if (result != null) {
+                            changes.put(i, Arrays.asList(new Op(mkId, node, null)));
+                            nodeChange.put(node, i);
+                        }
+                    } else {
+                        log("doesn't exist");
+                        result = null;
+                    }
                     break;
                 case 2:
                     diff = "^ \"" + node + "/" + property + "\": " + value;
                     log(diff);
-                    result = commit(diff);
+                    if (exists(node)) {
+                        conflictExpected = isConflict(node);
+                        result = commit(diff, conflictExpected);
+                        if (result != null) {
+                            changes.put(i, Arrays.asList(new Op(mkId, node, value)));
+                            nodeChange.put(node, i);
+                        }
+                    } else {
+                        log("doesn't exist");
+                        result = null;
+                    }
                     break;
                 case 3:
                     diff = "> \"" + node + "\": \"" + node2 + "\"";
                     log(diff);
-                    result = commit(diff);
+                    if (exists(node) && !exists(node2)) {
+                        conflictExpected = isConflict(node) | isConflict(node2);
+                        result = commit(diff, conflictExpected);
+                        if (result != null) {
+                            value = getValue(mkId, i, node);
+                            changes.put(i, Arrays.asList(
+                                    new Op(mkId, node, null), new Op(mkId, node2, value)));
+                            nodeChange.put(node, i);
+                            nodeChange.put(node2, i);
+                        }
+                    } else {
+                        log("source doesn't exist or target exists");
+                        result = null;
+                    }
                     break;
                 case 4:
-                    diff = "* \"" + node + "\": \"" + node2 + "\"";
-                    log(diff);
-                    result = commit(diff);
+                    if (isConflict(node)) {
+                        // the MicroKernelImpl would report a conflict
+                        result = null;
+                    } else {
+                        diff = "* \"" + node + "\": \"" + node2 + "\"";
+                        log(diff);
+                        if (exists(node) && !exists(node2)) {
+                            conflictExpected = isConflict(node2);
+                            result = commit(diff, conflictExpected);
+                            if (result != null) {
+                                value = getValue(mkId, i, node);
+                                changes.put(i, Arrays.asList(new Op(mkId, node2, value)));
+                                nodeChange.put(node2, i);
+                            }
+                        } else {
+                            log("source doesn't exist or target exists");
+                            result = null;
+                        }
+                    }
                     break;
                 case 5:
-                    revList[mkId] = mkList[mkId].getHeadRevision();
-                    revListGold[mkId] = mkListGold[mkId].getHeadRevision();
+                    log("sync/refresh");
+                    syncAndRefreshAllClusterNodes();
+                    // go to head revision
+                    result = revList[mkId] = mkList[mkId].getHeadRevision();
                     // fake failure
-                    result = i % 2 == 0;
+                    maskFail |= 1 << op;
                     break;
                 default:
                     fail();
-                    result = false;
+                    result = null;
                 }
-                if (result) {
-                    maskOk |= 1 << op;
-                } else {
+                if (result == null) {
                     maskFail |= 1 << op;
+                    log(" -> fail " + Integer.toBinaryString(maskFail));
+                } else {
+                    maskOk |= 1 << op;
+                    log(" -> " + result);
+                    // all other cluster nodes didn't see this particular change yet
+                    for (int j = 0; j < unseenChanges.length; j++) {
+                        if (j != mkId) {
+                            unseenChanges[j].add(i);
+                        }
+                    }
                 }
                 log("get " + node);
-                get(node);
+                boolean x = get(i, node);
+                log("get " + node + " returns " + x);
                 log("get " + node2);
-                get(node2);
+                x = get(i, node2);
+                log("get " + node2 + " returns " + x);
                 MongoMK mk = mkList[mkId];
-                MicroKernelImpl mkGold = mkListGold[mkId];
                 ClusterRev cr = new ClusterRev();
                 cr.mkId = mkId;
                 cr.rev = mk.getHeadRevision();
-                cr.revGold = mkGold.getHeadRevision();
                 revs.put(i, cr);
                 revs.remove(i - maxBackRev);
-                int revId = i - r.nextInt(maxBackRev);
-                cr = revs.get(revId);
-                if (cr != null) {
-                    get(node, cr.revGold, cr.rev);
-                }
+                log.append('\n');
             }
             if (Integer.bitCount(maskOk) != opCount) {
                 fail("Not all operations were at least once successful: " + Integer.toBinaryString(maskOk));
@@ -167,45 +239,91 @@ public class RandomizedClusterTest {
         }
         for (int i = 0; i < MK_COUNT; i++) {
             mkList[i].dispose();
-            mkListGold[i].dispose();
         }
         // System.out.println(log);
         // System.out.println();
     }
     
+    private String getValue(int clusterId, int maxOp, String nodeName) {
+        for (int i = maxOp; i >= 0; i--) {
+            List<Op> ops = changes.get(i);
+            if (ops != null) {
+                for (Op o : ops) {
+                    if (o.clusterId != clusterId && unseenChanges[clusterId].contains(i)) {
+                        continue;
+                    }
+                    if (o.nodeName.equals(nodeName)) {
+                        return o.value;
+                    }
+                }
+            }
+        }
+        return null;
+    }
+    
+    private boolean isConflict(String node) {
+        Integer change = nodeChange.get(node);
+        if (change == null || !unseenChanges[mkId].contains(change)) {
+            return false;
+        }
+        return true;
+    }
+    
     private void log(String msg) {
         msg = opId + ": [" + mkId + "] " + msg + "\n";
         log.append(msg);
     }
     
-    private void get(String node) {
-        String headGold = mkListGold[mkId].getHeadRevision();
+    private void syncClusterNode() {
         for (int i = 0; i < mkList.length; i++) {
             MongoMK mk = mkList[i];
             mk.backgroundWrite();
         }
         MongoMK mk = mkList[mkId];
         mk.backgroundRead();
-        String head = mk.getHeadRevision();
-        get(node, headGold, head);
+    }
+    
+    private void syncAndRefreshAllClusterNodes() {
+        syncClusterNode();
+        for (int i = 0; i < mkList.length; i++) {
+            MongoMK mk = mkList[i];
+            mk.backgroundRead();
+            revList[i] = mk.getHeadRevision();
+            unseenChanges[i].clear();
+        }
+        log("sync");
+    }
+    
+    private boolean get(int maxOp, String node) {
+        String head = revList[mkId];
+        return get(maxOp, node, head);
+    }
+    
+    private boolean exists(String node) {
+        String head = revList[mkId];
+        MongoMK mk = mkList[mkId];
+        return mk.nodeExists("/" + node, head);
     }
         
-    private void get(String node, String headGold, String head) {
+    private boolean get(int maxOp, String node, String head) {
         String p = "/" + node;
-        MicroKernelImpl mkGold = mkListGold[mkId];
         MongoMK mk = mkList[mkId];
-        if (!mkGold.nodeExists(p, headGold)) {
-            assertFalse(mk.nodeExists(p, head));
-            return;
+        String value = getValue(mkId, maxOp, node);
+        if (value == null) {
+            assertFalse("path: " + p + " is supposed to not exist", 
+                    mk.nodeExists(p, head));
+            return false;
         }
         if (!mk.nodeExists(p, head)) {
-            assertTrue("path: " + p, mk.nodeExists(p, head));
+            assertTrue("path: " + p + " is supposed to exist", 
+                    mk.nodeExists(p, head));
         }
-        String resultGold = mkGold.getNodes(p, headGold, 0, 0, Integer.MAX_VALUE, null);
+        String expected = "{\":childNodeCount\":0,\"x\":" + value + "}";
         String result = mk.getNodes(p, head, 0, 0, Integer.MAX_VALUE, null);
-        resultGold = normalize(resultGold);
+        expected = normalize(expected);
         result = normalize(result);
-        assertEquals(resultGold, result);
+        assertEquals(expected, result);
+        return true;
     }
     
     private static String normalize(String json) {
@@ -217,32 +335,45 @@ public class RandomizedClusterTest {
         return w.toString();
     }
 
-    private boolean commit(String diff) {
+    private String commit(String diff, boolean conflictExpected) {
         boolean ok = false;
-        MicroKernelImpl mkGold = mkListGold[mkId];
-        String revGold = revListGold[mkId];
         MongoMK mk = mkList[mkId];
         String rev = revList[mkId];
-        try {
-            mkGold.commit("/", diff, revGold, null);
+        String result = null;
+        String ex = null;
+        if (conflictExpected) {
+            ok = false;
+            ex = "conflict expected";
+            // afterwards, this cluster node should synchronize
+            unseenChanges[mkId].clear();
+        } else {
             ok = true;
-        } catch (MicroKernelException e) {
+        }
+        if (ok) {
+            result = mk.commit("/", diff, rev, null);
+            revList[mkId] = result;
+        } else {
             // System.out.println("--> fail " + e.toString());            
             try {
                 mk.commit("/", diff, rev, null);
-                fail("Should fail: " + diff + " with exception " + e);
+                fail("Should fail: " + diff + " with " + ex);
             } catch (MicroKernelException e2) {
                 // expected
+                revList[mkId] = mk.getHeadRevision();
+                // it might not have been a conflict with another cluster node
+                // TODO test two cases: conflict with other cluster node
+                // (this should auto-synchronize until the given conflict)
+                // and conflict with a previous change that was already seen,
+                // which shouldn't synchronize
+                syncAndRefreshAllClusterNodes();
             }
         }
-        if (ok) {
-            mk.commit("/", diff, rev, null);
-        }
-        return ok;
+        return result;
     }
     
     private MongoMK createMK(int clusterId) {
         MongoMK.Builder builder = new MongoMK.Builder();
+        builder.setAsyncDelay(0);
         if (MONGO_DB) {
             DB db = MongoUtils.getConnection().getDB();
             MongoUtils.dropCollections(db);
@@ -264,7 +395,21 @@ public class RandomizedClusterTest {
      */
     static class ClusterRev {
         int mkId;
-        String rev, revGold;
+        String rev;
+    }
+    
+    /**
+     * An operation.
+     */
+    static class Op {
+        final int clusterId;
+        final String nodeName;
+        final String value;
+        public Op(int clusterId, String nodeName, String value) {
+            this.clusterId = clusterId;
+            this.nodeName = nodeName;
+            this.value = value;
+        }
     }
 
 }

Modified: jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/RevisionTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/RevisionTest.java?rev=1479926&r1=1479925&r2=1479926&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/RevisionTest.java (original)
+++ jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/RevisionTest.java Tue May  7 14:39:45 2013
@@ -108,7 +108,7 @@ public class RevisionTest {
     
     @Test
     public void revisionComparatorSimple() {
-        RevisionComparator comp = new RevisionComparator();
+        RevisionComparator comp = new RevisionComparator(0);
         Revision r1 = Revision.newRevision(0);
         Revision r2 = Revision.newRevision(0);
         assertEquals(r1.compareRevisionTime(r2), comp.compare(r1, r2));
@@ -119,7 +119,7 @@ public class RevisionTest {
     @Test
     public void revisionComparatorCluster() {
         
-        RevisionComparator comp = new RevisionComparator();
+        RevisionComparator comp = new RevisionComparator(0);
         
         Revision r1c1 = new Revision(0x110, 0, 1);
         Revision r2c1 = new Revision(0x120, 0, 1);
@@ -138,12 +138,13 @@ public class RevisionTest {
         comp.add(r2c2, 10);
 
         assertEquals(
-                "1: r120-0-1:20; " + 
-                "2: r200-0-2:10; ", comp.toString());
+                "1:\n r120-0-1:20\n" + 
+                "2:\n r200-0-2:10\n", comp.toString());
 
         assertEquals(1, comp.compare(r1c1, r1c2));
         assertEquals(1, comp.compare(r2c1, r2c2));
-        assertEquals(1, comp.compare(r3c1, r3c2));
+        // r3c2 is still "in the future"
+        assertEquals(-1, comp.compare(r3c1, r3c2));
         
         // now we declare r3 of c1 to be before r3 of c2
         // (with the same range timestamp, 
@@ -152,8 +153,8 @@ public class RevisionTest {
         comp.add(r3c2, 30);
 
         assertEquals(
-                "1: r120-0-1:20 r130-0-1:30; " + 
-                "2: r200-0-2:10 r300-0-2:30; ", comp.toString());
+                "1:\n r120-0-1:20 r130-0-1:30\n" + 
+                "2:\n r200-0-2:10 r300-0-2:30\n", comp.toString());
 
         assertEquals(1, comp.compare(r1c1, r1c2));
         assertEquals(1, comp.compare(r2c1, r2c2));
@@ -166,18 +167,18 @@ public class RevisionTest {
         // get rid of old timestamps
         comp.purge(10);
         assertEquals(
-                "1: r120-0-1:20 r130-0-1:30; " + 
-                "2: r300-0-2:30; ", comp.toString());
+                "1:\n r120-0-1:20 r130-0-1:30\n" + 
+                "2:\n r300-0-2:30\n", comp.toString());
         comp.purge(20);
         assertEquals(
-                "1: r130-0-1:30; " + 
-                "2: r300-0-2:30; ", comp.toString());
+                "1:\n r130-0-1:30\n" + 
+                "2:\n r300-0-2:30\n", comp.toString());
         
         // update an entry
         comp.add(new Revision(0x301, 1, 2), 30);
         assertEquals(
-                "1: r130-0-1:30; " + 
-                "2: r301-1-2:30; ", comp.toString());
+                "1:\n r130-0-1:30\n" + 
+                "2:\n r301-1-2:30\n", comp.toString());
         
         comp.purge(30);
         assertEquals("", comp.toString());

Modified: jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/SimpleTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/SimpleTest.java?rev=1479926&r1=1479925&r2=1479926&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/SimpleTest.java (original)
+++ jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/SimpleTest.java Tue May  7 14:39:45 2013
@@ -72,7 +72,7 @@ public class SimpleTest {
     @Test
     public void addNodeGetNode() {
         MongoMK mk = new MongoMK.Builder().open();
-        Revision rev = mk.newRevision();
+        Revision rev = Revision.fromString(mk.getHeadRevision());
         Node n = new Node("/test", rev);
         n.setProperty("name", "Hello");
         UpdateOp op = n.asOperation(true);