You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by mr...@apache.org on 2015/09/29 16:56:32 UTC

svn commit: r1705871 - in /jackrabbit/oak/trunk/oak-core/src: main/java/org/apache/jackrabbit/oak/plugins/document/ main/java/org/apache/jackrabbit/oak/plugins/document/util/ test/java/org/apache/jackrabbit/oak/plugins/document/ test/java/org/apache/ja...

Author: mreutegg
Date: Tue Sep 29 14:56:32 2015
New Revision: 1705871

URL: http://svn.apache.org/viewvc?rev=1705871&view=rev
Log:
OAK-3388: Inconsistent read in cluster with clock differences

Removed:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/CollisionHandler.java
Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Commit.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreBranch.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Revision.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/NodeDocumentTest.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/RevisionTest.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/util/UtilsTest.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Commit.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Commit.java?rev=1705871&r1=1705870&r2=1705871&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Commit.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Commit.java Tue Sep 29 14:56:32 2015
@@ -59,9 +59,10 @@ public class Commit {
     private final DocumentNodeStoreBranch branch;
     private final Revision baseRevision;
     private final Revision revision;
-    private HashMap<String, UpdateOp> operations = new LinkedHashMap<String, UpdateOp>();
-    private JsopWriter diff = new JsopStream();
-    private Set<Revision> collisions = new LinkedHashSet<Revision>();
+    private final HashMap<String, UpdateOp> operations = new LinkedHashMap<String, UpdateOp>();
+    private final JsopWriter diff = new JsopStream();
+    private final Set<Revision> collisions = new LinkedHashSet<Revision>();
+    private Branch b;
 
     /**
      * List of all node paths which have been modified in this commit. In addition to the nodes
@@ -511,13 +512,12 @@ public class Commit {
         if (baseRevision != null) {
             Revision newestRev = null;
             if (before != null) {
-                newestRev = before.getNewestRevision(nodeStore, revision,
-                        new CollisionHandler() {
-                            @Override
-                            void concurrentModification(Revision other) {
-                                collisions.add(other);
-                            }
-                        });
+                Revision base = baseRevision;
+                if (nodeStore.isDisableBranches()) {
+                    base = base.asTrunkRevision();
+                }
+                newestRev = before.getNewestRevision(
+                        nodeStore, base, revision, getBranch(), collisions);
             }
             String conflictMessage = null;
             Revision conflictRevision = newestRev;
@@ -605,6 +605,20 @@ public class Commit {
     }
 
     /**
+     * @return the branch if this is a branch commit, otherwise {@code null}.
+     */
+    @CheckForNull
+    private Branch getBranch() {
+        if (baseRevision == null || !baseRevision.isBranch()) {
+            return null;
+        }
+        if (b == null) {
+            b = nodeStore.getBranches().getBranch(revision);
+        }
+        return b;
+    }
+
+    /**
      * Apply the changes to the DocumentNodeStore (to update the cache).
      *
      * @param before the revision right before this commit.

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java?rev=1705871&r1=1705870&r2=1705871&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java Tue Sep 29 14:56:32 2015
@@ -41,6 +41,7 @@ import java.io.InputStream;
 import java.lang.ref.WeakReference;
 import java.text.SimpleDateFormat;
 import java.util.Collections;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -1839,6 +1840,29 @@ public final class DocumentNodeStore
             return stats;
         }
         Map<Integer, Revision> lastRevMap = doc.getLastRev();
+        try {
+            long externalTime = Utils.getMaxExternalTimestamp(lastRevMap.values(), clusterId);
+            long localTime = clock.getTime();
+            if (localTime < externalTime) {
+                LOG.warn("Detected clock differences. Local time is '{}', " +
+                                "while most recent external time is '{}'. " +
+                                "Current _lastRev entries: {}",
+                        new Date(localTime), new Date(externalTime), lastRevMap.values());
+                double delay = ((double) externalTime - localTime) / 1000d;
+                String msg = String.format("Background read will be delayed by %.1f seconds. " +
+                        "Please check system time on cluster nodes.", delay);
+                LOG.warn(msg);
+                clock.waitUntil(externalTime + 1);
+            } else if (localTime == externalTime) {
+                // make sure local time is past external time
+                // but only log at debug
+                LOG.debug("Local and external time are equal. Waiting until local" +
+                        "time is more recent than external reported time.");
+                clock.waitUntil(externalTime + 1);
+            }
+        } catch (InterruptedException e) {
+            throw new RuntimeException("Background read interrupted", e);
+        }
 
         Revision.RevisionComparator revisionComparator = getRevisionComparator();
         // the (old) head occurred first

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreBranch.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreBranch.java?rev=1705871&r1=1705870&r2=1705871&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreBranch.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreBranch.java Tue Sep 29 14:56:32 2015
@@ -113,12 +113,6 @@ class DocumentNodeStoreBranch implements
             throws CommitFailedException {
         try {
             return merge0(hook, info, false);
-        } catch (FailedWithConflictException e) {
-            // suspend until conflicting revision is visible
-            LOG.debug("Suspending until {} is visible. Current head {}.",
-                    e.getConflictRevision(), store.getHeadRevision());
-            store.suspendUntil(e.getConflictRevision());
-            LOG.debug("Resumed. Current head {}.", store.getHeadRevision());
         } catch (CommitFailedException e) {
             if (!e.isOfType(MERGE)) {
                 throw e;
@@ -155,6 +149,7 @@ class DocumentNodeStoreBranch implements
                              boolean exclusive)
             throws CommitFailedException {
         CommitFailedException ex = null;
+        Revision conflictRevision = null;
         long time = System.currentTimeMillis();
         int numRetries = 0;
         for (long backoff = MIN_BACKOFF; backoff <= maximumBackoff; backoff *= 2) {
@@ -162,7 +157,19 @@ class DocumentNodeStoreBranch implements
                 try {
                     numRetries++;
                     final long start = perfLogger.start();
-                    Thread.sleep(backoff + RANDOM.nextInt((int) Math.min(backoff, Integer.MAX_VALUE)));
+                    // suspend until conflict revision is visible
+                    // or as a fallback sleep for a while
+                    if (conflictRevision != null) {
+                        // suspend until conflicting revision is visible
+                        LOG.debug("Suspending until {} is visible. Current head {}.",
+                                conflictRevision, store.getHeadRevision());
+                        store.suspendUntil(conflictRevision);
+                        LOG.debug("Resumed. Current head {}.", store.getHeadRevision());
+                        // reset conflict revision
+                        conflictRevision = null;
+                    } else {
+                        Thread.sleep(backoff + RANDOM.nextInt((int) Math.min(backoff, Integer.MAX_VALUE)));
+                    }
                     perfLogger.end(start, 1, "Merge - Retry attempt [{}]", numRetries);
                 } catch (InterruptedException e) {
                     throw new CommitFailedException(
@@ -173,18 +180,19 @@ class DocumentNodeStoreBranch implements
                 return branchState.merge(checkNotNull(hook),
                         checkNotNull(info), exclusive);
             } catch (FailedWithConflictException e) {
-                // let caller decide what to do with conflicting revision
-                throw e;
+                ex = e;
+                conflictRevision = e.getConflictRevision();
             } catch (CommitFailedException e) {
-                LOG.trace("Merge Error", e);
                 ex = e;
-                // only retry on merge failures. these may be caused by
-                // changes introduce by a commit hook and may be resolved
-                // by a rebase and running the hook again
-                if (!e.isOfType(MERGE)) {
-                    throw e;
-                }
             }
+            LOG.trace("Merge Error", ex);
+            // only retry on merge failures. these may be caused by
+            // changes introduce by a commit hook and may be resolved
+            // by a rebase and running the hook again
+            if (!ex.isOfType(MERGE)) {
+                throw ex;
+            }
+
         }
         // if we get here retrying failed
         time = System.currentTimeMillis() - time;
@@ -588,6 +596,8 @@ class DocumentNodeStoreBranch implements
                 return newRoot;
             } catch (CommitFailedException e) {
                 throw e;
+            } catch (ConflictException e) {
+                throw e.asCommitFailedException();
             } catch (Exception e) {
                 throw new CommitFailedException(MERGE, 1,
                         "Failed to merge changes to the underlying store", e);

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java?rev=1705871&r1=1705870&r2=1705871&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java Tue Sep 29 14:56:32 2015
@@ -24,6 +24,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.NavigableMap;
 import java.util.Queue;
+import java.util.Set;
 import java.util.SortedMap;
 import java.util.SortedSet;
 import java.util.TreeMap;
@@ -55,6 +56,7 @@ import com.google.common.collect.Iterabl
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.collect.Iterables.filter;
 import static com.google.common.collect.Iterables.transform;
@@ -708,53 +710,150 @@ public final class NodeDocument extends
     }
 
     /**
-     * Get the revision of the latest change made to this node.
+     * Get the revision of the latest change made to this node. At the same
+     * time this method collects all collisions that happened for the given
+     * {@code changeRev}. The reported latest change takes branches into
+     * account. This means, if {@code changeRev} is on a branch, the latest
+     * change is either a change that was done by a preceding branch commit or
+     * a change that happened before the base of the branch. Changes done after
+     * the branch base on trunk are not considered in this case. For a trunk
+     * commit the latest change is reported similarly. In this case, unmerged
+     * branch commits are not considered as latest change. Only commits to trunk
+     * are considered.
+     *
+     * Collisions include the following cases:
+     * <ul>
+     *     <li>The other change is not yet committed</li>
+     *     <li>The other change is a branch commit and not yet merged</li>
+     *     <li>The {@code changeRev} is a branch commit and the other change
+     *       happened after the base revision of the branch</li>
+     *     <li>The other change is from another cluster node and not yet
+     *       visible</li>
+     * </ul>
      *
-     * @param context the revision context
-     * @param changeRev the revision of the current change
-     * @param handler the conflict handler, which is called for concurrent changes
-     *                preceding <code>changeRev</code>.
-     * @return the revision, or null if deleted
+     * @param context the revision context.
+     * @param baseRev the base revision of the current change.
+     * @param changeRev the revision of the current change.
+     * @param branch the branch associated with the current change or
+     *              {@code null} if {@code changeRev} is not a branch commit.
+     * @param collisions changes that happened after {@code baseRev}.
      */
     @CheckForNull
-    public Revision getNewestRevision(final RevisionContext context,
-                                      final Revision changeRev,
-                                      final CollisionHandler handler) {
-        final Map<Revision, String> validRevisions = Maps.newHashMap();
-        Predicate<Revision> predicate = new Predicate<Revision>() {
-            @Override
-            public boolean apply(Revision input) {
-                if (input.equals(changeRev)) {
-                    return false;
-                }
-                if (isValidRevision(context, input, null, changeRev, validRevisions)) {
-                    return true;
+    Revision getNewestRevision(final RevisionContext context,
+                               final Revision baseRev,
+                               final Revision changeRev,
+                               final Branch branch,
+                               final Set<Revision> collisions) {
+        checkArgument(!baseRev.isBranch() || branch != null,
+                "Branch must be non-null if baseRev is a branch revision");
+        Revision head = context.getHeadRevision();
+        Revision lower = branch != null ? branch.getBase() : baseRev;
+        // the clusterIds to check when walking the changes
+        Set<Integer> clusterIds = Collections.emptySet();
+        if (!getPreviousRanges().isEmpty()) {
+            clusterIds = Sets.newHashSet();
+            for (Revision prevRev : getPreviousRanges().keySet()) {
+                if (!isRevisionNewer(context, lower, prevRev)) {
+                    clusterIds.add(prevRev.getClusterId());
                 }
-                handler.concurrentModification(input);
-                return false;
             }
-        };
-
-        Revision newestRev = null;
-        // check local commits first
-        SortedMap<Revision, String> revisions = getLocalRevisions();
-        SortedMap<Revision, String> commitRoots = getLocalCommitRoot();
-        Iterator<Revision> it = filter(Iterables.mergeSorted(
-                ImmutableList.of(revisions.keySet(), commitRoots.keySet()),
-                revisions.comparator()), predicate).iterator();
-        if (it.hasNext()) {
-            newestRev = it.next();
+        }
+        // if we don't have clusterIds, we can use the local changes only
+        boolean fullScan = true;
+        Iterable<Revision> changes;
+        if (clusterIds.isEmpty()) {
+            // baseRev is newer than all previous documents
+            changes = Iterables.mergeSorted(
+                    ImmutableList.of(
+                            getLocalRevisions().keySet(),
+                            getLocalCommitRoot().keySet()),
+                    getLocalRevisions().comparator());
         } else {
-            // check full history (only needed in rare cases)
+            // include previous documents as well (only needed in rare cases)
+            fullScan = false;
+            changes = getAllChanges();
             if (LOG.isDebugEnabled()) {
                 LOG.debug("getNewestRevision() with changeRev {} on {}, " +
                                 "_revisions {}, _commitRoot {}",
                         changeRev, getId(), getLocalRevisions(), getLocalCommitRoot());
             }
-            it = filter(getAllChanges(), predicate).iterator();
-            if (it.hasNext()) {
-                newestRev = it.next();
+        }
+        Map<Integer, Revision> newestRevs = Maps.newHashMap();
+        Map<Revision, String> validRevisions = Maps.newHashMap();
+        for (Revision r : changes) {
+            if (r.equals(changeRev)) {
+                continue;
+            }
+            if (!fullScan) {
+                // check if we can stop going through changes
+                if (clusterIds.contains(r.getClusterId())) {
+                    if (isRevisionNewer(context, lower, r)) {
+                        clusterIds.remove(r.getClusterId());
+                        if (clusterIds.isEmpty()) {
+                            // all remaining revisions are older than
+                            // the lower bound
+                            break;
+                        }
+                    }
+                }
+            }
+            if (newestRevs.containsKey(r.getClusterId())) {
+                // we already found the newest revision for this clusterId
+                // from a baseRev point of view
+                // we still need to find collisions up to the base
+                // of the branch if this is for a commit on a branch
+                if (branch != null && !branch.containsCommit(r)) {
+                    // change does not belong to the branch
+                    if (isRevisionNewer(context, r, branch.getBase())) {
+                        // and happened after the base of the branch
+                        collisions.add(r);
+                    }
+                }
+            } else {
+                // we don't yet have the newest committed change
+                // for this clusterId
+                // check if change is visible from baseRev
+                if (isValidRevision(context, r, null, baseRev, validRevisions)) {
+                    // consider for newestRev
+                    newestRevs.put(r.getClusterId(), r);
+                } else {
+                    // not valid means:
+                    // 1) 'r' is not committed -> collision
+                    // 2) 'r' is on a branch, but not the same as
+                    //    changeRev -> collisions
+                    // 3) changeRev is on a branch and 'r' is newer than
+                    //    the base of the branch -> collision
+                    // 4) 'r' is committed but not yet visible to current
+                    //    cluster node -> collisions
+                    // 5) changeRev is not on a branch, 'r' is committed and
+                    //    newer than baseRev -> newestRev
+
+                    NodeDocument commitRoot = getCommitRoot(r);
+                    Revision commitRevision = null;
+                    if (commitRoot != null) {
+                        commitRevision = commitRoot.getCommitRevision(r);
+                    }
+                    if (commitRevision != null // committed but not yet visible
+                            && isRevisionNewer(context, commitRevision, head)) {
+                        // case 4)
+                        collisions.add(r);
+                    } else if (commitRevision != null // committed
+                            && branch == null         // changeRev not on branch
+                            && isRevisionNewer(context, r, baseRev)) {
+                        // case 5)
+                        newestRevs.put(r.getClusterId(), r);
+                    } else {
+                        // remaining cases 1), 2) and 3)
+                        collisions.add(r);
+                    }
+                }
             }
+
+        }
+        // select the newest committed change
+        Revision newestRev = null;
+        for (Revision r : newestRevs.values()) {
+            newestRev = Utils.max(newestRev, r, context.getRevisionComparator());
         }
 
         if (newestRev == null) {

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Revision.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Revision.java?rev=1705871&r1=1705870&r2=1705871&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Revision.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Revision.java Tue Sep 29 14:56:32 2015
@@ -607,7 +607,7 @@ public class Revision {
             if (comp != 0) {
                 return comp;
             }
-            return Integer.signum(o1.getClusterId() - o2.getClusterId());
+            return o1.compareTo(o2);
         }
 
         /**

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java?rev=1705871&r1=1705870&r2=1705871&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java Tue Sep 29 14:56:32 2015
@@ -501,12 +501,30 @@ public class Utils {
      */
     @CheckForNull
     public static Revision max(@Nullable Revision a, @Nullable Revision b) {
+        return max(a, b, StableRevisionComparator.INSTANCE);
+    }
+
+    /**
+     * Returns the revision which is considered more recent or {@code null} if
+     * both revisions are {@code null}. The implementation will return the first
+     * revision if both are considered equal. The comparison is done using the
+     * provided comparator.
+     *
+     * @param a the first revision (or {@code null}).
+     * @param b the second revision (or {@code null}).
+     * @param c the comparator.
+     * @return the revision considered more recent.
+     */
+    @CheckForNull
+    public static Revision max(@Nullable Revision a,
+                               @Nullable Revision b,
+                               @Nonnull Comparator<Revision> c) {
         if (a == null) {
             return b;
         } else if (b == null) {
             return a;
         }
-        return StableRevisionComparator.INSTANCE.compare(a, b) >= 0 ? a : b;
+        return c.compare(a, b) >= 0 ? a : b;
     }
 
     /**
@@ -640,4 +658,26 @@ public class Utils {
             }
         });
     }
+
+    /**
+     * Returns the highest timestamp of all the passed external revisions.
+     * A revision is considered external if the clusterId is different from the
+     * passed {@code localClusterId}.
+     *
+     * @param revisions the revisions to consider.
+     * @param localClusterId the id of the local cluster node.
+     * @return the highest timestamp or {@link Long#MIN_VALUE} if none of the
+     *          revisions is external.
+     */
+    public static long getMaxExternalTimestamp(Iterable<Revision> revisions,
+                                               int localClusterId) {
+        long maxTime = Long.MIN_VALUE;
+        for (Revision r : revisions) {
+            if (r.getClusterId() == localClusterId) {
+                continue;
+            }
+            maxTime = Math.max(maxTime, r.getTimestamp());
+        }
+        return maxTime;
+    }
 }

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java?rev=1705871&r1=1705870&r2=1705871&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java Tue Sep 29 14:56:32 2015
@@ -92,7 +92,6 @@ import org.apache.jackrabbit.oak.spi.sta
 import org.apache.jackrabbit.oak.spi.state.NodeStore;
 import org.apache.jackrabbit.oak.stats.Clock;
 import org.junit.After;
-import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -1097,7 +1096,6 @@ public class DocumentNodeStoreTest {
     }
 
     // OAK-2929
-    @Ignore
     @Test
     public void conflictDetectionWithClockDifference() throws Exception {
         MemoryDocumentStore store = new MemoryDocumentStore();
@@ -1155,7 +1153,6 @@ public class DocumentNodeStoreTest {
     }
 
     // OAK-2929
-    @Ignore
     @Test
     public void parentWithUnseenChildrenMustNotBeDeleted() throws Exception {
         final MemoryDocumentStore docStore = new MemoryDocumentStore();
@@ -1318,12 +1315,7 @@ public class DocumentNodeStoreTest {
         //root would hold reference to store2 root state after initial repo initialization
         root = store2.getRoot();
 
-        //The hidden node itself should be creatable across cluster concurrently
-        builder = root.builder();
-        builder.child(":dynHidden");
-        merge(store2, builder);
-
-        //Children of hidden node should be creatable across cluster concurrently
+        //The hidden node and children should be creatable across cluster concurrently
         builder = root.builder();
         builder.child(":hidden").child("b");
         builder.child(":dynHidden").child("c");
@@ -1499,7 +1491,6 @@ public class DocumentNodeStoreTest {
     }
 
     // OAK-3388
-    @Ignore
     @Test
     public void clusterWithClockDifferences() throws Exception {
         MemoryDocumentStore store = new MemoryDocumentStore();
@@ -1551,7 +1542,6 @@ public class DocumentNodeStoreTest {
     }
 
     // OAK-3388
-    @Ignore
     @Test
     public void clusterWithClockDifferences2() throws Exception {
         MemoryDocumentStore store = new MemoryDocumentStore();

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/NodeDocumentTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/NodeDocumentTest.java?rev=1705871&r1=1705870&r2=1705871&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/NodeDocumentTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/NodeDocumentTest.java Tue Sep 29 14:56:32 2015
@@ -18,6 +18,7 @@ package org.apache.jackrabbit.oak.plugin
 
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Random;
@@ -45,8 +46,10 @@ import static org.apache.jackrabbit.oak.
 import static org.apache.jackrabbit.oak.plugins.document.util.Utils.getRootDocument;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 /**
  * Tests for {@link NodeDocument}.
@@ -366,21 +369,135 @@ public class NodeDocumentTest {
         // simulate a change revision within the range of
         // the most recent previous document
         Iterable<Revision> changes = prev.getAllChanges();
-        Revision changeRev = new Revision(Iterables.getLast(changes).getTimestamp(), 1000, ns.getClusterId());
+        Revision baseRev = Iterables.getLast(changes);
+        Revision changeRev = new Revision(baseRev.getTimestamp(), 1000, ns.getClusterId());
         // reset calls to previous documents
         prevDocCalls.clear();
-        doc.getNewestRevision(ns, changeRev, new CollisionHandler() {
-            @Override
-            void concurrentModification(Revision other) {
-                // ignore
-            }
-        });
+        doc.getNewestRevision(ns, baseRev, changeRev, null, new HashSet<Revision>());
         // must not read all previous docs
         assertTrue("too many calls for previous documents: " + prevDocCalls,
                 prevDocCalls.size() <= 4);
 
         ns.dispose();
     }
+
+    @Test
+    public void getNewestRevision() throws Exception {
+        MemoryDocumentStore store = new MemoryDocumentStore();
+        DocumentNodeStore ns1 = createTestStore(store, 0);
+        DocumentNodeStore ns2 = createTestStore(store, 0);
+
+        NodeBuilder b1 = ns1.getRoot().builder();
+        b1.child("test");
+        merge(ns1, b1);
+        Revision created = ns1.getHeadRevision();
+
+        NodeDocument doc = store.find(NODES, Utils.getIdFromPath("/test"));
+        Set<Revision> collisions = Sets.newHashSet();
+        Revision newest = doc.getNewestRevision(ns1, ns1.getHeadRevision(),
+                ns1.newRevision(), null, collisions);
+        assertEquals(created, newest);
+        assertEquals(0, collisions.size());
+
+        // from ns2 POV newest must be null, because the node is not yet visible
+        newest = doc.getNewestRevision(ns2, ns2.getHeadRevision(),
+                ns2.newRevision(), null, collisions);
+        assertNull(newest);
+        assertEquals(1, collisions.size());
+        assertEquals(created, collisions.iterator().next());
+
+        ns1.runBackgroundOperations();
+        ns2.runBackgroundOperations();
+        collisions.clear();
+
+        // now ns2 sees /test
+        doc = store.find(NODES, Utils.getIdFromPath("/test"));
+        newest = doc.getNewestRevision(ns2, ns2.getHeadRevision(),
+                ns2.newRevision(), null, collisions);
+        assertEquals(created, newest);
+        assertEquals(0, collisions.size());
+
+        Revision uncommitted = ns1.newRevision();
+        UpdateOp op = new UpdateOp(Utils.getIdFromPath("/test"), false);
+        NodeDocument.setCommitRoot(op, uncommitted, 0);
+        op.setMapEntry("p", uncommitted, "v");
+        assertNotNull(store.findAndUpdate(NODES, op));
+
+        collisions.clear();
+        // ns1 must report uncommitted in collisions
+        doc = store.find(NODES, Utils.getIdFromPath("/test"));
+        newest = doc.getNewestRevision(ns1, ns1.getHeadRevision(),
+                ns1.newRevision(), null, collisions);
+        assertEquals(created, newest);
+        assertEquals(1, collisions.size());
+        assertEquals(uncommitted, collisions.iterator().next());
+
+        collisions.clear();
+        // ns2 must report uncommitted in collisions
+        newest = doc.getNewestRevision(ns2, ns2.getHeadRevision(),
+                ns2.newRevision(), null, collisions);
+        assertEquals(created, newest);
+        assertEquals(1, collisions.size());
+        assertEquals(uncommitted, collisions.iterator().next());
+
+        b1 = ns1.getRoot().builder();
+        b1.child("test").setProperty("q", "v");
+        merge(ns1, b1);
+        Revision committed = ns1.getHeadRevision();
+
+        collisions.clear();
+        // ns1 must now report committed revision as newest
+        // uncommitted is not considered a collision anymore
+        // because it is older than the base revision
+        doc = store.find(NODES, Utils.getIdFromPath("/test"));
+        newest = doc.getNewestRevision(ns1, created,
+                ns1.newRevision(), null, collisions);
+        assertEquals(committed, newest);
+        assertEquals(0, collisions.size());
+
+        // ns2 must report committed revision as collision because
+        // it is not yet visible. newest is when the node was created
+        newest = doc.getNewestRevision(ns2, ns2.getHeadRevision(),
+                ns2.newRevision(), null, collisions);
+        assertEquals(created, newest);
+        assertEquals(2, collisions.size());
+        assertTrue(collisions.contains(uncommitted));
+        assertTrue(collisions.contains(committed));
+
+
+        ns1.dispose();
+        ns2.dispose();
+    }
+
+    @Test
+    public void getNewestRevisionCheckArgument() throws Exception {
+        MemoryDocumentStore store = new MemoryDocumentStore();
+        DocumentNodeStore ns = createTestStore(store, 0);
+
+        NodeBuilder builder = ns.getRoot().builder();
+        builder.child("test");
+        merge(ns, builder);
+
+        Set<Revision> collisions = Sets.newHashSet();
+        NodeDocument doc = store.find(NODES, Utils.getIdFromPath("/test"));
+        Revision branchBase = ns.getHeadRevision().asBranchRevision();
+        try {
+            doc.getNewestRevision(ns, branchBase, ns.newRevision(), null, collisions);
+            fail("Must fail with IllegalArgumentException");
+        } catch (IllegalArgumentException e) {
+            // expected
+        }
+        try {
+            Revision head = ns.getHeadRevision();
+            Branch b = ns.getBranches().create(head, ns.newRevision(), null);
+            doc.getNewestRevision(ns, head, ns.newRevision(), b, collisions);
+            fail("Must fail with IllegalArgumentException");
+        } catch (IllegalArgumentException e) {
+            // expected
+        }
+
+        ns.dispose();
+    }
 
     private DocumentNodeStore createTestStore(int numChanges) throws Exception {
         return createTestStore(new MemoryDocumentStore(), numChanges);

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/RevisionTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/RevisionTest.java?rev=1705871&r1=1705870&r2=1705871&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/RevisionTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/RevisionTest.java Tue Sep 29 14:56:32 2015
@@ -129,6 +129,22 @@ public class RevisionTest {
     }
 
     @Test
+    public void compare2() {
+        RevisionComparator comp = new RevisionComparator(1);
+
+        Revision r2 = new Revision(7, 0, 2);
+        Revision r3 = new Revision(5, 0, 3);
+
+        Revision seenAt = new Revision(8, 0, 0);
+        comp.add(r2, seenAt);
+        comp.add(r3, seenAt);
+
+        // both revisions have same seenAt revision, must use
+        // revision timestamp for comparison
+        assertTrue(comp.compare(r2, r3) > 0);
+    }
+
+    @Test
     public void revisionComparatorSimple() {
         RevisionComparator comp = new RevisionComparator(0);
         Revision r1 = Revision.newRevision(0);
@@ -254,12 +270,12 @@ public class RevisionTest {
         comp.add(c1sync,  Revision.fromString("r2-0-0"));
         Revision c2sync = Revision.fromString("r4-1-2");
         comp.add(c2sync,  Revision.fromString("r2-1-0"));
-        Revision c3sync = Revision.fromString("r2-0-3");
+        Revision c3sync = Revision.fromString("r5-0-3");
         comp.add(c3sync, Revision.fromString("r2-1-0"));
 
         assertTrue(comp.compare(r1, r2) < 0);
         assertTrue(comp.compare(r2, c2sync) < 0);
-        // same seen-at revision, but clusterId 2 < 3
+        // same seen-at revision, but rev timestamp c2sync < c3sync
         assertTrue(comp.compare(c2sync, c3sync) < 0);
 
         // this means, c3sync must be after r1 and r2

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/util/UtilsTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/util/UtilsTest.java?rev=1705871&r1=1705870&r2=1705871&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/util/UtilsTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/util/UtilsTest.java Tue Sep 29 14:56:32 2015
@@ -16,7 +16,10 @@
  */
 package org.apache.jackrabbit.oak.plugins.document.util;
 
+import java.util.List;
+
 import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 
 import org.apache.jackrabbit.oak.api.CommitFailedException;
@@ -138,4 +141,42 @@ public class UtilsTest {
             store.dispose();
         }
     }
+
+    @Test
+    public void getMaxExternalRevisionTime() {
+        int localClusterId = 1;
+        List<Revision> revs = ImmutableList.of();
+        long revTime = Utils.getMaxExternalTimestamp(revs, localClusterId);
+        assertEquals(Long.MIN_VALUE, revTime);
+
+        revs = ImmutableList.of(Revision.fromString("r1-0-1"));
+        revTime = Utils.getMaxExternalTimestamp(revs, localClusterId);
+        assertEquals(Long.MIN_VALUE, revTime);
+
+        revs = ImmutableList.of(
+                Revision.fromString("r1-0-1"),
+                Revision.fromString("r2-0-2"));
+        revTime = Utils.getMaxExternalTimestamp(revs, localClusterId);
+        assertEquals(2, revTime);
+
+        revs = ImmutableList.of(
+                Revision.fromString("r3-0-1"),
+                Revision.fromString("r2-0-2"));
+        revTime = Utils.getMaxExternalTimestamp(revs, localClusterId);
+        assertEquals(2, revTime);
+
+        revs = ImmutableList.of(
+                Revision.fromString("r1-0-1"),
+                Revision.fromString("r2-0-2"),
+                Revision.fromString("r2-0-3"));
+        revTime = Utils.getMaxExternalTimestamp(revs, localClusterId);
+        assertEquals(2, revTime);
+
+        revs = ImmutableList.of(
+                Revision.fromString("r1-0-1"),
+                Revision.fromString("r3-0-2"),
+                Revision.fromString("r2-0-3"));
+        revTime = Utils.getMaxExternalTimestamp(revs, localClusterId);
+        assertEquals(3, revTime);
+    }
 }