You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by mr...@apache.org on 2015/09/08 10:03:25 UTC

svn commit: r1701741 - in /jackrabbit/oak/trunk: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/

Author: mreutegg
Date: Tue Sep  8 08:03:25 2015
New Revision: 1701741

URL: http://svn.apache.org/r1701741
Log:
OAK-3042: Suspend commit on conflict

Added:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ConflictException.java   (with props)
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/FailedWithConflictException.java   (with props)
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterConflictTest.java   (with props)
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ConflictExceptionTest.java   (with props)
Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Collision.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Commit.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/CommitQueue.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreBranch.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/RevisionContext.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CommitQueueTest.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentSplitTest.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DummyRevisionContext.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/NodeDocumentTest.java
    jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/ConcurrentAddNodesClusterIT.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Collision.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Collision.java?rev=1701741&r1=1701740&r2=1701741&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Collision.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Collision.java Tue Sep  8 08:03:25 2015
@@ -72,14 +72,14 @@ class Collision {
     @Nonnull
     Revision mark(DocumentStore store) throws DocumentStoreException {
         // first try to mark their revision
-        if (markCommitRoot(document, theirRev, store)) {
+        if (markCommitRoot(document, theirRev, ourRev, store)) {
             return theirRev;
         }
         // their commit wins, we have to mark ourRev
         NodeDocument newDoc = Collection.NODES.newDocument(store);
         document.deepCopy(newDoc);
         UpdateUtils.applyChanges(newDoc, ourOp, context.getRevisionComparator());
-        if (!markCommitRoot(newDoc, ourRev, store)) {
+        if (!markCommitRoot(newDoc, ourRev, theirRev, store)) {
             throw new IllegalStateException("Unable to annotate our revision "
                     + "with collision marker. Our revision: " + ourRev
                     + ", document:\n" + newDoc.format());
@@ -94,12 +94,14 @@ class Collision {
      * @param document the document.
      * @param revision the revision of the commit to annotated with a collision
      *            marker.
+     * @param other the revision which detected the collision.
      * @param store the document store.
      * @return <code>true</code> if the commit for the given revision was marked
      *         successfully; <code>false</code> otherwise.
      */
     private static boolean markCommitRoot(@Nonnull NodeDocument document,
                                           @Nonnull Revision revision,
+                                          @Nonnull Revision other,
                                           @Nonnull DocumentStore store) {
         String p = document.getPath();
         String commitRootPath;
@@ -131,7 +133,7 @@ class Collision {
             // already marked
             return true;
         }
-        NodeDocument.addCollision(op, revision);
+        NodeDocument.addCollision(op, revision, other);
         String commitValue = commitRoot.getLocalRevisions().get(revision);
         if (commitValue == null) {
             // no revision entry yet

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Commit.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Commit.java?rev=1701741&r1=1701740&r2=1701741&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Commit.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Commit.java Tue Sep  8 08:03:25 2015
@@ -17,6 +17,7 @@
 package org.apache.jackrabbit.oak.plugins.document;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
@@ -393,7 +394,16 @@ public class Commit {
                     if (before == null) {
                         String msg = "Conflicting concurrent change. " +
                                 "Update operation failed: " + commitRoot;
-                        throw new DocumentStoreException(msg);
+                        NodeDocument commitRootDoc = store.find(NODES, commitRoot.getId());
+                        DocumentStoreException dse;
+                        if (commitRootDoc == null) {
+                            dse = new DocumentStoreException(msg);
+                        } else {
+                            dse = new ConflictException(msg,
+                                    commitRootDoc.getMostRecentConflictFor(
+                                        Collections.singleton(revision), nodeStore));
+                        }
+                        throw dse;
                     } else {
                         success = true;
                         // if we get here the commit was successful and
@@ -490,9 +500,12 @@ public class Commit {
      * @param op the update operation.
      * @param before how the document looked before the update was applied or
      *               {@code null} if it didn't exist before.
+     * @throws ConflictException if there was a conflict introduced by the
+     *          given update operation.
      */
     private void checkConflicts(@Nonnull UpdateOp op,
-                                @Nullable NodeDocument before) {
+                                @Nullable NodeDocument before)
+            throws ConflictException {
         DocumentStore store = nodeStore.getDocumentStore();
         collisions.clear();
         if (baseRevision != null) {
@@ -507,10 +520,14 @@ public class Commit {
                         });
             }
             String conflictMessage = null;
+            Revision conflictRevision = newestRev;
             if (newestRev == null) {
                 if ((op.isDelete() || !op.isNew()) && isConflicting(before, op)) {
                     conflictMessage = "The node " +
                             op.getId() + " does not exist or is already deleted";
+                    if (before != null && !before.getLocalDeleted().isEmpty()) {
+                        conflictRevision = before.getLocalDeleted().firstKey();
+                    }
                 }
             } else {
                 if (op.isNew() && isConflicting(before, op)) {
@@ -545,6 +562,7 @@ public class Commit {
                                         op.getId() + " was changed in revision\n" + r +
                                         ", which was applied after the base revision\n" +
                                         baseRevision;
+                                conflictRevision = r;
                             }
                         }
                     }
@@ -558,7 +576,7 @@ public class Commit {
                             ",\nrevision order:\n" +
                             nodeStore.getRevisionComparator());
                 }
-                throw new DocumentStoreException(conflictMessage);
+                throw new ConflictException(conflictMessage, conflictRevision);
             }
         }
     }

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/CommitQueue.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/CommitQueue.java?rev=1701741&r1=1701740&r2=1701741&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/CommitQueue.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/CommitQueue.java Tue Sep  8 08:03:25 2015
@@ -19,14 +19,21 @@ package org.apache.jackrabbit.oak.plugin
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.Map;
 import java.util.SortedMap;
 import java.util.SortedSet;
 import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 
 import javax.annotation.Nonnull;
 
+import com.google.common.collect.Maps;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,13 +41,22 @@ import org.slf4j.LoggerFactory;
  * <code>CommitQueue</code> ensures a sequence of commits consistent with the
  * commit revision even if commits did not complete in this sequence.
  */
-abstract class CommitQueue {
+final class CommitQueue {
 
     static final Logger LOG = LoggerFactory.getLogger(CommitQueue.class);
 
     private final SortedMap<Revision, Entry> commits = new TreeMap<Revision, Entry>(StableRevisionComparator.INSTANCE);
 
-    protected abstract Revision newRevision();
+    /**
+     * Map of currently suspended commits until a given Revision is visible.
+     */
+    private final Map<Semaphore, Revision> suspendedCommits = Maps.newIdentityHashMap();
+
+    private final RevisionContext context;
+
+    CommitQueue(@Nonnull RevisionContext context) {
+        this.context = checkNotNull(context);
+    }
 
     @Nonnull
     Revision createRevision() {
@@ -54,7 +70,7 @@ abstract class CommitQueue {
         Revision rev = null;
         synchronized (this) {
             for (int i = 0; i < num; i++) {
-                rev = newRevision();
+                rev = context.newRevision();
                 revs.add(rev);
             }
             commits.put(rev, new Entry(rev));
@@ -70,6 +86,7 @@ abstract class CommitQueue {
 
     void canceled(@Nonnull Revision rev) {
         removeCommit(rev);
+        notifySuspendedCommits(rev);
     }
 
     boolean contains(@Nonnull Revision revision) {
@@ -78,6 +95,44 @@ abstract class CommitQueue {
         }
     }
 
+    /**
+     * Suspends until the given revision is visible from the current
+     * headRevision or the given revision is canceled from the commit queue.
+     *
+     * @param r the revision to become visible.
+     */
+    void suspendUntil(@Nonnull Revision r) {
+        Comparator<Revision> comparator = context.getRevisionComparator();
+        Semaphore s = null;
+        synchronized (suspendedCommits) {
+            Revision headRevision = context.getHeadRevision();
+            if (comparator.compare(r, headRevision) > 0) {
+                s = new Semaphore(0);
+                suspendedCommits.put(s, r);
+            }
+        }
+        if (s != null) {
+            s.acquireUninterruptibly();
+        }
+    }
+
+    /**
+     * Called when the head revision accessible via the {@link RevisionContext}
+     * passed to constructor changed.
+     */
+    void headRevisionChanged() {
+        notifySuspendedCommits();
+    }
+
+    /**
+     * @return the number of suspended threads on this commit queue.
+     */
+    int numSuspendedThreads() {
+        synchronized (suspendedCommits) {
+            return suspendedCommits.size();
+        }
+    }
+
     interface Callback {
 
         void headOfQueue(@Nonnull Revision revision);
@@ -85,6 +140,43 @@ abstract class CommitQueue {
 
     //------------------------< internal >--------------------------------------
 
+    private void notifySuspendedCommits() {
+        synchronized (suspendedCommits) {
+            if (suspendedCommits.isEmpty()) {
+                return;
+            }
+            Comparator<Revision> comparator = context.getRevisionComparator();
+            Revision headRevision = context.getHeadRevision();
+            Iterator<Map.Entry<Semaphore, Revision>> it = suspendedCommits.entrySet().iterator();
+            while (it.hasNext()) {
+                Map.Entry<Semaphore, Revision> entry = it.next();
+                if (comparator.compare(entry.getValue(), headRevision) <= 0) {
+                    Semaphore s = entry.getKey();
+                    it.remove();
+                    s.release();
+                }
+            }
+        }
+    }
+
+    private void notifySuspendedCommits(@Nonnull Revision revision) {
+        checkNotNull(revision);
+        synchronized (suspendedCommits) {
+            if (suspendedCommits.isEmpty()) {
+                return;
+            }
+            Iterator<Map.Entry<Semaphore, Revision>> it = suspendedCommits.entrySet().iterator();
+            while (it.hasNext()) {
+                Map.Entry<Semaphore, Revision> entry = it.next();
+                if (revision.equals(entry.getValue())) {
+                    Semaphore s = entry.getKey();
+                    it.remove();
+                    s.release();
+                }
+            }
+        }
+    }
+
     private void removeCommit(@Nonnull Revision rev) {
         // simply remove and notify next head if any
         synchronized (this) {

Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ConflictException.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ConflictException.java?rev=1701741&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ConflictException.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ConflictException.java Tue Sep  8 08:03:25 2015
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import org.apache.jackrabbit.oak.api.CommitFailedException;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.jackrabbit.oak.api.CommitFailedException.MERGE;
+
+/**
+ * A document store exception with an optional conflict revision. The
+ * DocumentNodeStore implementation will throw this exception when a commit
+ * or merge fails with a conflict.
+ */
+class ConflictException extends DocumentStoreException {
+
+    private static final long serialVersionUID = 1418838561903727231L;
+
+    /**
+     * Optional conflict revision.
+     */
+    private final Revision conflictRevision;
+
+    /**
+     * @param message the exception / conflict message.
+     * @param conflictRevision the conflict revision or {@code null} if unknown.
+     */
+    ConflictException(@Nonnull String message,
+                      @Nullable Revision conflictRevision) {
+        super(checkNotNull(message));
+        this.conflictRevision = conflictRevision;
+    }
+
+    /**
+     * Convert this exception into a {@link CommitFailedException}. This
+     * exception will be set as the cause of the returned exception.
+     *
+     * @return a {@link CommitFailedException}.
+     */
+    CommitFailedException asCommitFailedException() {
+        if (conflictRevision != null) {
+            return new FailedWithConflictException(conflictRevision, getMessage(), this);
+        } else {
+            return new CommitFailedException(MERGE, 1,
+                    "Failed to merge changes to the underlying store", this);
+        }
+    }
+}

Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ConflictException.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java?rev=1701741&r1=1701740&r2=1701741&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java Tue Sep  8 08:03:25 2015
@@ -22,7 +22,6 @@ import static com.google.common.collect.
 import static com.google.common.collect.Iterables.toArray;
 import static com.google.common.collect.Iterables.transform;
 import static java.util.Collections.singletonList;
-import static org.apache.jackrabbit.oak.api.CommitFailedException.MERGE;
 import static org.apache.jackrabbit.oak.commons.PathUtils.concat;
 import static org.apache.jackrabbit.oak.plugins.document.Collection.JOURNAL;
 import static org.apache.jackrabbit.oak.plugins.document.Collection.NODES;
@@ -510,12 +509,7 @@ public final class DocumentNodeStore
         getRevisionComparator().add(headRevision, Revision.newRevision(0));
 
         dispatcher = new ChangeDispatcher(getRoot());
-        commitQueue = new CommitQueue() {
-            @Override
-            protected Revision newRevision() {
-                return DocumentNodeStore.this.newRevision();
-            }
-        };
+        commitQueue = new CommitQueue(this);
         String threadNamePostfix = "(" + clusterId + ")";
         batchCommitQueue = new BatchCommitQueue(store, revisionComparator);
         backgroundReadThread = new Thread(
@@ -617,19 +611,6 @@ public final class DocumentNodeStore
     }
 
     /**
-     * Create a new revision.
-     *
-     * @return the revision
-     */
-    @Nonnull
-    Revision newRevision() {
-        if (simpleRevisionCounter != null) {
-            return new Revision(simpleRevisionCounter.getAndIncrement(), 0, clusterId);
-        }
-        return Revision.newRevision(clusterId);
-    }
-
-    /**
      * Creates a new commit. The caller must acknowledge the commit either with
      * {@link #done(Commit, boolean, CommitInfo)} or {@link #canceled(Commit)},
      * depending on the result of the commit.
@@ -700,6 +681,7 @@ public final class DocumentNodeStore
                         changes.modified(c.getModifiedPaths());
                         // update head revision
                         setHeadRevision(c.getRevision());
+                        commitQueue.headRevisionChanged();
                         dispatcher.contentChanged(getRoot(), info);
                     }
                 });
@@ -1382,8 +1364,10 @@ public final class DocumentNodeStore
                     b.applyTo(getPendingModifications(), commit.getRevision());
                     getBranches().remove(b);
                 } else {
-                    throw new CommitFailedException(MERGE, 2,
-                            "Conflicting concurrent change. Update operation failed: " + op);
+                    NodeDocument root = Utils.getRootDocument(store);
+                    Revision conflictRev = root.getMostRecentConflictFor(b.getCommits(), this);
+                    String msg = "Conflicting concurrent change. Update operation failed: " + op;
+                    throw new ConflictException(msg, conflictRev).asCommitFailedException();
                 }
             } else {
                 // no commits in this branch -> do nothing
@@ -1503,6 +1487,24 @@ public final class DocumentNodeStore
         }
     }
 
+    /**
+     * Suspends until the given revision is visible from the current
+     * headRevision or the given revision is canceled from the commit queue.
+     *
+     * The thread will *not* be suspended if the given revision is from a
+     * foreign cluster node and async delay is set to zero.
+     *
+     * @param r the revision to become visible.
+     */
+    void suspendUntil(@Nonnull Revision r) {
+        // do not suspend if revision is from another cluster node
+        // and background read is disabled
+        if (r.getClusterId() != getClusterId() && getAsyncDelay() == 0) {
+            return;
+        }
+        commitQueue.suspendUntil(r);
+    }
+
     //------------------------< Observable >------------------------------------
 
     @Override
@@ -1647,6 +1649,14 @@ public final class DocumentNodeStore
         return headRevision;
     }
 
+    @Nonnull
+    public Revision newRevision() {
+        if (simpleRevisionCounter != null) {
+            return new Revision(simpleRevisionCounter.getAndIncrement(), 0, clusterId);
+        }
+        return Revision.newRevision(clusterId);
+    }
+
     //----------------------< background operations >---------------------------
 
     /** Used for testing only */
@@ -1904,6 +1914,7 @@ public final class DocumentNodeStore
                     // the new head revision is after other revisions
                     setHeadRevision(newRevision());
                     if (dispatchChange) {
+                        commitQueue.headRevisionChanged();
                         time = clock.getTime();
                         if (externalSort != null) {
                             // then there were external changes and reading them
@@ -1911,7 +1922,7 @@ public final class DocumentNodeStore
                             try {
                                 JournalEntry.applyTo(externalSort, diffCache, oldHead, headRevision);
                             } catch (Exception e1) {
-                                LOG.error("backgroundRead: Exception while processing external changes from journal: "+e1, e1);
+                                LOG.error("backgroundRead: Exception while processing external changes from journal: {}", e1, e1);
                             }
                         }
                         stats.populateDiffCache = clock.getTime() - time;

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreBranch.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreBranch.java?rev=1701741&r1=1701740&r2=1701741&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreBranch.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreBranch.java Tue Sep  8 08:03:25 2015
@@ -113,6 +113,12 @@ class DocumentNodeStoreBranch implements
             throws CommitFailedException {
         try {
             return merge0(hook, info, false);
+        } catch (FailedWithConflictException e) {
+            // suspend until conflicting revision is visible
+            LOG.debug("Suspending until {} is visible. Current head {}.",
+                    e.getConflictRevision(), store.getHeadRevision());
+            store.suspendUntil(e.getConflictRevision());
+            LOG.debug("Resumed. Current head {}.", store.getHeadRevision());
         } catch (CommitFailedException e) {
             if (!e.isOfType(MERGE)) {
                 throw e;
@@ -166,6 +172,9 @@ class DocumentNodeStoreBranch implements
             try {
                 return branchState.merge(checkNotNull(hook),
                         checkNotNull(info), exclusive);
+            } catch (FailedWithConflictException e) {
+                // let caller decide what to do with conflicting revision
+                throw e;
             } catch (CommitFailedException e) {
                 LOG.trace("Merge Error", e);
                 ex = e;
@@ -481,6 +490,8 @@ class DocumentNodeStoreBranch implements
                     NodeState newHead = DocumentNodeStoreBranch.this.persist(toCommit, base, info);
                     branchState = new Merged(base);
                     return newHead;
+                } catch (ConflictException e) {
+                    throw e.asCommitFailedException();
                 } catch(DocumentStoreException e) {
                     throw new CommitFailedException(MERGE, 1,
                             "Failed to merge changes to the underlying store", e);

Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/FailedWithConflictException.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/FailedWithConflictException.java?rev=1701741&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/FailedWithConflictException.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/FailedWithConflictException.java Tue Sep  8 08:03:25 2015
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+import javax.annotation.Nonnull;
+
+import org.apache.jackrabbit.oak.api.CommitFailedException;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A {@link CommitFailedException} with a conflict revision.
+ */
+class FailedWithConflictException extends CommitFailedException {
+
+    private static final long serialVersionUID = 2716279884065949789L;
+
+    private final Revision conflictRevision;
+
+    FailedWithConflictException(@Nonnull Revision conflictRevision,
+                                @Nonnull String message,
+                                @Nonnull Throwable cause) {
+        super(OAK, MERGE, 4, checkNotNull(message), checkNotNull(cause));
+        this.conflictRevision = checkNotNull(conflictRevision);
+    }
+
+    /**
+     * @return the revision of another commit which caused a conflict.
+     */
+    @Nonnull
+    Revision getConflictRevision() {
+        return conflictRevision;
+    }
+}

Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/FailedWithConflictException.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java?rev=1701741&r1=1701740&r2=1701741&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java Tue Sep  8 08:03:25 2015
@@ -18,6 +18,7 @@ package org.apache.jackrabbit.oak.plugin
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -654,6 +655,45 @@ public final class NodeDocument extends
     }
 
     /**
+     * Returns the most recent conflict on the given {@code branchCommits} if
+     * there are any. The returned revision is the commit, which created the
+     * collision marker for one of the {@code branchCommits}.
+     *
+     * @param branchCommits the branch commits to check.
+     * @param context a revision context.
+     * @return the conflict revision or {@code null} if there aren't any or
+     *          the collision marker does not have a revision value.
+     */
+    @CheckForNull
+    Revision getMostRecentConflictFor(@Nonnull Iterable<Revision> branchCommits,
+                                      @Nonnull RevisionContext context) {
+        checkNotNull(branchCommits);
+        checkNotNull(context);
+
+        Comparator<Revision> comparator = context.getRevisionComparator();
+        Revision conflict = null;
+
+        Map<Revision, String> collisions = getLocalMap(COLLISIONS);
+        for (Revision r : branchCommits) {
+            String value = collisions.get(r.asTrunkRevision());
+            if (value == null) {
+                continue;
+            }
+            Revision c;
+            try {
+                c = Revision.fromString(value);
+            } catch (IllegalArgumentException e) {
+                // backward compatibility: collision marker with value 'true'
+                continue;
+            }
+            if (conflict == null || comparator.compare(conflict, c) < 0) {
+                conflict = c;
+            }
+        }
+        return conflict;
+    }
+
+    /**
      * Returns the commit root path for the given <code>revision</code> or
      * <code>null</code> if this document does not have a commit root entry for
      * the given <code>revision</code>.
@@ -1480,10 +1520,18 @@ public final class NodeDocument extends
         checkNotNull(op).removeMapEntry(REVISIONS, checkNotNull(revision));
     }
 
+    /**
+     * Add a collision marker for the given {@code revision}.
+     *
+     * @param op the update operation.
+     * @param revision the commit for which a collision was detected.
+     * @param other the revision for the commit, which detected the collision.
+     */
     public static void addCollision(@Nonnull UpdateOp op,
-                                    @Nonnull Revision revision) {
+                                    @Nonnull Revision revision,
+                                    @Nonnull Revision other) {
         checkNotNull(op).setMapEntry(COLLISIONS, checkNotNull(revision),
-                String.valueOf(true));
+                other.toString());
     }
 
     public static void removeCollision(@Nonnull UpdateOp op,

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/RevisionContext.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/RevisionContext.java?rev=1701741&r1=1701740&r2=1701741&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/RevisionContext.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/RevisionContext.java Tue Sep  8 08:03:25 2015
@@ -51,4 +51,10 @@ public interface RevisionContext {
      */
     @Nonnull
     Revision getHeadRevision();
+
+    /**
+     * @return a new revision for the local document node store instance.
+     */
+    @Nonnull
+    Revision newRevision();
 }

Added: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterConflictTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterConflictTest.java?rev=1701741&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterConflictTest.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterConflictTest.java Tue Sep  8 08:03:25 2015
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+import java.util.List;
+
+import javax.annotation.CheckForNull;
+
+import com.google.common.collect.Lists;
+
+import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.api.Type;
+import org.apache.jackrabbit.oak.plugins.document.memory.MemoryDocumentStore;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.commit.DefaultEditor;
+import org.apache.jackrabbit.oak.spi.commit.Editor;
+import org.apache.jackrabbit.oak.spi.commit.EditorHook;
+import org.apache.jackrabbit.oak.spi.commit.EditorProvider;
+import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.apache.jackrabbit.oak.spi.state.NodeStore;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.jackrabbit.oak.spi.commit.CommitInfo.EMPTY;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+public class ClusterConflictTest {
+
+    @Rule
+    public final DocumentMKBuilderProvider builderProvider = new DocumentMKBuilderProvider();
+
+    private static final Logger LOG = LoggerFactory.getLogger(ClusterConflictTest.class);
+
+    private DocumentNodeStore ns1;
+    private DocumentNodeStore ns2;
+
+    @Before
+    public void setUp() {
+        MemoryDocumentStore store = new MemoryDocumentStore();
+        ns1 = newDocumentNodeStore(store);
+        ns2 = newDocumentNodeStore(store);
+    }
+
+    private DocumentNodeStore newDocumentNodeStore(DocumentStore store) {
+        // use high async delay and run background ops manually
+        // asyncDelay set to zero prevents commits from suspending
+        return builderProvider.newBuilder()
+                .setAsyncDelay(60000)
+                .setDocumentStore(store)
+                .setLeaseCheck(false) // disabled for debugging purposes
+                .getNodeStore();
+    }
+
+    @Test
+    public void suspendUntilVisible() throws Exception {
+        suspendUntilVisible(false);
+    }
+
+    @Test
+    public void suspendUntilVisibleWithBranch() throws Exception {
+        suspendUntilVisible(true);
+    }
+
+    private void suspendUntilVisible(boolean withBranch) throws Exception {
+        NodeBuilder b1 = ns1.getRoot().builder();
+        b1.child("counter").setProperty("value", 0);
+        merge(ns1, b1);
+        ns1.runBackgroundOperations();
+        ns2.runBackgroundOperations();
+
+        b1 = ns1.getRoot().builder();
+        b1.child("foo");
+        ns1.merge(b1, new TestHook(), EMPTY);
+
+        final List<Exception> exceptions = Lists.newArrayList();
+        final NodeBuilder b2 = ns2.getRoot().builder();
+        b2.child("bar");
+        if (withBranch) {
+            purge(b2);
+        }
+        b2.child("baz");
+
+        Thread t = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    LOG.info("initiating merge");
+                    ns2.merge(b2, new TestHook(), EMPTY);
+                    LOG.info("merge succeeded");
+                } catch (CommitFailedException e) {
+                    exceptions.add(e);
+                }
+            }
+        });
+        t.start();
+
+        // wait until t is suspended
+        for (int i = 0; i < 100; i++) {
+            if (ns2.commitQueue.numSuspendedThreads() > 0) {
+                break;
+            }
+            Thread.sleep(10);
+        }
+        assertEquals(1, ns2.commitQueue.numSuspendedThreads());
+        LOG.info("commit suspended");
+
+        ns1.runBackgroundOperations();
+        LOG.info("ran background ops on ns1");
+        ns2.runBackgroundOperations();
+        LOG.info("ran background ops on ns2");
+        assertEquals(0, ns2.commitQueue.numSuspendedThreads());
+
+        t.join(3000);
+        assertFalse("Commit did not succeed within 3 seconds", t.isAlive());
+
+        for (Exception e : exceptions) {
+            throw e;
+        }
+    }
+
+    private static class TestHook extends EditorHook {
+
+        TestHook() {
+            super(new EditorProvider() {
+                @CheckForNull
+                @Override
+                public Editor getRootEditor(NodeState before,
+                                            NodeState after,
+                                            NodeBuilder builder,
+                                            CommitInfo info)
+                        throws CommitFailedException {
+                    return new TestEditor(builder.child("counter"));
+                }
+            });
+        }
+    }
+
+    private static class TestEditor extends DefaultEditor {
+
+        private NodeBuilder counter;
+
+        TestEditor(NodeBuilder counter) {
+            this.counter = counter;
+        }
+
+        @Override
+        public Editor childNodeAdded(String name, NodeState after)
+                throws CommitFailedException {
+            counter.setProperty("value", counter.getProperty("value").getValue(Type.LONG) + 1);
+            return super.childNodeAdded(name, after);
+        }
+    }
+
+    private static NodeState merge(NodeStore store, NodeBuilder root)
+            throws CommitFailedException {
+        return store.merge(root, EmptyHook.INSTANCE, EMPTY);
+    }
+
+    private static void purge(NodeBuilder builder) {
+        ((DocumentRootBuilder) builder).purge();
+    }
+}

Propchange: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterConflictTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CommitQueueTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CommitQueueTest.java?rev=1701741&r1=1701740&r2=1701741&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CommitQueueTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CommitQueueTest.java Tue Sep  8 08:03:25 2015
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
@@ -37,6 +38,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import static java.util.Collections.synchronizedList;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 
 /**
@@ -120,12 +122,7 @@ public class CommitQueueTest {
 
     @Test
     public void concurrentCommits2() throws Exception {
-        final CommitQueue queue = new CommitQueue() {
-            @Override
-            protected Revision newRevision() {
-                return Revision.newRevision(1);
-            }
-        };
+        final CommitQueue queue = new CommitQueue(DummyRevisionContext.INSTANCE);
 
         final CommitQueue.Callback c = new CommitQueue.Callback() {
             private Revision before = Revision.newRevision(1);
@@ -208,6 +205,47 @@ public class CommitQueueTest {
         assertNoExceptions();
     }
 
+    @Test
+    public void suspendUntil() throws Exception {
+        final AtomicReference<Revision> headRevision = new AtomicReference<Revision>();
+        RevisionContext context = new DummyRevisionContext() {
+            @Nonnull
+            @Override
+            public Revision getHeadRevision() {
+                return headRevision.get();
+            }
+        };
+        headRevision.set(context.newRevision());
+        final CommitQueue queue = new CommitQueue(context);
+
+        final Revision r = context.newRevision();
+        Thread t = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                queue.suspendUntil(r);
+            }
+        });
+        t.start();
+
+        // wait until t is suspended
+        for (int i = 0; i < 100; i++) {
+            if (queue.numSuspendedThreads() > 0) {
+                break;
+            }
+            Thread.sleep(10);
+        }
+        assertEquals(1, queue.numSuspendedThreads());
+
+        queue.headRevisionChanged();
+        // must still be suspended
+        assertEquals(1, queue.numSuspendedThreads());
+
+        headRevision.set(r);
+        queue.headRevisionChanged();
+        // must not be suspended anymore
+        assertEquals(0, queue.numSuspendedThreads());
+    }
+
     private void assertNoExceptions() throws Exception {
         if (!exceptions.isEmpty()) {
             throw exceptions.get(0);

Added: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ConflictExceptionTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ConflictExceptionTest.java?rev=1701741&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ConflictExceptionTest.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ConflictExceptionTest.java Tue Sep  8 08:03:25 2015
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+public class ConflictExceptionTest {
+
+    @Test
+    public void type() {
+        ConflictException e = new ConflictException("conflict", null);
+        CommitFailedException cfe = e.asCommitFailedException();
+        assertEquals(CommitFailedException.MERGE, cfe.getType());
+    }
+
+    @Test
+    public void cause() {
+        ConflictException e = new ConflictException("conflict", null);
+        CommitFailedException cfe = e.asCommitFailedException();
+        assertSame(e, cfe.getCause());
+    }
+
+    @Test
+    public void asCommitFailedException() {
+        Revision r = Revision.newRevision(1);
+        ConflictException e = new ConflictException("conflict", r);
+        CommitFailedException cfe = e.asCommitFailedException();
+        assertTrue(cfe instanceof FailedWithConflictException);
+        FailedWithConflictException fwce = (FailedWithConflictException) cfe;
+        assertEquals(CommitFailedException.MERGE, fwce.getType());
+        assertEquals(r, fwce.getConflictRevision());
+    }
+}

Propchange: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ConflictExceptionTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentSplitTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentSplitTest.java?rev=1701741&r1=1701740&r2=1701741&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentSplitTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentSplitTest.java Tue Sep  8 08:03:25 2015
@@ -886,6 +886,12 @@ public class DocumentSplitTest extends B
             }
             return rc.getHeadRevision();
         }
+
+        @Nonnull
+        @Override
+        public Revision newRevision() {
+            return rc.newRevision();
+        }
     }
 
     private static NodeState merge(NodeStore store, NodeBuilder root)

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DummyRevisionContext.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DummyRevisionContext.java?rev=1701741&r1=1701740&r2=1701741&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DummyRevisionContext.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DummyRevisionContext.java Tue Sep  8 08:03:25 2015
@@ -55,4 +55,10 @@ public class DummyRevisionContext implem
     public Revision getHeadRevision() {
         return Revision.newRevision(1);
     }
+
+    @Nonnull
+    @Override
+    public Revision newRevision() {
+        return Revision.newRevision(1);
+    }
 }

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/NodeDocumentTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/NodeDocumentTest.java?rev=1701741&r1=1701740&r2=1701741&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/NodeDocumentTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/NodeDocumentTest.java Tue Sep  8 08:03:25 2015
@@ -39,11 +39,13 @@ import org.apache.jackrabbit.oak.spi.sta
 import org.junit.Test;
 
 import static org.apache.jackrabbit.oak.plugins.document.Collection.NODES;
+import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.COLLISIONS;
 import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.revisionAreAmbiguous;
 import static org.apache.jackrabbit.oak.plugins.document.Revision.RevisionComparator;
 import static org.apache.jackrabbit.oak.plugins.document.util.Utils.getRootDocument;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -61,7 +63,7 @@ public class NodeDocumentTest {
         for (int i = 0; i < NodeDocument.NUM_REVS_THRESHOLD + 1; i++) {
             Revision r = Revision.newRevision(1);
             NodeDocument.setRevision(op, r, "c");
-            NodeDocument.addCollision(op, r);
+            NodeDocument.addCollision(op, r, Revision.newRevision(1));
         }
         UpdateUtils.applyChanges(doc, op, StableRevisionComparator.INSTANCE);
         Revision head = DummyRevisionContext.INSTANCE.getHeadRevision();
@@ -106,6 +108,53 @@ public class NodeDocumentTest {
     }
 
     @Test
+    public void getMostRecentConflictFor() {
+        RevisionContext context = DummyRevisionContext.INSTANCE;
+        MemoryDocumentStore docStore = new MemoryDocumentStore();
+        String id = Utils.getPathFromId("/");
+        NodeDocument doc = new NodeDocument(docStore);
+        doc.put(Document.ID, id);
+
+        Iterable<Revision> branchCommits = Collections.emptyList();
+        Revision conflict = doc.getMostRecentConflictFor(branchCommits, context);
+        assertNull(conflict);
+
+        // add some collisions
+        UpdateOp op = new UpdateOp(id, false);
+        Revision r0 = Revision.newRevision(1);
+        Revision r1 = Revision.newRevision(1);
+        Revision c1 = Revision.newRevision(1);
+        Revision r2 = Revision.newRevision(1);
+        Revision c2 = Revision.newRevision(1);
+        // backward compatibility test
+        op.setMapEntry(COLLISIONS, r0, String.valueOf(true));
+        // regular collision entries
+        NodeDocument.addCollision(op, r1, c1);
+        NodeDocument.addCollision(op, r2, c2);
+        UpdateUtils.applyChanges(doc, op, StableRevisionComparator.INSTANCE);
+
+        branchCommits = Collections.singleton(r0);
+        conflict = doc.getMostRecentConflictFor(branchCommits, context);
+        assertNull(conflict);
+
+        branchCommits = Collections.singleton(r1.asBranchRevision());
+        conflict = doc.getMostRecentConflictFor(branchCommits, context);
+        assertEquals(c1, conflict);
+
+        branchCommits = Collections.singleton(r2.asBranchRevision());
+        conflict = doc.getMostRecentConflictFor(branchCommits, context);
+        assertEquals(c2, conflict);
+
+        branchCommits = Lists.newArrayList(r1.asBranchRevision(), r2.asBranchRevision());
+        conflict = doc.getMostRecentConflictFor(branchCommits, context);
+        assertEquals(c2, conflict);
+
+        branchCommits = Lists.newArrayList(r2.asBranchRevision(), r1.asBranchRevision());
+        conflict = doc.getMostRecentConflictFor(branchCommits, context);
+        assertEquals(c2, conflict);
+    }
+
+    @Test
     public void getAllChanges() throws Exception {
         final int NUM_CHANGES = 200;
         DocumentNodeStore ns = createTestStore(NUM_CHANGES);

Modified: jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/ConcurrentAddNodesClusterIT.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/ConcurrentAddNodesClusterIT.java?rev=1701741&r1=1701740&r2=1701741&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/ConcurrentAddNodesClusterIT.java (original)
+++ jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/ConcurrentAddNodesClusterIT.java Tue Sep  8 08:03:25 2015
@@ -26,10 +26,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -62,7 +59,6 @@ public class ConcurrentAddNodesClusterIT
     private static final int LOOP_COUNT = 10;
     private static final int WORKER_COUNT = 20;
     private static final String PROP_NAME = "testcount";
-    private static final ScheduledExecutorService EXECUTOR = Executors.newSingleThreadScheduledExecutor();
 
     private List<Repository> repos = new ArrayList<Repository>();
     private List<DocumentMK> mks = new ArrayList<DocumentMK>();
@@ -81,12 +77,15 @@ public class ConcurrentAddNodesClusterIT
 
     @After
     public void after() throws Exception {
+        workers.clear();
         for (Repository repo : repos) {
             dispose(repo);
         }
+        repos.clear();
         for (DocumentMK mk : mks) {
             mk.dispose();
         }
+        mks.clear();
         dropDB();
     }
 
@@ -211,7 +210,6 @@ public class ConcurrentAddNodesClusterIT
         for (int i = 0; i < 2; i++) {
             DocumentMK mk = new DocumentMK.Builder()
                     .setMongoDB(createConnection().getDB())
-                    .setAsyncDelay(0)
                     .setClusterId(i + 1).open();
             mks.add(mk);
         }
@@ -226,13 +224,11 @@ public class ConcurrentAddNodesClusterIT
         Session s2 = r2.login(new SimpleCredentials("admin", "admin".toCharArray()));
 
         ensureIndex(s1.getRootNode(), PROP_NAME);
-        syncMKs(1);
         ensureIndex(s2.getRootNode(), PROP_NAME);
 
         Map<String, Exception> exceptions = Collections.synchronizedMap(
                 new HashMap<String, Exception>());
         createNodes(s1, "testroot-1", 1, 1, exceptions);
-        syncMKs(1);
         createNodes(s2, "testroot-2", 1, 1, exceptions);
 
         for (Map.Entry<String, Exception> entry : exceptions.entrySet()) {
@@ -267,9 +263,9 @@ public class ConcurrentAddNodesClusterIT
         Session s3 = r3.login(new SimpleCredentials("admin", "admin".toCharArray()));
 
         ensureIndex(s1.getRootNode(), PROP_NAME);
-        syncMKs(1);
-        ensureIndex(s2.getRootNode(), PROP_NAME);
-        ensureIndex(s3.getRootNode(), PROP_NAME);
+        runBackgroundOps(mk1);
+        runBackgroundOps(mk2);
+        runBackgroundOps(mk3);
 
         // begin test
 
@@ -367,18 +363,6 @@ public class ConcurrentAddNodesClusterIT
         s2.logout();
     }
 
-    private void syncMKs(int delay) {
-        EXECUTOR.schedule(new Callable<Object>() {
-            @Override
-            public Object call() throws Exception {
-                for (DocumentMK mk : mks) {
-                    runBackgroundOps(mk);
-                }
-                return null;
-            }
-        }, delay, TimeUnit.SECONDS);
-    }
-
     private static MongoConnection createConnection() throws Exception {
         return OakMongoNSRepositoryStub.createConnection(
                 ConcurrentAddNodesClusterIT.class.getSimpleName());