You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by mr...@apache.org on 2015/12/08 11:00:56 UTC
svn commit: r1718544 - in /jackrabbit/oak/trunk/oak-core/src:
main/java/org/apache/jackrabbit/oak/plugins/document/
test/java/org/apache/jackrabbit/oak/plugins/document/
Author: mreutegg
Date: Tue Dec 8 10:00:55 2015
New Revision: 1718544
URL: http://svn.apache.org/viewvc?rev=1718544&view=rev
Log:
OAK-3586: ConflictException and CommitQueue should support a list of revisions
Applied patch by Tomek Rekawek with minor changes
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/CommitQueue.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ConflictException.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreBranch.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/FailedWithConflictException.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CommitQueueTest.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ConflictExceptionTest.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/CommitQueue.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/CommitQueue.java?rev=1718544&r1=1718543&r2=1718544&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/CommitQueue.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/CommitQueue.java Tue Dec 8 10:00:55 2015
@@ -20,8 +20,10 @@ import static com.google.common.base.Pre
import static com.google.common.base.Preconditions.checkNotNull;
import java.util.Comparator;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
+import java.util.Set;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeMap;
@@ -55,7 +57,7 @@ final class CommitQueue {
/**
* Map of currently suspended commits until a given Revision is visible.
*/
- private final Map<Semaphore, Revision> suspendedCommits = Maps.newIdentityHashMap();
+ private final Map<Semaphore, SuspendedCommit> suspendedCommits = Maps.newIdentityHashMap();
private final RevisionContext context;
@@ -103,7 +105,7 @@ final class CommitQueue {
}
/**
- * Suspends until one of the following happens:
+ * Suspends until, for each of the given revisions, one of the following happens:
* <ul>
* <li>the given revision is visible from the current headRevision</li>
* <li>the given revision is canceled from the commit queue</li>
@@ -111,26 +113,33 @@ final class CommitQueue {
* <li>the thread is interrupted</li>
* </ul>
*
- * @param r the revision to become visible.
+ * @param conflictRevisions the revisions to become visible.
*/
- void suspendUntil(@Nonnull Revision r) {
+ void suspendUntilAll(@Nonnull Set<Revision> conflictRevisions) {
Comparator<Revision> comparator = context.getRevisionComparator();
- Semaphore s = null;
+ Semaphore s;
+ int addedRevisions;
synchronized (suspendedCommits) {
Revision headRevision = context.getHeadRevision();
- if (comparator.compare(r, headRevision) > 0) {
- s = new Semaphore(0);
- suspendedCommits.put(s, r);
- }
- }
- if (s != null) {
- try {
- s.tryAcquire(suspendTimeout, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- synchronized (suspendedCommits) {
- suspendedCommits.remove(s);
+ Set<Revision> afterHead = new HashSet<Revision>(conflictRevisions.size());
+ for (Revision r : conflictRevisions) {
+ if (comparator.compare(r, headRevision) > 0) {
+ afterHead.add(r);
}
}
+
+ s = new Semaphore(0);
+ suspendedCommits.put(s, new SuspendedCommit(s, afterHead));
+ addedRevisions = afterHead.size();
+ }
+ try {
+ s.tryAcquire(addedRevisions, suspendTimeout, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ LOG.debug("The suspended thread has been interrupted", e);
+ } finally {
+ synchronized (suspendedCommits) {
+ suspendedCommits.remove(s);
+ }
}
}
@@ -153,7 +162,7 @@ final class CommitQueue {
/**
* Sets the suspend timeout in milliseconds.
- * See also {@link #suspendUntil(Revision)}.
+ * See also {@link #suspendUntilAll(Set)}.
*
* @param timeout the timeout to set.
*/
@@ -173,15 +182,12 @@ final class CommitQueue {
if (suspendedCommits.isEmpty()) {
return;
}
- Comparator<Revision> comparator = context.getRevisionComparator();
Revision headRevision = context.getHeadRevision();
- Iterator<Map.Entry<Semaphore, Revision>> it = suspendedCommits.entrySet().iterator();
+ Iterator<SuspendedCommit> it = suspendedCommits.values().iterator();
while (it.hasNext()) {
- Map.Entry<Semaphore, Revision> entry = it.next();
- if (comparator.compare(entry.getValue(), headRevision) <= 0) {
- Semaphore s = entry.getKey();
+ SuspendedCommit suspended = it.next();
+ if (suspended.removeRevisionsYoungerThan(headRevision) && suspended.revisions.isEmpty()) {
it.remove();
- s.release();
}
}
}
@@ -193,13 +199,11 @@ final class CommitQueue {
if (suspendedCommits.isEmpty()) {
return;
}
- Iterator<Map.Entry<Semaphore, Revision>> it = suspendedCommits.entrySet().iterator();
+ Iterator<SuspendedCommit> it = suspendedCommits.values().iterator();
while (it.hasNext()) {
- Map.Entry<Semaphore, Revision> entry = it.next();
- if (revision.equals(entry.getValue())) {
- Semaphore s = entry.getKey();
+ SuspendedCommit suspended = it.next();
+ if (suspended.removeRevision(revision) && suspended.revisions.isEmpty()) {
it.remove();
- s.release();
}
}
}
@@ -292,4 +296,39 @@ final class CommitQueue {
}
}
}
-}
+
+ private class SuspendedCommit {
+
+ private final Semaphore semaphore;
+
+ private final Set<Revision> revisions;
+
+ private SuspendedCommit(Semaphore semaphore, Set<Revision> revisions) {
+ this.semaphore = semaphore;
+ this.revisions = revisions;
+ }
+
+ private boolean removeRevisionsYoungerThan(Revision revision) {
+ Comparator<Revision> comparator = context.getRevisionComparator();
+ Iterator<Revision> it = revisions.iterator();
+ boolean removed = false;
+ while (it.hasNext()) {
+ if (comparator.compare(it.next(), revision) <= 0) {
+ it.remove();
+ semaphore.release();
+ removed = true;
+ }
+ }
+ return removed;
+ }
+
+ private boolean removeRevision(Revision r) {
+ if (revisions.remove(r)) {
+ semaphore.release();
+ return true;
+ } else {
+ return false;
+ }
+ }
+ }
+}
\ No newline at end of file
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ConflictException.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ConflictException.java?rev=1718544&r1=1718543&r2=1718544&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ConflictException.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ConflictException.java Tue Dec 8 10:00:55 2015
@@ -17,13 +17,15 @@
package org.apache.jackrabbit.oak.plugins.document;
import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
import org.apache.jackrabbit.oak.api.CommitFailedException;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.jackrabbit.oak.api.CommitFailedException.MERGE;
+import java.util.Collections;
+import java.util.Set;
+
/**
* A document store exception with an optional conflict revision. The
* DocumentNodeStore implementation will throw this exception when a commit
@@ -34,18 +36,36 @@ class ConflictException extends Document
private static final long serialVersionUID = 1418838561903727231L;
/**
- * Optional conflict revision.
+ * Optional set of conflict revisions (may be empty).
*/
- private final Revision conflictRevision;
+ private final Set<Revision> conflictRevisions;
/**
* @param message the exception / conflict message.
- * @param conflictRevision the conflict revision or {@code null} if unknown.
+ * @param conflictRevision the conflict revision
*/
ConflictException(@Nonnull String message,
- @Nullable Revision conflictRevision) {
+ @Nonnull Revision conflictRevision) {
+ super(checkNotNull(message));
+ this.conflictRevisions = Collections.singleton(checkNotNull(conflictRevision));
+ }
+
+ /**
+ * @param message the exception / conflict message.
+ * @param conflictRevisions the set of conflict revisions
+ */
+ ConflictException(@Nonnull String message,
+ @Nonnull Set<Revision> conflictRevisions) {
+ super(checkNotNull(message));
+ this.conflictRevisions = checkNotNull(conflictRevisions);
+ }
+
+ /**
+ * @param message the exception / conflict message.
+ */
+ ConflictException(@Nonnull String message) {
super(checkNotNull(message));
- this.conflictRevision = conflictRevision;
+ this.conflictRevisions = Collections.emptySet();
}
/**
@@ -55,11 +75,21 @@ class ConflictException extends Document
* @return a {@link CommitFailedException}.
*/
CommitFailedException asCommitFailedException() {
- if (conflictRevision != null) {
- return new FailedWithConflictException(conflictRevision, getMessage(), this);
+ if (!conflictRevisions.isEmpty()) {
+ return new FailedWithConflictException(conflictRevisions, getMessage(), this);
} else {
return new CommitFailedException(MERGE, 1,
"Failed to merge changes to the underlying store", this);
}
}
-}
+
+ /**
+ * Returns the conflict revisions.
+ *
+ * @return the conflict revisions (may be empty)
+ */
+ @Nonnull
+ Iterable<Revision> getConflictRevisions() {
+ return conflictRevisions;
+ }
+}
\ No newline at end of file
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java?rev=1718544&r1=1718543&r2=1718544&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java Tue Dec 8 10:00:55 2015
@@ -43,6 +43,7 @@ import java.text.SimpleDateFormat;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -1456,21 +1457,28 @@ public final class DocumentNodeStore
}
/**
- * Suspends until the given revision is visible from the current
- * headRevision or the given revision is canceled from the commit queue.
+ * Suspends until all given revisions are either visible from the current
+ * headRevision or canceled from the commit queue.
*
- * The thread will *not* be suspended if the given revision is from a
- * foreign cluster node and async delay is set to zero.
+ * Only revisions from the local cluster node will be considered if the async
+ * delay is set to 0.
*
- * @param r the revision to become visible.
+ * @param conflictRevisions the revisions to become visible.
*/
- void suspendUntil(@Nonnull Revision r) {
+ void suspendUntilAll(@Nonnull Set<Revision> conflictRevisions) {
// do not suspend if revision is from another cluster node
// and background read is disabled
- if (r.getClusterId() != getClusterId() && getAsyncDelay() == 0) {
- return;
+ if (getAsyncDelay() == 0) {
+ Set<Revision> onlyLocal = new HashSet<Revision>(conflictRevisions.size());
+ for (Revision r : conflictRevisions) {
+ if (r.getClusterId() == getClusterId()) {
+ onlyLocal.add(r);
+ }
+ }
+ commitQueue.suspendUntilAll(onlyLocal);
+ } else {
+ commitQueue.suspendUntilAll(conflictRevisions);
}
- commitQueue.suspendUntil(r);
}
//------------------------< Observable >------------------------------------
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreBranch.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreBranch.java?rev=1718544&r1=1718543&r2=1718544&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreBranch.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreBranch.java Tue Dec 8 10:00:55 2015
@@ -23,6 +23,7 @@ import static org.apache.jackrabbit.oak.
import static org.apache.jackrabbit.oak.api.CommitFailedException.STATE;
import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.COLLISIONS;
+import java.util.HashSet;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Callable;
@@ -149,7 +150,7 @@ class DocumentNodeStoreBranch implements
boolean exclusive)
throws CommitFailedException {
CommitFailedException ex = null;
- Revision conflictRevision = null;
+ Set<Revision> conflictRevisions = new HashSet<Revision>();
long time = System.currentTimeMillis();
int numRetries = 0;
for (long backoff = MIN_BACKOFF; backoff <= maximumBackoff; backoff *= 2) {
@@ -159,14 +160,13 @@ class DocumentNodeStoreBranch implements
final long start = perfLogger.start();
// suspend until conflict revision is visible
// or as a fallback sleep for a while
- if (conflictRevision != null) {
+ if (!conflictRevisions.isEmpty()) {
// suspend until conflicting revision is visible
LOG.debug("Suspending until {} is visible. Current head {}.",
- conflictRevision, store.getHeadRevision());
- store.suspendUntil(conflictRevision);
+ conflictRevisions, store.getHeadRevision());
+ store.suspendUntilAll(conflictRevisions);
+ conflictRevisions.clear();
LOG.debug("Resumed. Current head {}.", store.getHeadRevision());
- // reset conflict revision
- conflictRevision = null;
} else {
Thread.sleep(backoff + RANDOM.nextInt((int) Math.min(backoff, Integer.MAX_VALUE)));
}
@@ -181,7 +181,7 @@ class DocumentNodeStoreBranch implements
checkNotNull(info), exclusive);
} catch (FailedWithConflictException e) {
ex = e;
- conflictRevision = e.getConflictRevision();
+ conflictRevisions.addAll(e.getConflictRevisions());
} catch (CommitFailedException e) {
ex = e;
}
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/FailedWithConflictException.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/FailedWithConflictException.java?rev=1718544&r1=1718543&r2=1718544&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/FailedWithConflictException.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/FailedWithConflictException.java Tue Dec 8 10:00:55 2015
@@ -22,6 +22,8 @@ import org.apache.jackrabbit.oak.api.Com
import static com.google.common.base.Preconditions.checkNotNull;
+import java.util.Set;
+
/**
* A {@link CommitFailedException} with a conflict revision.
*/
@@ -29,20 +31,20 @@ class FailedWithConflictException extend
private static final long serialVersionUID = 2716279884065949789L;
- private final Revision conflictRevision;
+ private final Set<Revision> conflictRevisions;
- FailedWithConflictException(@Nonnull Revision conflictRevision,
+ FailedWithConflictException(@Nonnull Set<Revision> conflictRevisions,
@Nonnull String message,
@Nonnull Throwable cause) {
super(OAK, MERGE, 4, checkNotNull(message), checkNotNull(cause));
- this.conflictRevision = checkNotNull(conflictRevision);
+ this.conflictRevisions = checkNotNull(conflictRevisions);
}
/**
* @return the revision of another commit which caused a conflict.
*/
@Nonnull
- Revision getConflictRevision() {
- return conflictRevision;
+ Set<Revision> getConflictRevisions() {
+ return conflictRevisions;
}
-}
+}
\ No newline at end of file
Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CommitQueueTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CommitQueueTest.java?rev=1718544&r1=1718543&r2=1718544&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CommitQueueTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CommitQueueTest.java Tue Dec 8 10:00:55 2015
@@ -18,8 +18,11 @@ package org.apache.jackrabbit.oak.plugin
import java.io.Closeable;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Random;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -37,6 +40,8 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static com.google.common.collect.ImmutableSet.of;
+import static com.google.common.collect.Sets.union;
import static java.util.Collections.synchronizedList;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -216,13 +221,15 @@ public class CommitQueueTest {
}
};
headRevision.set(context.newRevision());
+
final CommitQueue queue = new CommitQueue(context);
- final Revision r = context.newRevision();
+ final Revision newHeadRev = context.newRevision();
+ final Set<Revision> revisions = queue.createRevisions(10);
Thread t = new Thread(new Runnable() {
@Override
public void run() {
- queue.suspendUntil(r);
+ queue.suspendUntilAll(union(of(newHeadRev), revisions));
}
});
t.start();
@@ -240,8 +247,14 @@ public class CommitQueueTest {
// must still be suspended
assertEquals(1, queue.numSuspendedThreads());
- headRevision.set(r);
+ headRevision.set(newHeadRev);
queue.headRevisionChanged();
+ // must still be suspended
+ assertEquals(1, queue.numSuspendedThreads());
+
+ for (Revision rev : revisions) {
+ queue.canceled(rev);
+ }
// must not be suspended anymore
assertEquals(0, queue.numSuspendedThreads());
}
@@ -264,7 +277,7 @@ public class CommitQueueTest {
Thread t = new Thread(new Runnable() {
@Override
public void run() {
- queue.suspendUntil(r);
+ queue.suspendUntilAll(of(r));
}
});
t.start();
@@ -273,6 +286,66 @@ public class CommitQueueTest {
assertFalse(t.isAlive());
}
+ @Test
+ public void concurrentSuspendUntil() throws Exception {
+ final AtomicReference<Revision> headRevision = new AtomicReference<Revision>();
+ RevisionContext context = new DummyRevisionContext() {
+ @Nonnull
+ @Override
+ public Revision getHeadRevision() {
+ return headRevision.get();
+ }
+ };
+ headRevision.set(context.newRevision());
+
+ List<Thread> threads = new ArrayList<Thread>();
+ List<Revision> allRevisions = new ArrayList<Revision>();
+
+ final CommitQueue queue = new CommitQueue(context);
+ for (int i = 0; i < 10; i++) { // threads count
+ final Set<Revision> revisions = new HashSet<Revision>();
+ for (int j = 0; j < 10; j++) { // revisions per thread
+ Revision r = queue.createRevision();
+ revisions.add(r);
+ allRevisions.add(r);
+ }
+ Thread t = new Thread(new Runnable() {
+ public void run() {
+ queue.suspendUntilAll(revisions);
+ }
+ });
+ threads.add(t);
+ t.start();
+ }
+
+ for (int i = 0; i < 100; i++) {
+ if (queue.numSuspendedThreads() == 10) {
+ break;
+ }
+ Thread.sleep(10);
+ }
+ assertEquals(10, queue.numSuspendedThreads());
+
+ Collections.shuffle(allRevisions);
+ for (Revision r : allRevisions) {
+ queue.canceled(r);
+ Thread.sleep(10);
+ }
+
+ for (int i = 0; i < 100; i++) {
+ if (queue.numSuspendedThreads() == 0) {
+ break;
+ }
+ Thread.sleep(10);
+ }
+ assertEquals(0, queue.numSuspendedThreads());
+
+ for (Thread t : threads) {
+ t.join(1000);
+ assertFalse(t.isAlive());
+ }
+ }
+
private void assertNoExceptions() throws Exception {
if (!exceptions.isEmpty()) {
throw exceptions.get(0);
Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ConflictExceptionTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ConflictExceptionTest.java?rev=1718544&r1=1718543&r2=1718544&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ConflictExceptionTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ConflictExceptionTest.java Tue Dec 8 10:00:55 2015
@@ -23,18 +23,20 @@ import static org.junit.Assert.assertEqu
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
+import java.util.Collections;
+
public class ConflictExceptionTest {
@Test
public void type() {
- ConflictException e = new ConflictException("conflict", null);
+ ConflictException e = new ConflictException("conflict");
CommitFailedException cfe = e.asCommitFailedException();
assertEquals(CommitFailedException.MERGE, cfe.getType());
}
@Test
public void cause() {
- ConflictException e = new ConflictException("conflict", null);
+ ConflictException e = new ConflictException("conflict");
CommitFailedException cfe = e.asCommitFailedException();
assertSame(e, cfe.getCause());
}
@@ -47,6 +49,6 @@ public class ConflictExceptionTest {
assertTrue(cfe instanceof FailedWithConflictException);
FailedWithConflictException fwce = (FailedWithConflictException) cfe;
assertEquals(CommitFailedException.MERGE, fwce.getType());
- assertEquals(r, fwce.getConflictRevision());
+ assertEquals(Collections.singleton(r), fwce.getConflictRevisions());
}
}
Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java?rev=1718544&r1=1718543&r2=1718544&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java Tue Dec 8 10:00:55 2015
@@ -42,6 +42,7 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
@@ -2380,6 +2381,73 @@ public class DocumentNodeStoreTest {
mergeAttempts.get() <= 1);
}
+ // OAK-3586
+ @Test
+ public void resolveMultipleConflictedRevisions() throws Exception {
+ MemoryDocumentStore store = new MemoryDocumentStore();
+ final DocumentNodeStore ds = builderProvider.newBuilder()
+ .setDocumentStore(store)
+ .setAsyncDelay(0).getNodeStore();
+
+ DocumentNodeState root = ds.getRoot();
+ final DocumentNodeStoreBranch b = ds.createBranch(root);
+
+ NodeBuilder builder = root.builder();
+ builder.child("foo");
+ b.setRoot(builder.getNodeState());
+
+ final Set<Revision> revisions = new HashSet<Revision>();
+ final List<Commit> commits = new ArrayList<Commit>();
+ for (int i = 0; i < 10; i++) {
+ Revision revision = ds.newRevision();
+ Commit commit = ds.newCommit(revision, ds.createBranch(root));
+ commits.add(commit);
+ revisions.add(revision);
+ }
+
+ final AtomicBoolean merged = new AtomicBoolean();
+ Thread t = new Thread(new Runnable() {
+ public void run() {
+ try {
+ CommitFailedException exception = new ConflictException("Can't merge", revisions).asCommitFailedException();
+ b.merge(new HookFailingOnce(exception), CommitInfo.EMPTY);
+ merged.set(true);
+ } catch (CommitFailedException e) {
+ LOG.error("Can't commit", e);
+ }
+ }
+ });
+ t.start();
+
+ // 6 x done()
+ for (int i = 0; i < 6; i++) {
+ assertFalse("The branch can't be merged yet", merged.get());
+ ds.done(commits.get(i), false, CommitInfo.EMPTY);
+ }
+
+ // 2 x cancel()
+ for (int i = 6; i < 8; i++) {
+ assertFalse("The branch can't be merged yet", merged.get());
+ ds.canceled(commits.get(i));
+ }
+
+ // 2 x branch done()
+ for (int i = 8; i < 10; i++) {
+ assertFalse("The branch can't be merged yet", merged.get());
+ ds.done(commits.get(i), true, CommitInfo.EMPTY);
+ }
+
+ for (int i = 0; i < 100; i++) {
+ if (merged.get()) {
+ break;
+ }
+ Thread.sleep(10);
+ }
+ assertTrue("The branch should be merged by now", merged.get());
+
+ t.join();
+ }
+
// OAK-3411
@Test
public void sameSeenAtRevision() throws Exception {
@@ -2599,6 +2667,28 @@ public class DocumentNodeStoreTest {
}
};
+ private static class HookFailingOnce implements CommitHook {
+
+ private final AtomicBoolean failedAlready = new AtomicBoolean();
+
+ private final CommitFailedException exception;
+
+ private HookFailingOnce(CommitFailedException exception) {
+ this.exception = exception;
+ }
+
+ @Override
+ public NodeState processCommit(NodeState before, NodeState after, CommitInfo info)
+ throws CommitFailedException {
+ if (failedAlready.getAndSet(true)) {
+ return after;
+ } else {
+ throw exception;
+ }
+ }
+
+ }
+
private static class TestEditor extends DefaultEditor {
private final NodeBuilder builder;