You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by mr...@apache.org on 2015/09/08 10:03:25 UTC
svn commit: r1701741 - in /jackrabbit/oak/trunk:
oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/
oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/
oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/
Author: mreutegg
Date: Tue Sep 8 08:03:25 2015
New Revision: 1701741
URL: http://svn.apache.org/r1701741
Log:
OAK-3042: Suspend commit on conflict
Added:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ConflictException.java (with props)
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/FailedWithConflictException.java (with props)
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterConflictTest.java (with props)
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ConflictExceptionTest.java (with props)
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Collision.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Commit.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/CommitQueue.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreBranch.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/RevisionContext.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CommitQueueTest.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentSplitTest.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DummyRevisionContext.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/NodeDocumentTest.java
jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/ConcurrentAddNodesClusterIT.java
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Collision.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Collision.java?rev=1701741&r1=1701740&r2=1701741&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Collision.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Collision.java Tue Sep 8 08:03:25 2015
@@ -72,14 +72,14 @@ class Collision {
@Nonnull
Revision mark(DocumentStore store) throws DocumentStoreException {
// first try to mark their revision
- if (markCommitRoot(document, theirRev, store)) {
+ if (markCommitRoot(document, theirRev, ourRev, store)) {
return theirRev;
}
// their commit wins, we have to mark ourRev
NodeDocument newDoc = Collection.NODES.newDocument(store);
document.deepCopy(newDoc);
UpdateUtils.applyChanges(newDoc, ourOp, context.getRevisionComparator());
- if (!markCommitRoot(newDoc, ourRev, store)) {
+ if (!markCommitRoot(newDoc, ourRev, theirRev, store)) {
throw new IllegalStateException("Unable to annotate our revision "
+ "with collision marker. Our revision: " + ourRev
+ ", document:\n" + newDoc.format());
@@ -94,12 +94,14 @@ class Collision {
* @param document the document.
* @param revision the revision of the commit to annotated with a collision
* marker.
+ * @param other the revision which detected the collision.
* @param store the document store.
* @return <code>true</code> if the commit for the given revision was marked
* successfully; <code>false</code> otherwise.
*/
private static boolean markCommitRoot(@Nonnull NodeDocument document,
@Nonnull Revision revision,
+ @Nonnull Revision other,
@Nonnull DocumentStore store) {
String p = document.getPath();
String commitRootPath;
@@ -131,7 +133,7 @@ class Collision {
// already marked
return true;
}
- NodeDocument.addCollision(op, revision);
+ NodeDocument.addCollision(op, revision, other);
String commitValue = commitRoot.getLocalRevisions().get(revision);
if (commitValue == null) {
// no revision entry yet
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Commit.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Commit.java?rev=1701741&r1=1701740&r2=1701741&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Commit.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Commit.java Tue Sep 8 08:03:25 2015
@@ -17,6 +17,7 @@
package org.apache.jackrabbit.oak.plugins.document;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
@@ -393,7 +394,16 @@ public class Commit {
if (before == null) {
String msg = "Conflicting concurrent change. " +
"Update operation failed: " + commitRoot;
- throw new DocumentStoreException(msg);
+ NodeDocument commitRootDoc = store.find(NODES, commitRoot.getId());
+ DocumentStoreException dse;
+ if (commitRootDoc == null) {
+ dse = new DocumentStoreException(msg);
+ } else {
+ dse = new ConflictException(msg,
+ commitRootDoc.getMostRecentConflictFor(
+ Collections.singleton(revision), nodeStore));
+ }
+ throw dse;
} else {
success = true;
// if we get here the commit was successful and
@@ -490,9 +500,12 @@ public class Commit {
* @param op the update operation.
* @param before how the document looked before the update was applied or
* {@code null} if it didn't exist before.
+ * @throws ConflictException if there was a conflict introduced by the
+ * given update operation.
*/
private void checkConflicts(@Nonnull UpdateOp op,
- @Nullable NodeDocument before) {
+ @Nullable NodeDocument before)
+ throws ConflictException {
DocumentStore store = nodeStore.getDocumentStore();
collisions.clear();
if (baseRevision != null) {
@@ -507,10 +520,14 @@ public class Commit {
});
}
String conflictMessage = null;
+ Revision conflictRevision = newestRev;
if (newestRev == null) {
if ((op.isDelete() || !op.isNew()) && isConflicting(before, op)) {
conflictMessage = "The node " +
op.getId() + " does not exist or is already deleted";
+ if (before != null && !before.getLocalDeleted().isEmpty()) {
+ conflictRevision = before.getLocalDeleted().firstKey();
+ }
}
} else {
if (op.isNew() && isConflicting(before, op)) {
@@ -545,6 +562,7 @@ public class Commit {
op.getId() + " was changed in revision\n" + r +
", which was applied after the base revision\n" +
baseRevision;
+ conflictRevision = r;
}
}
}
@@ -558,7 +576,7 @@ public class Commit {
",\nrevision order:\n" +
nodeStore.getRevisionComparator());
}
- throw new DocumentStoreException(conflictMessage);
+ throw new ConflictException(conflictMessage, conflictRevision);
}
}
}
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/CommitQueue.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/CommitQueue.java?rev=1701741&r1=1701740&r2=1701741&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/CommitQueue.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/CommitQueue.java Tue Sep 8 08:03:25 2015
@@ -19,14 +19,21 @@ package org.apache.jackrabbit.oak.plugin
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.Map;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
+import com.google.common.collect.Maps;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,13 +41,22 @@ import org.slf4j.LoggerFactory;
* <code>CommitQueue</code> ensures a sequence of commits consistent with the
* commit revision even if commits did not complete in this sequence.
*/
-abstract class CommitQueue {
+final class CommitQueue {
static final Logger LOG = LoggerFactory.getLogger(CommitQueue.class);
private final SortedMap<Revision, Entry> commits = new TreeMap<Revision, Entry>(StableRevisionComparator.INSTANCE);
- protected abstract Revision newRevision();
+ /**
+ * Map of currently suspended commits until a given Revision is visible.
+ */
+ private final Map<Semaphore, Revision> suspendedCommits = Maps.newIdentityHashMap();
+
+ private final RevisionContext context;
+
+ CommitQueue(@Nonnull RevisionContext context) {
+ this.context = checkNotNull(context);
+ }
@Nonnull
Revision createRevision() {
@@ -54,7 +70,7 @@ abstract class CommitQueue {
Revision rev = null;
synchronized (this) {
for (int i = 0; i < num; i++) {
- rev = newRevision();
+ rev = context.newRevision();
revs.add(rev);
}
commits.put(rev, new Entry(rev));
@@ -70,6 +86,7 @@ abstract class CommitQueue {
void canceled(@Nonnull Revision rev) {
removeCommit(rev);
+ notifySuspendedCommits(rev);
}
boolean contains(@Nonnull Revision revision) {
@@ -78,6 +95,44 @@ abstract class CommitQueue {
}
}
+ /**
+ * Suspends until the given revision is visible from the current
+ * headRevision or the given revision is canceled from the commit queue.
+ *
+ * @param r the revision to become visible.
+ */
+ void suspendUntil(@Nonnull Revision r) {
+ Comparator<Revision> comparator = context.getRevisionComparator();
+ Semaphore s = null;
+ synchronized (suspendedCommits) {
+ Revision headRevision = context.getHeadRevision();
+ if (comparator.compare(r, headRevision) > 0) {
+ s = new Semaphore(0);
+ suspendedCommits.put(s, r);
+ }
+ }
+ if (s != null) {
+ s.acquireUninterruptibly();
+ }
+ }
+
+ /**
+ * Called when the head revision accessible via the {@link RevisionContext}
+ * passed to constructor changed.
+ */
+ void headRevisionChanged() {
+ notifySuspendedCommits();
+ }
+
+ /**
+ * @return the number of suspended threads on this commit queue.
+ */
+ int numSuspendedThreads() {
+ synchronized (suspendedCommits) {
+ return suspendedCommits.size();
+ }
+ }
+
interface Callback {
void headOfQueue(@Nonnull Revision revision);
@@ -85,6 +140,43 @@ abstract class CommitQueue {
//------------------------< internal >--------------------------------------
+ private void notifySuspendedCommits() {
+ synchronized (suspendedCommits) {
+ if (suspendedCommits.isEmpty()) {
+ return;
+ }
+ Comparator<Revision> comparator = context.getRevisionComparator();
+ Revision headRevision = context.getHeadRevision();
+ Iterator<Map.Entry<Semaphore, Revision>> it = suspendedCommits.entrySet().iterator();
+ while (it.hasNext()) {
+ Map.Entry<Semaphore, Revision> entry = it.next();
+ if (comparator.compare(entry.getValue(), headRevision) <= 0) {
+ Semaphore s = entry.getKey();
+ it.remove();
+ s.release();
+ }
+ }
+ }
+ }
+
+ private void notifySuspendedCommits(@Nonnull Revision revision) {
+ checkNotNull(revision);
+ synchronized (suspendedCommits) {
+ if (suspendedCommits.isEmpty()) {
+ return;
+ }
+ Iterator<Map.Entry<Semaphore, Revision>> it = suspendedCommits.entrySet().iterator();
+ while (it.hasNext()) {
+ Map.Entry<Semaphore, Revision> entry = it.next();
+ if (revision.equals(entry.getValue())) {
+ Semaphore s = entry.getKey();
+ it.remove();
+ s.release();
+ }
+ }
+ }
+ }
+
private void removeCommit(@Nonnull Revision rev) {
// simply remove and notify next head if any
synchronized (this) {
Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ConflictException.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ConflictException.java?rev=1701741&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ConflictException.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ConflictException.java Tue Sep 8 08:03:25 2015
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import org.apache.jackrabbit.oak.api.CommitFailedException;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.jackrabbit.oak.api.CommitFailedException.MERGE;
+
+/**
+ * A document store exception with an optional conflict revision. The
+ * DocumentNodeStore implementation will throw this exception when a commit
+ * or merge fails with a conflict.
+ */
+class ConflictException extends DocumentStoreException {
+
+ private static final long serialVersionUID = 1418838561903727231L;
+
+ /**
+ * Optional conflict revision.
+ */
+ private final Revision conflictRevision;
+
+ /**
+ * @param message the exception / conflict message.
+ * @param conflictRevision the conflict revision or {@code null} if unknown.
+ */
+ ConflictException(@Nonnull String message,
+ @Nullable Revision conflictRevision) {
+ super(checkNotNull(message));
+ this.conflictRevision = conflictRevision;
+ }
+
+ /**
+ * Convert this exception into a {@link CommitFailedException}. This
+ * exception will be set as the cause of the returned exception.
+ *
+ * @return a {@link CommitFailedException}.
+ */
+ CommitFailedException asCommitFailedException() {
+ if (conflictRevision != null) {
+ return new FailedWithConflictException(conflictRevision, getMessage(), this);
+ } else {
+ return new CommitFailedException(MERGE, 1,
+ "Failed to merge changes to the underlying store", this);
+ }
+ }
+}
Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ConflictException.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java?rev=1701741&r1=1701740&r2=1701741&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java Tue Sep 8 08:03:25 2015
@@ -22,7 +22,6 @@ import static com.google.common.collect.
import static com.google.common.collect.Iterables.toArray;
import static com.google.common.collect.Iterables.transform;
import static java.util.Collections.singletonList;
-import static org.apache.jackrabbit.oak.api.CommitFailedException.MERGE;
import static org.apache.jackrabbit.oak.commons.PathUtils.concat;
import static org.apache.jackrabbit.oak.plugins.document.Collection.JOURNAL;
import static org.apache.jackrabbit.oak.plugins.document.Collection.NODES;
@@ -510,12 +509,7 @@ public final class DocumentNodeStore
getRevisionComparator().add(headRevision, Revision.newRevision(0));
dispatcher = new ChangeDispatcher(getRoot());
- commitQueue = new CommitQueue() {
- @Override
- protected Revision newRevision() {
- return DocumentNodeStore.this.newRevision();
- }
- };
+ commitQueue = new CommitQueue(this);
String threadNamePostfix = "(" + clusterId + ")";
batchCommitQueue = new BatchCommitQueue(store, revisionComparator);
backgroundReadThread = new Thread(
@@ -617,19 +611,6 @@ public final class DocumentNodeStore
}
/**
- * Create a new revision.
- *
- * @return the revision
- */
- @Nonnull
- Revision newRevision() {
- if (simpleRevisionCounter != null) {
- return new Revision(simpleRevisionCounter.getAndIncrement(), 0, clusterId);
- }
- return Revision.newRevision(clusterId);
- }
-
- /**
* Creates a new commit. The caller must acknowledge the commit either with
* {@link #done(Commit, boolean, CommitInfo)} or {@link #canceled(Commit)},
* depending on the result of the commit.
@@ -700,6 +681,7 @@ public final class DocumentNodeStore
changes.modified(c.getModifiedPaths());
// update head revision
setHeadRevision(c.getRevision());
+ commitQueue.headRevisionChanged();
dispatcher.contentChanged(getRoot(), info);
}
});
@@ -1382,8 +1364,10 @@ public final class DocumentNodeStore
b.applyTo(getPendingModifications(), commit.getRevision());
getBranches().remove(b);
} else {
- throw new CommitFailedException(MERGE, 2,
- "Conflicting concurrent change. Update operation failed: " + op);
+ NodeDocument root = Utils.getRootDocument(store);
+ Revision conflictRev = root.getMostRecentConflictFor(b.getCommits(), this);
+ String msg = "Conflicting concurrent change. Update operation failed: " + op;
+ throw new ConflictException(msg, conflictRev).asCommitFailedException();
}
} else {
// no commits in this branch -> do nothing
@@ -1503,6 +1487,24 @@ public final class DocumentNodeStore
}
}
+ /**
+ * Suspends until the given revision is visible from the current
+ * headRevision or the given revision is canceled from the commit queue.
+ *
+ * The thread will *not* be suspended if the given revision is from a
+ * foreign cluster node and async delay is set to zero.
+ *
+ * @param r the revision to become visible.
+ */
+ void suspendUntil(@Nonnull Revision r) {
+ // do not suspend if revision is from another cluster node
+ // and background read is disabled
+ if (r.getClusterId() != getClusterId() && getAsyncDelay() == 0) {
+ return;
+ }
+ commitQueue.suspendUntil(r);
+ }
+
//------------------------< Observable >------------------------------------
@Override
@@ -1647,6 +1649,14 @@ public final class DocumentNodeStore
return headRevision;
}
+ @Nonnull
+ public Revision newRevision() {
+ if (simpleRevisionCounter != null) {
+ return new Revision(simpleRevisionCounter.getAndIncrement(), 0, clusterId);
+ }
+ return Revision.newRevision(clusterId);
+ }
+
//----------------------< background operations >---------------------------
/** Used for testing only */
@@ -1904,6 +1914,7 @@ public final class DocumentNodeStore
// the new head revision is after other revisions
setHeadRevision(newRevision());
if (dispatchChange) {
+ commitQueue.headRevisionChanged();
time = clock.getTime();
if (externalSort != null) {
// then there were external changes and reading them
@@ -1911,7 +1922,7 @@ public final class DocumentNodeStore
try {
JournalEntry.applyTo(externalSort, diffCache, oldHead, headRevision);
} catch (Exception e1) {
- LOG.error("backgroundRead: Exception while processing external changes from journal: "+e1, e1);
+ LOG.error("backgroundRead: Exception while processing external changes from journal: {}", e1, e1);
}
}
stats.populateDiffCache = clock.getTime() - time;
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreBranch.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreBranch.java?rev=1701741&r1=1701740&r2=1701741&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreBranch.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreBranch.java Tue Sep 8 08:03:25 2015
@@ -113,6 +113,12 @@ class DocumentNodeStoreBranch implements
throws CommitFailedException {
try {
return merge0(hook, info, false);
+ } catch (FailedWithConflictException e) {
+ // suspend until conflicting revision is visible
+ LOG.debug("Suspending until {} is visible. Current head {}.",
+ e.getConflictRevision(), store.getHeadRevision());
+ store.suspendUntil(e.getConflictRevision());
+ LOG.debug("Resumed. Current head {}.", store.getHeadRevision());
} catch (CommitFailedException e) {
if (!e.isOfType(MERGE)) {
throw e;
@@ -166,6 +172,9 @@ class DocumentNodeStoreBranch implements
try {
return branchState.merge(checkNotNull(hook),
checkNotNull(info), exclusive);
+ } catch (FailedWithConflictException e) {
+ // let caller decide what to do with conflicting revision
+ throw e;
} catch (CommitFailedException e) {
LOG.trace("Merge Error", e);
ex = e;
@@ -481,6 +490,8 @@ class DocumentNodeStoreBranch implements
NodeState newHead = DocumentNodeStoreBranch.this.persist(toCommit, base, info);
branchState = new Merged(base);
return newHead;
+ } catch (ConflictException e) {
+ throw e.asCommitFailedException();
} catch(DocumentStoreException e) {
throw new CommitFailedException(MERGE, 1,
"Failed to merge changes to the underlying store", e);
Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/FailedWithConflictException.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/FailedWithConflictException.java?rev=1701741&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/FailedWithConflictException.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/FailedWithConflictException.java Tue Sep 8 08:03:25 2015
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+import javax.annotation.Nonnull;
+
+import org.apache.jackrabbit.oak.api.CommitFailedException;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A {@link CommitFailedException} with a conflict revision.
+ */
+class FailedWithConflictException extends CommitFailedException {
+
+ private static final long serialVersionUID = 2716279884065949789L;
+
+ private final Revision conflictRevision;
+
+ FailedWithConflictException(@Nonnull Revision conflictRevision,
+ @Nonnull String message,
+ @Nonnull Throwable cause) {
+ super(OAK, MERGE, 4, checkNotNull(message), checkNotNull(cause));
+ this.conflictRevision = checkNotNull(conflictRevision);
+ }
+
+ /**
+ * @return the revision of another commit which caused a conflict.
+ */
+ @Nonnull
+ Revision getConflictRevision() {
+ return conflictRevision;
+ }
+}
Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/FailedWithConflictException.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java?rev=1701741&r1=1701740&r2=1701741&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java Tue Sep 8 08:03:25 2015
@@ -18,6 +18,7 @@ package org.apache.jackrabbit.oak.plugin
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Comparator;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
@@ -654,6 +655,45 @@ public final class NodeDocument extends
}
/**
+ * Returns the most recent conflict on the given {@code branchCommits} if
+ * there are any. The returned revision is the commit, which created the
+ * collision marker for one of the {@code branchCommits}.
+ *
+ * @param branchCommits the branch commits to check.
+ * @param context a revision context.
+ * @return the conflict revision or {@code null} if there aren't any or
+ * the collision marker does not have a revision value.
+ */
+ @CheckForNull
+ Revision getMostRecentConflictFor(@Nonnull Iterable<Revision> branchCommits,
+ @Nonnull RevisionContext context) {
+ checkNotNull(branchCommits);
+ checkNotNull(context);
+
+ Comparator<Revision> comparator = context.getRevisionComparator();
+ Revision conflict = null;
+
+ Map<Revision, String> collisions = getLocalMap(COLLISIONS);
+ for (Revision r : branchCommits) {
+ String value = collisions.get(r.asTrunkRevision());
+ if (value == null) {
+ continue;
+ }
+ Revision c;
+ try {
+ c = Revision.fromString(value);
+ } catch (IllegalArgumentException e) {
+ // backward compatibility: collision marker with value 'true'
+ continue;
+ }
+ if (conflict == null || comparator.compare(conflict, c) < 0) {
+ conflict = c;
+ }
+ }
+ return conflict;
+ }
+
+ /**
* Returns the commit root path for the given <code>revision</code> or
* <code>null</code> if this document does not have a commit root entry for
* the given <code>revision</code>.
@@ -1480,10 +1520,18 @@ public final class NodeDocument extends
checkNotNull(op).removeMapEntry(REVISIONS, checkNotNull(revision));
}
+ /**
+ * Add a collision marker for the given {@code revision}.
+ *
+ * @param op the update operation.
+ * @param revision the commit for which a collision was detected.
+ * @param other the revision for the commit, which detected the collision.
+ */
public static void addCollision(@Nonnull UpdateOp op,
- @Nonnull Revision revision) {
+ @Nonnull Revision revision,
+ @Nonnull Revision other) {
checkNotNull(op).setMapEntry(COLLISIONS, checkNotNull(revision),
- String.valueOf(true));
+ other.toString());
}
public static void removeCollision(@Nonnull UpdateOp op,
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/RevisionContext.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/RevisionContext.java?rev=1701741&r1=1701740&r2=1701741&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/RevisionContext.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/RevisionContext.java Tue Sep 8 08:03:25 2015
@@ -51,4 +51,10 @@ public interface RevisionContext {
*/
@Nonnull
Revision getHeadRevision();
+
+ /**
+ * @return a new revision for the local document node store instance.
+ */
+ @Nonnull
+ Revision newRevision();
}
Added: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterConflictTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterConflictTest.java?rev=1701741&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterConflictTest.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterConflictTest.java Tue Sep 8 08:03:25 2015
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+import java.util.List;
+
+import javax.annotation.CheckForNull;
+
+import com.google.common.collect.Lists;
+
+import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.api.Type;
+import org.apache.jackrabbit.oak.plugins.document.memory.MemoryDocumentStore;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.commit.DefaultEditor;
+import org.apache.jackrabbit.oak.spi.commit.Editor;
+import org.apache.jackrabbit.oak.spi.commit.EditorHook;
+import org.apache.jackrabbit.oak.spi.commit.EditorProvider;
+import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.apache.jackrabbit.oak.spi.state.NodeStore;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.jackrabbit.oak.spi.commit.CommitInfo.EMPTY;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+public class ClusterConflictTest {
+
+ @Rule
+ public final DocumentMKBuilderProvider builderProvider = new DocumentMKBuilderProvider();
+
+ private static final Logger LOG = LoggerFactory.getLogger(ClusterConflictTest.class);
+
+ private DocumentNodeStore ns1;
+ private DocumentNodeStore ns2;
+
+ @Before
+ public void setUp() {
+ MemoryDocumentStore store = new MemoryDocumentStore();
+ ns1 = newDocumentNodeStore(store);
+ ns2 = newDocumentNodeStore(store);
+ }
+
+ private DocumentNodeStore newDocumentNodeStore(DocumentStore store) {
+ // use high async delay and run background ops manually
+ // asyncDelay set to zero prevents commits from suspending
+ return builderProvider.newBuilder()
+ .setAsyncDelay(60000)
+ .setDocumentStore(store)
+ .setLeaseCheck(false) // disabled for debugging purposes
+ .getNodeStore();
+ }
+
+ @Test
+ public void suspendUntilVisible() throws Exception {
+ suspendUntilVisible(false);
+ }
+
+ @Test
+ public void suspendUntilVisibleWithBranch() throws Exception {
+ suspendUntilVisible(true);
+ }
+
+ private void suspendUntilVisible(boolean withBranch) throws Exception {
+ NodeBuilder b1 = ns1.getRoot().builder();
+ b1.child("counter").setProperty("value", 0);
+ merge(ns1, b1);
+ ns1.runBackgroundOperations();
+ ns2.runBackgroundOperations();
+
+ b1 = ns1.getRoot().builder();
+ b1.child("foo");
+ ns1.merge(b1, new TestHook(), EMPTY);
+
+ final List<Exception> exceptions = Lists.newArrayList();
+ final NodeBuilder b2 = ns2.getRoot().builder();
+ b2.child("bar");
+ if (withBranch) {
+ purge(b2);
+ }
+ b2.child("baz");
+
+ Thread t = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ LOG.info("initiating merge");
+ ns2.merge(b2, new TestHook(), EMPTY);
+ LOG.info("merge succeeded");
+ } catch (CommitFailedException e) {
+ exceptions.add(e);
+ }
+ }
+ });
+ t.start();
+
+ // wait until t is suspended
+ for (int i = 0; i < 100; i++) {
+ if (ns2.commitQueue.numSuspendedThreads() > 0) {
+ break;
+ }
+ Thread.sleep(10);
+ }
+ assertEquals(1, ns2.commitQueue.numSuspendedThreads());
+ LOG.info("commit suspended");
+
+ ns1.runBackgroundOperations();
+ LOG.info("ran background ops on ns1");
+ ns2.runBackgroundOperations();
+ LOG.info("ran background ops on ns2");
+ assertEquals(0, ns2.commitQueue.numSuspendedThreads());
+
+ t.join(3000);
+ assertFalse("Commit did not succeed within 3 seconds", t.isAlive());
+
+ for (Exception e : exceptions) {
+ throw e;
+ }
+ }
+
+ private static class TestHook extends EditorHook {
+
+ TestHook() {
+ super(new EditorProvider() {
+ @CheckForNull
+ @Override
+ public Editor getRootEditor(NodeState before,
+ NodeState after,
+ NodeBuilder builder,
+ CommitInfo info)
+ throws CommitFailedException {
+ return new TestEditor(builder.child("counter"));
+ }
+ });
+ }
+ }
+
+ private static class TestEditor extends DefaultEditor {
+
+ private NodeBuilder counter;
+
+ TestEditor(NodeBuilder counter) {
+ this.counter = counter;
+ }
+
+ @Override
+ public Editor childNodeAdded(String name, NodeState after)
+ throws CommitFailedException {
+ counter.setProperty("value", counter.getProperty("value").getValue(Type.LONG) + 1);
+ return super.childNodeAdded(name, after);
+ }
+ }
+
+ private static NodeState merge(NodeStore store, NodeBuilder root)
+ throws CommitFailedException {
+ return store.merge(root, EmptyHook.INSTANCE, EMPTY);
+ }
+
+ private static void purge(NodeBuilder builder) {
+ ((DocumentRootBuilder) builder).purge();
+ }
+}
Propchange: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterConflictTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CommitQueueTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CommitQueueTest.java?rev=1701741&r1=1701740&r2=1701741&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CommitQueueTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CommitQueueTest.java Tue Sep 8 08:03:25 2015
@@ -21,6 +21,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@@ -37,6 +38,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static java.util.Collections.synchronizedList;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
/**
@@ -120,12 +122,7 @@ public class CommitQueueTest {
@Test
public void concurrentCommits2() throws Exception {
- final CommitQueue queue = new CommitQueue() {
- @Override
- protected Revision newRevision() {
- return Revision.newRevision(1);
- }
- };
+ final CommitQueue queue = new CommitQueue(DummyRevisionContext.INSTANCE);
final CommitQueue.Callback c = new CommitQueue.Callback() {
private Revision before = Revision.newRevision(1);
@@ -208,6 +205,47 @@ public class CommitQueueTest {
assertNoExceptions();
}
+ @Test
+ public void suspendUntil() throws Exception {
+ final AtomicReference<Revision> headRevision = new AtomicReference<Revision>();
+ RevisionContext context = new DummyRevisionContext() {
+ @Nonnull
+ @Override
+ public Revision getHeadRevision() {
+ return headRevision.get();
+ }
+ };
+ headRevision.set(context.newRevision());
+ final CommitQueue queue = new CommitQueue(context);
+
+ final Revision r = context.newRevision();
+ Thread t = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ queue.suspendUntil(r);
+ }
+ });
+ t.start();
+
+ // wait until t is suspended
+ for (int i = 0; i < 100; i++) {
+ if (queue.numSuspendedThreads() > 0) {
+ break;
+ }
+ Thread.sleep(10);
+ }
+ assertEquals(1, queue.numSuspendedThreads());
+
+ queue.headRevisionChanged();
+ // must still be suspended
+ assertEquals(1, queue.numSuspendedThreads());
+
+ headRevision.set(r);
+ queue.headRevisionChanged();
+ // must not be suspended anymore
+ assertEquals(0, queue.numSuspendedThreads());
+ }
+
private void assertNoExceptions() throws Exception {
if (!exceptions.isEmpty()) {
throw exceptions.get(0);
Added: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ConflictExceptionTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ConflictExceptionTest.java?rev=1701741&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ConflictExceptionTest.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ConflictExceptionTest.java Tue Sep 8 08:03:25 2015
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+public class ConflictExceptionTest {
+
+ @Test
+ public void type() {
+ ConflictException e = new ConflictException("conflict", null);
+ CommitFailedException cfe = e.asCommitFailedException();
+ assertEquals(CommitFailedException.MERGE, cfe.getType());
+ }
+
+ @Test
+ public void cause() {
+ ConflictException e = new ConflictException("conflict", null);
+ CommitFailedException cfe = e.asCommitFailedException();
+ assertSame(e, cfe.getCause());
+ }
+
+ @Test
+ public void asCommitFailedException() {
+ Revision r = Revision.newRevision(1);
+ ConflictException e = new ConflictException("conflict", r);
+ CommitFailedException cfe = e.asCommitFailedException();
+ assertTrue(cfe instanceof FailedWithConflictException);
+ FailedWithConflictException fwce = (FailedWithConflictException) cfe;
+ assertEquals(CommitFailedException.MERGE, fwce.getType());
+ assertEquals(r, fwce.getConflictRevision());
+ }
+}
Propchange: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ConflictExceptionTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentSplitTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentSplitTest.java?rev=1701741&r1=1701740&r2=1701741&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentSplitTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentSplitTest.java Tue Sep 8 08:03:25 2015
@@ -886,6 +886,12 @@ public class DocumentSplitTest extends B
}
return rc.getHeadRevision();
}
+
+ @Nonnull
+ @Override
+ public Revision newRevision() {
+ return rc.newRevision();
+ }
}
private static NodeState merge(NodeStore store, NodeBuilder root)
Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DummyRevisionContext.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DummyRevisionContext.java?rev=1701741&r1=1701740&r2=1701741&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DummyRevisionContext.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DummyRevisionContext.java Tue Sep 8 08:03:25 2015
@@ -55,4 +55,10 @@ public class DummyRevisionContext implem
public Revision getHeadRevision() {
return Revision.newRevision(1);
}
+
+ @Nonnull
+ @Override
+ public Revision newRevision() {
+ return Revision.newRevision(1);
+ }
}
Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/NodeDocumentTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/NodeDocumentTest.java?rev=1701741&r1=1701740&r2=1701741&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/NodeDocumentTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/NodeDocumentTest.java Tue Sep 8 08:03:25 2015
@@ -39,11 +39,13 @@ import org.apache.jackrabbit.oak.spi.sta
import org.junit.Test;
import static org.apache.jackrabbit.oak.plugins.document.Collection.NODES;
+import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.COLLISIONS;
import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.revisionAreAmbiguous;
import static org.apache.jackrabbit.oak.plugins.document.Revision.RevisionComparator;
import static org.apache.jackrabbit.oak.plugins.document.util.Utils.getRootDocument;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
/**
@@ -61,7 +63,7 @@ public class NodeDocumentTest {
for (int i = 0; i < NodeDocument.NUM_REVS_THRESHOLD + 1; i++) {
Revision r = Revision.newRevision(1);
NodeDocument.setRevision(op, r, "c");
- NodeDocument.addCollision(op, r);
+ NodeDocument.addCollision(op, r, Revision.newRevision(1));
}
UpdateUtils.applyChanges(doc, op, StableRevisionComparator.INSTANCE);
Revision head = DummyRevisionContext.INSTANCE.getHeadRevision();
@@ -106,6 +108,53 @@ public class NodeDocumentTest {
}
@Test
+ public void getMostRecentConflictFor() {
+ RevisionContext context = DummyRevisionContext.INSTANCE;
+ MemoryDocumentStore docStore = new MemoryDocumentStore();
+ String id = Utils.getPathFromId("/");
+ NodeDocument doc = new NodeDocument(docStore);
+ doc.put(Document.ID, id);
+
+ Iterable<Revision> branchCommits = Collections.emptyList();
+ Revision conflict = doc.getMostRecentConflictFor(branchCommits, context);
+ assertNull(conflict);
+
+ // add some collisions
+ UpdateOp op = new UpdateOp(id, false);
+ Revision r0 = Revision.newRevision(1);
+ Revision r1 = Revision.newRevision(1);
+ Revision c1 = Revision.newRevision(1);
+ Revision r2 = Revision.newRevision(1);
+ Revision c2 = Revision.newRevision(1);
+ // backward compatibility test
+ op.setMapEntry(COLLISIONS, r0, String.valueOf(true));
+ // regular collision entries
+ NodeDocument.addCollision(op, r1, c1);
+ NodeDocument.addCollision(op, r2, c2);
+ UpdateUtils.applyChanges(doc, op, StableRevisionComparator.INSTANCE);
+
+ branchCommits = Collections.singleton(r0);
+ conflict = doc.getMostRecentConflictFor(branchCommits, context);
+ assertNull(conflict);
+
+ branchCommits = Collections.singleton(r1.asBranchRevision());
+ conflict = doc.getMostRecentConflictFor(branchCommits, context);
+ assertEquals(c1, conflict);
+
+ branchCommits = Collections.singleton(r2.asBranchRevision());
+ conflict = doc.getMostRecentConflictFor(branchCommits, context);
+ assertEquals(c2, conflict);
+
+ branchCommits = Lists.newArrayList(r1.asBranchRevision(), r2.asBranchRevision());
+ conflict = doc.getMostRecentConflictFor(branchCommits, context);
+ assertEquals(c2, conflict);
+
+ branchCommits = Lists.newArrayList(r2.asBranchRevision(), r1.asBranchRevision());
+ conflict = doc.getMostRecentConflictFor(branchCommits, context);
+ assertEquals(c2, conflict);
+ }
+
+ @Test
public void getAllChanges() throws Exception {
final int NUM_CHANGES = 200;
DocumentNodeStore ns = createTestStore(NUM_CHANGES);
Modified: jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/ConcurrentAddNodesClusterIT.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/ConcurrentAddNodesClusterIT.java?rev=1701741&r1=1701740&r2=1701741&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/ConcurrentAddNodesClusterIT.java (original)
+++ jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/ConcurrentAddNodesClusterIT.java Tue Sep 8 08:03:25 2015
@@ -26,10 +26,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -62,7 +59,6 @@ public class ConcurrentAddNodesClusterIT
private static final int LOOP_COUNT = 10;
private static final int WORKER_COUNT = 20;
private static final String PROP_NAME = "testcount";
- private static final ScheduledExecutorService EXECUTOR = Executors.newSingleThreadScheduledExecutor();
private List<Repository> repos = new ArrayList<Repository>();
private List<DocumentMK> mks = new ArrayList<DocumentMK>();
@@ -81,12 +77,15 @@ public class ConcurrentAddNodesClusterIT
@After
public void after() throws Exception {
+ workers.clear();
for (Repository repo : repos) {
dispose(repo);
}
+ repos.clear();
for (DocumentMK mk : mks) {
mk.dispose();
}
+ mks.clear();
dropDB();
}
@@ -211,7 +210,6 @@ public class ConcurrentAddNodesClusterIT
for (int i = 0; i < 2; i++) {
DocumentMK mk = new DocumentMK.Builder()
.setMongoDB(createConnection().getDB())
- .setAsyncDelay(0)
.setClusterId(i + 1).open();
mks.add(mk);
}
@@ -226,13 +224,11 @@ public class ConcurrentAddNodesClusterIT
Session s2 = r2.login(new SimpleCredentials("admin", "admin".toCharArray()));
ensureIndex(s1.getRootNode(), PROP_NAME);
- syncMKs(1);
ensureIndex(s2.getRootNode(), PROP_NAME);
Map<String, Exception> exceptions = Collections.synchronizedMap(
new HashMap<String, Exception>());
createNodes(s1, "testroot-1", 1, 1, exceptions);
- syncMKs(1);
createNodes(s2, "testroot-2", 1, 1, exceptions);
for (Map.Entry<String, Exception> entry : exceptions.entrySet()) {
@@ -267,9 +263,9 @@ public class ConcurrentAddNodesClusterIT
Session s3 = r3.login(new SimpleCredentials("admin", "admin".toCharArray()));
ensureIndex(s1.getRootNode(), PROP_NAME);
- syncMKs(1);
- ensureIndex(s2.getRootNode(), PROP_NAME);
- ensureIndex(s3.getRootNode(), PROP_NAME);
+ runBackgroundOps(mk1);
+ runBackgroundOps(mk2);
+ runBackgroundOps(mk3);
// begin test
@@ -367,18 +363,6 @@ public class ConcurrentAddNodesClusterIT
s2.logout();
}
- private void syncMKs(int delay) {
- EXECUTOR.schedule(new Callable<Object>() {
- @Override
- public Object call() throws Exception {
- for (DocumentMK mk : mks) {
- runBackgroundOps(mk);
- }
- return null;
- }
- }, delay, TimeUnit.SECONDS);
- }
-
private static MongoConnection createConnection() throws Exception {
return OakMongoNSRepositoryStub.createConnection(
ConcurrentAddNodesClusterIT.class.getSimpleName());