You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by mr...@apache.org on 2015/12/08 11:00:56 UTC

svn commit: r1718544 - in /jackrabbit/oak/trunk/oak-core/src: main/java/org/apache/jackrabbit/oak/plugins/document/ test/java/org/apache/jackrabbit/oak/plugins/document/

Author: mreutegg
Date: Tue Dec  8 10:00:55 2015
New Revision: 1718544

URL: http://svn.apache.org/viewvc?rev=1718544&view=rev
Log:
OAK-3586: ConflictException and CommitQueue should support a list of revisions

Applied patch by Tomek Rekawek with minor changes

Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/CommitQueue.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ConflictException.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreBranch.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/FailedWithConflictException.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CommitQueueTest.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ConflictExceptionTest.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/CommitQueue.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/CommitQueue.java?rev=1718544&r1=1718543&r2=1718544&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/CommitQueue.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/CommitQueue.java Tue Dec  8 10:00:55 2015
@@ -20,8 +20,10 @@ import static com.google.common.base.Pre
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import java.util.Comparator;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Set;
 import java.util.SortedMap;
 import java.util.SortedSet;
 import java.util.TreeMap;
@@ -55,7 +57,7 @@ final class CommitQueue {
     /**
      * Map of currently suspended commits until a given Revision is visible.
      */
-    private final Map<Semaphore, Revision> suspendedCommits = Maps.newIdentityHashMap();
+    private final Map<Semaphore, SuspendedCommit> suspendedCommits = Maps.newIdentityHashMap();
 
     private final RevisionContext context;
 
@@ -103,7 +105,7 @@ final class CommitQueue {
     }
 
     /**
-     * Suspends until one of the following happens:
+     * Suspends until, for each of the given revisions, one of the following happens:
      * <ul>
      *     <li>the given revision is visible from the current headRevision</li>
      *     <li>the given revision is canceled from the commit queue</li>
@@ -111,26 +113,33 @@ final class CommitQueue {
      *     <li>the thread is interrupted</li>
      * </ul>
      *
-     * @param r the revision to become visible.
+     * @param conflictRevisions the revisions to become visible.
      */
-    void suspendUntil(@Nonnull Revision r) {
+    void suspendUntilAll(@Nonnull Set<Revision> conflictRevisions) {
         Comparator<Revision> comparator = context.getRevisionComparator();
-        Semaphore s = null;
+        Semaphore s;
+        int addedRevisions;
         synchronized (suspendedCommits) {
             Revision headRevision = context.getHeadRevision();
-            if (comparator.compare(r, headRevision) > 0) {
-                s = new Semaphore(0);
-                suspendedCommits.put(s, r);
-            }
-        }
-        if (s != null) {
-            try {
-                s.tryAcquire(suspendTimeout, TimeUnit.MILLISECONDS);
-            } catch (InterruptedException e) {
-                synchronized (suspendedCommits) {
-                    suspendedCommits.remove(s);
+            Set<Revision> afterHead = new HashSet<Revision>(conflictRevisions.size());
+            for (Revision r : conflictRevisions) {
+                if (comparator.compare(r, headRevision) > 0) {
+                    afterHead.add(r);
                 }
             }
+
+            s = new Semaphore(0);
+            suspendedCommits.put(s, new SuspendedCommit(s, afterHead));
+            addedRevisions = afterHead.size();
+        }
+        try {
+            s.tryAcquire(addedRevisions, suspendTimeout, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            LOG.debug("The suspended thread has been interrupted", e);
+        } finally {
+            synchronized (suspendedCommits) {
+                suspendedCommits.remove(s);
+            }
         }
     }
 
@@ -153,7 +162,7 @@ final class CommitQueue {
 
     /**
      * Sets the suspend timeout in milliseconds.
-     * See also {@link #suspendUntil(Revision)}.
+     * See also {@link #suspendUntilAll(Set)}.
      *
      * @param timeout the timeout to set.
      */
@@ -173,15 +182,12 @@ final class CommitQueue {
             if (suspendedCommits.isEmpty()) {
                 return;
             }
-            Comparator<Revision> comparator = context.getRevisionComparator();
             Revision headRevision = context.getHeadRevision();
-            Iterator<Map.Entry<Semaphore, Revision>> it = suspendedCommits.entrySet().iterator();
+            Iterator<SuspendedCommit> it = suspendedCommits.values().iterator();
             while (it.hasNext()) {
-                Map.Entry<Semaphore, Revision> entry = it.next();
-                if (comparator.compare(entry.getValue(), headRevision) <= 0) {
-                    Semaphore s = entry.getKey();
+                SuspendedCommit suspended = it.next();
+                if (suspended.removeRevisionsYoungerThan(headRevision) && suspended.revisions.isEmpty()) {
                     it.remove();
-                    s.release();
                 }
             }
         }
@@ -193,13 +199,11 @@ final class CommitQueue {
             if (suspendedCommits.isEmpty()) {
                 return;
             }
-            Iterator<Map.Entry<Semaphore, Revision>> it = suspendedCommits.entrySet().iterator();
+            Iterator<SuspendedCommit> it = suspendedCommits.values().iterator();
             while (it.hasNext()) {
-                Map.Entry<Semaphore, Revision> entry = it.next();
-                if (revision.equals(entry.getValue())) {
-                    Semaphore s = entry.getKey();
+                SuspendedCommit suspended = it.next();
+                if (suspended.removeRevision(revision) && suspended.revisions.isEmpty()) {
                     it.remove();
-                    s.release();
                 }
             }
         }
@@ -292,4 +296,39 @@ final class CommitQueue {
             }
         }
     }
-}
+
+    private class SuspendedCommit {
+
+        private final Semaphore semaphore;
+
+        private final Set<Revision> revisions;
+
+        private SuspendedCommit(Semaphore semaphore, Set<Revision> revisions) {
+            this.semaphore = semaphore;
+            this.revisions = revisions;
+        }
+
+        private boolean removeRevisionsYoungerThan(Revision revision) {
+            Comparator<Revision> comparator = context.getRevisionComparator();
+            Iterator<Revision> it = revisions.iterator();
+            boolean removed = false;
+            while (it.hasNext()) {
+                if (comparator.compare(it.next(), revision) <= 0) {
+                    it.remove();
+                    semaphore.release();
+                    removed = true;
+                }
+            }
+            return removed;
+        }
+
+        private boolean removeRevision(Revision r) {
+            if (revisions.remove(r)) {
+                semaphore.release();
+                return true;
+            } else {
+                return false;
+            }
+        }
+    }
+}
\ No newline at end of file

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ConflictException.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ConflictException.java?rev=1718544&r1=1718543&r2=1718544&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ConflictException.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ConflictException.java Tue Dec  8 10:00:55 2015
@@ -17,13 +17,15 @@
 package org.apache.jackrabbit.oak.plugins.document;
 
 import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
 
 import org.apache.jackrabbit.oak.api.CommitFailedException;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.jackrabbit.oak.api.CommitFailedException.MERGE;
 
+import java.util.Collections;
+import java.util.Set;
+
 /**
  * A document store exception with an optional conflict revision. The
  * DocumentNodeStore implementation will throw this exception when a commit
@@ -34,18 +36,36 @@ class ConflictException extends Document
     private static final long serialVersionUID = 1418838561903727231L;
 
     /**
-     * Optional conflict revision.
+     * Conflict revisions; empty if none are known.
      */
-    private final Revision conflictRevision;
+    private final Set<Revision> conflictRevisions;
 
     /**
      * @param message the exception / conflict message.
-     * @param conflictRevision the conflict revision or {@code null} if unknown.
+     * @param conflictRevision the conflict revision
      */
     ConflictException(@Nonnull String message,
-                      @Nullable Revision conflictRevision) {
+                      @Nonnull Revision conflictRevision) {
+        super(checkNotNull(message));
+        this.conflictRevisions = Collections.singleton(checkNotNull(conflictRevision));
+    }
+
+    /**
+     * @param message the exception / conflict message.
+     * @param conflictRevisions the set of conflict revisions
+     */
+    ConflictException(@Nonnull String message,
+                      @Nonnull Set<Revision> conflictRevisions) {
+        super(checkNotNull(message));
+        this.conflictRevisions = checkNotNull(conflictRevisions);
+    }
+
+    /**
+     * @param message the exception / conflict message.
+     */
+    ConflictException(@Nonnull String message) {
         super(checkNotNull(message));
-        this.conflictRevision = conflictRevision;
+        this.conflictRevisions = Collections.emptySet();
     }
 
     /**
@@ -55,11 +75,21 @@ class ConflictException extends Document
      * @return a {@link CommitFailedException}.
      */
     CommitFailedException asCommitFailedException() {
-        if (conflictRevision != null) {
-            return new FailedWithConflictException(conflictRevision, getMessage(), this);
+        if (!conflictRevisions.isEmpty()) {
+            return new FailedWithConflictException(conflictRevisions, getMessage(), this);
         } else {
             return new CommitFailedException(MERGE, 1,
                     "Failed to merge changes to the underlying store", this);
         }
     }
-}
+
+    /**
+     * Returns the conflict revisions.
+     *
+     * @return the conflict revisions (may be empty)
+     */
+    @Nonnull
+    Iterable<Revision> getConflictRevisions() {
+        return conflictRevisions;
+    }
+}
\ No newline at end of file

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java?rev=1718544&r1=1718543&r2=1718544&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java Tue Dec  8 10:00:55 2015
@@ -43,6 +43,7 @@ import java.text.SimpleDateFormat;
 import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -1456,21 +1457,28 @@ public final class DocumentNodeStore
     }
 
     /**
-     * Suspends until the given revision is visible from the current
-     * headRevision or the given revision is canceled from the commit queue.
+     * Suspends until all given revisions are either visible from the current
+     * headRevision or canceled from the commit queue.
      *
-     * The thread will *not* be suspended if the given revision is from a
-     * foreign cluster node and async delay is set to zero.
+     * Only revisions from the local cluster node will be considered if the async
+     * delay is set to 0.
      *
-     * @param r the revision to become visible.
+     * @param conflictRevisions the revisions to become visible.
      */
-    void suspendUntil(@Nonnull Revision r) {
+    void suspendUntilAll(@Nonnull Set<Revision> conflictRevisions) {
         // do not suspend if revision is from another cluster node
         // and background read is disabled
-        if (r.getClusterId() != getClusterId() && getAsyncDelay() == 0) {
-            return;
+        if (getAsyncDelay() == 0) {
+            Set<Revision> onlyLocal = new HashSet<Revision>(conflictRevisions.size());
+            for (Revision r : conflictRevisions) {
+                if (r.getClusterId() == getClusterId()) {
+                    onlyLocal.add(r);
+                }
+            }
+            commitQueue.suspendUntilAll(onlyLocal);
+        } else {
+            commitQueue.suspendUntilAll(conflictRevisions);
         }
-        commitQueue.suspendUntil(r);
     }
 
     //------------------------< Observable >------------------------------------

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreBranch.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreBranch.java?rev=1718544&r1=1718543&r2=1718544&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreBranch.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreBranch.java Tue Dec  8 10:00:55 2015
@@ -23,6 +23,7 @@ import static org.apache.jackrabbit.oak.
 import static org.apache.jackrabbit.oak.api.CommitFailedException.STATE;
 import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.COLLISIONS;
 
+import java.util.HashSet;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.Callable;
@@ -149,7 +150,7 @@ class DocumentNodeStoreBranch implements
                              boolean exclusive)
             throws CommitFailedException {
         CommitFailedException ex = null;
-        Revision conflictRevision = null;
+        Set<Revision> conflictRevisions = new HashSet<Revision>();
         long time = System.currentTimeMillis();
         int numRetries = 0;
         for (long backoff = MIN_BACKOFF; backoff <= maximumBackoff; backoff *= 2) {
@@ -159,14 +160,13 @@ class DocumentNodeStoreBranch implements
                     final long start = perfLogger.start();
                     // suspend until conflict revision is visible
                     // or as a fallback sleep for a while
-                    if (conflictRevision != null) {
+                    if (!conflictRevisions.isEmpty()) {
                         // suspend until conflicting revision is visible
                         LOG.debug("Suspending until {} is visible. Current head {}.",
-                                conflictRevision, store.getHeadRevision());
-                        store.suspendUntil(conflictRevision);
+                                conflictRevisions, store.getHeadRevision());
+                        store.suspendUntilAll(conflictRevisions);
+                        conflictRevisions.clear();
                         LOG.debug("Resumed. Current head {}.", store.getHeadRevision());
-                        // reset conflict revision
-                        conflictRevision = null;
                     } else {
                         Thread.sleep(backoff + RANDOM.nextInt((int) Math.min(backoff, Integer.MAX_VALUE)));
                     }
@@ -181,7 +181,7 @@ class DocumentNodeStoreBranch implements
                         checkNotNull(info), exclusive);
             } catch (FailedWithConflictException e) {
                 ex = e;
-                conflictRevision = e.getConflictRevision();
+                conflictRevisions.addAll(e.getConflictRevisions());
             } catch (CommitFailedException e) {
                 ex = e;
             }

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/FailedWithConflictException.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/FailedWithConflictException.java?rev=1718544&r1=1718543&r2=1718544&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/FailedWithConflictException.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/FailedWithConflictException.java Tue Dec  8 10:00:55 2015
@@ -22,6 +22,8 @@ import org.apache.jackrabbit.oak.api.Com
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
+import java.util.Set;
+
 /**
  * A {@link CommitFailedException} with a conflict revision.
  */
@@ -29,20 +31,20 @@ class FailedWithConflictException extend
 
     private static final long serialVersionUID = 2716279884065949789L;
 
-    private final Revision conflictRevision;
+    private final Set<Revision> conflictRevisions;
 
-    FailedWithConflictException(@Nonnull Revision conflictRevision,
+    FailedWithConflictException(@Nonnull Set<Revision> conflictRevisions,
                                 @Nonnull String message,
                                 @Nonnull Throwable cause) {
         super(OAK, MERGE, 4, checkNotNull(message), checkNotNull(cause));
-        this.conflictRevision = checkNotNull(conflictRevision);
+        this.conflictRevisions = checkNotNull(conflictRevisions);
     }
 
     /**
      * @return the revision of another commit which caused a conflict.
      */
     @Nonnull
-    Revision getConflictRevision() {
-        return conflictRevision;
+    Set<Revision> getConflictRevisions() {
+        return conflictRevisions;
     }
-}
+}
\ No newline at end of file

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CommitQueueTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CommitQueueTest.java?rev=1718544&r1=1718543&r2=1718544&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CommitQueueTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CommitQueueTest.java Tue Dec  8 10:00:55 2015
@@ -18,8 +18,11 @@ package org.apache.jackrabbit.oak.plugin
 
 import java.io.Closeable;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -37,6 +40,8 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static com.google.common.collect.ImmutableSet.of;
+import static com.google.common.collect.Sets.union;
 import static java.util.Collections.synchronizedList;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -216,13 +221,15 @@ public class CommitQueueTest {
             }
         };
         headRevision.set(context.newRevision());
+
         final CommitQueue queue = new CommitQueue(context);
 
-        final Revision r = context.newRevision();
+        final Revision newHeadRev = context.newRevision();
+        final Set<Revision> revisions = queue.createRevisions(10);
         Thread t = new Thread(new Runnable() {
             @Override
             public void run() {
-                queue.suspendUntil(r);
+                queue.suspendUntilAll(union(of(newHeadRev), revisions));
             }
         });
         t.start();
@@ -240,8 +247,14 @@ public class CommitQueueTest {
         // must still be suspended
         assertEquals(1, queue.numSuspendedThreads());
 
-        headRevision.set(r);
+        headRevision.set(newHeadRev);
         queue.headRevisionChanged();
+        // must still be suspended
+        assertEquals(1, queue.numSuspendedThreads());
+
+        for (Revision rev : revisions) {
+            queue.canceled(rev);
+        }
         // must not be suspended anymore
         assertEquals(0, queue.numSuspendedThreads());
     }
@@ -264,7 +277,7 @@ public class CommitQueueTest {
         Thread t = new Thread(new Runnable() {
             @Override
             public void run() {
-                queue.suspendUntil(r);
+                queue.suspendUntilAll(of(r));
             }
         });
         t.start();
@@ -273,6 +286,66 @@ public class CommitQueueTest {
         assertFalse(t.isAlive());
     }
 
+    @Test
+    public void concurrentSuspendUntil() throws Exception {
+        final AtomicReference<Revision> headRevision = new AtomicReference<Revision>();
+        RevisionContext context = new DummyRevisionContext() {
+            @Nonnull
+            @Override
+            public Revision getHeadRevision() {
+                return headRevision.get();
+            }
+        };
+        headRevision.set(context.newRevision());
+
+        List<Thread> threads = new ArrayList<Thread>();
+        List<Revision> allRevisions = new ArrayList<Revision>();
+
+        final CommitQueue queue = new CommitQueue(context);
+        for (int i = 0; i < 10; i++) { // threads count
+            final Set<Revision> revisions = new HashSet<Revision>();
+            for (int j = 0; j < 10; j++) { // revisions per thread
+                Revision r = queue.createRevision();
+                revisions.add(r);
+                allRevisions.add(r);
+            }
+            Thread t = new Thread(new Runnable() {
+                public void run() {
+                    queue.suspendUntilAll(revisions);
+                }
+            });
+            threads.add(t);
+            t.start();
+        }
+
+        for (int i = 0; i < 100; i++) {
+            if (queue.numSuspendedThreads() == 10) {
+                break;
+            }
+            Thread.sleep(10);
+        }
+        assertEquals(10, queue.numSuspendedThreads());
+
+        Collections.shuffle(allRevisions);
+        for (Revision r : allRevisions) {
+            queue.canceled(r);
+            Thread.sleep(10);
+        }
+
+        for (int i = 0; i < 100; i++) {
+            if (queue.numSuspendedThreads() == 0) {
+                break;
+            }
+            Thread.sleep(10);
+        }
+        assertEquals(0, queue.numSuspendedThreads());
+
+        for (Thread t : threads) {
+            t.join(1000);
+            assertFalse(t.isAlive());
+        }
+    }
+
     private void assertNoExceptions() throws Exception {
         if (!exceptions.isEmpty()) {
             throw exceptions.get(0);

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ConflictExceptionTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ConflictExceptionTest.java?rev=1718544&r1=1718543&r2=1718544&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ConflictExceptionTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ConflictExceptionTest.java Tue Dec  8 10:00:55 2015
@@ -23,18 +23,20 @@ import static org.junit.Assert.assertEqu
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 
+import java.util.Collections;
+
 public class ConflictExceptionTest {
 
     @Test
     public void type() {
-        ConflictException e = new ConflictException("conflict", null);
+        ConflictException e = new ConflictException("conflict");
         CommitFailedException cfe = e.asCommitFailedException();
         assertEquals(CommitFailedException.MERGE, cfe.getType());
     }
 
     @Test
     public void cause() {
-        ConflictException e = new ConflictException("conflict", null);
+        ConflictException e = new ConflictException("conflict");
         CommitFailedException cfe = e.asCommitFailedException();
         assertSame(e, cfe.getCause());
     }
@@ -47,6 +49,6 @@ public class ConflictExceptionTest {
         assertTrue(cfe instanceof FailedWithConflictException);
         FailedWithConflictException fwce = (FailedWithConflictException) cfe;
         assertEquals(CommitFailedException.MERGE, fwce.getType());
-        assertEquals(r, fwce.getConflictRevision());
+        assertEquals(Collections.singleton(r), fwce.getConflictRevisions());
     }
 }

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java?rev=1718544&r1=1718543&r2=1718544&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java Tue Dec  8 10:00:55 2015
@@ -42,6 +42,7 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
@@ -2380,6 +2381,73 @@ public class DocumentNodeStoreTest {
                 mergeAttempts.get() <= 1);
     }
 
+    // OAK-3586
+    @Test
+    public void resolveMultipleConflictedRevisions() throws Exception {
+        MemoryDocumentStore store = new MemoryDocumentStore();
+        final DocumentNodeStore ds = builderProvider.newBuilder()
+                .setDocumentStore(store)
+                .setAsyncDelay(0).getNodeStore();
+
+        DocumentNodeState root = ds.getRoot();
+        final DocumentNodeStoreBranch b = ds.createBranch(root);
+
+        NodeBuilder builder = root.builder();
+        builder.child("foo");
+        b.setRoot(builder.getNodeState());
+
+        final Set<Revision> revisions = new HashSet<Revision>();
+        final List<Commit> commits = new ArrayList<Commit>();
+        for (int i = 0; i < 10; i++) {
+            Revision revision = ds.newRevision();
+            Commit commit = ds.newCommit(revision, ds.createBranch(root));
+            commits.add(commit);
+            revisions.add(revision);
+        }
+
+        final AtomicBoolean merged = new AtomicBoolean();
+        Thread t = new Thread(new Runnable() {
+            public void run() {
+                try {
+                    CommitFailedException exception = new ConflictException("Can't merge", revisions).asCommitFailedException();
+                    b.merge(new HookFailingOnce(exception), CommitInfo.EMPTY);
+                    merged.set(true);
+                } catch (CommitFailedException e) {
+                    LOG.error("Can't commit", e);
+                }
+            }
+        });
+        t.start();
+
+        // 6 x done()
+        for (int i = 0; i < 6; i++) {
+            assertFalse("The branch can't be merged yet", merged.get());
+            ds.done(commits.get(i), false, CommitInfo.EMPTY);
+        }
+
+        // 2 x cancel()
+        for (int i = 6; i < 8; i++) {
+            assertFalse("The branch can't be merged yet", merged.get());
+            ds.canceled(commits.get(i));
+        }
+
+        // 2 x branch done()
+        for (int i = 8; i < 10; i++) {
+            assertFalse("The branch can't be merged yet", merged.get());
+            ds.done(commits.get(i), true, CommitInfo.EMPTY);
+        }
+
+        for (int i = 0; i < 100; i++) {
+            if (merged.get()) {
+                break;
+            }
+            Thread.sleep(10);
+        }
+        assertTrue("The branch should be merged by now", merged.get());
+
+        t.join();
+    }
+
     // OAK-3411
     @Test
     public void sameSeenAtRevision() throws Exception {
@@ -2599,6 +2667,28 @@ public class DocumentNodeStoreTest {
         }
     };
 
+    private static class HookFailingOnce implements CommitHook {
+
+        private final AtomicBoolean failedAlready = new AtomicBoolean();
+
+        private final CommitFailedException exception;
+
+        private HookFailingOnce(CommitFailedException exception) {
+            this.exception = exception;
+        }
+
+        @Override
+        public NodeState processCommit(NodeState before, NodeState after, CommitInfo info)
+                throws CommitFailedException {
+            if (failedAlready.getAndSet(true)) {
+                return after;
+            } else {
+                throw exception;
+            }
+        }
+
+    }
+
     private static class TestEditor extends DefaultEditor {
 
         private final NodeBuilder builder;