You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by mr...@apache.org on 2013/12/10 10:54:39 UTC

svn commit: r1549801 - in /jackrabbit/oak/trunk: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/ oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/

Author: mreutegg
Date: Tue Dec 10 09:54:38 2013
New Revision: 1549801

URL: http://svn.apache.org/r1549801
Log:
OAK-1254: Parallel execution of SimpleSearchTest fails with MongoMK

Added:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MergeCommit.java   (with props)
Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Branch.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Commit.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueue.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoNodeStore.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/NodeDocument.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/ClusterTest.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueueTest.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMKBranchMergeTest.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoUtils.java
    jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/ConcurrentAddNodesClusterIT.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Branch.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Branch.java?rev=1549801&r1=1549800&r2=1549801&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Branch.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Branch.java Tue Dec 10 09:54:38 2013
@@ -97,7 +97,9 @@ class Branch {
         checkArgument(!checkNotNull(base).isBranch(), "Not a trunk revision: %s", base);
         Revision last = commits.lastKey();
         checkArgument(commits.comparator().compare(head, last) > 0);
-        commits.put(head, new BranchCommit(base));
+        BranchCommit bc = new BranchCommit(base);
+        bc.getModifications().put("/", head);
+        commits.put(head, bc);
     }
 
     /**

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Commit.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Commit.java?rev=1549801&r1=1549800&r2=1549801&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Commit.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Commit.java Tue Dec 10 09:54:38 2013
@@ -223,10 +223,12 @@ public class Commit {
             } else {
                 NodeDocument.setCommitRoot(op, revision, commitRootDepth);
                 if (op.isNew()) {
-                    // for new nodes we can safely set _lastRev on insert.
-                    // for existing nodes the _lastRev is updated by the
-                    // background thread to avoid concurrent updates
-                    NodeDocument.setLastRev(op, revision);
+                    if (baseBranchRevision == null) {
+                        // for new non-branch nodes we can safely set _lastRev on
+                        // insert. for existing nodes the _lastRev is updated by
+                        // the background thread to avoid concurrent updates
+                        NodeDocument.setLastRev(op, revision);
+                    }
                     newNodes.add(op);
                 } else {
                     changedNodes.add(op);

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueue.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueue.java?rev=1549801&r1=1549800&r2=1549801&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueue.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueue.java Tue Dec 10 09:54:38 2013
@@ -16,10 +16,13 @@
  */
 package org.apache.jackrabbit.oak.plugins.mongomk;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import java.util.SortedMap;
+import java.util.SortedSet;
 import java.util.TreeMap;
+import java.util.TreeSet;
 import java.util.concurrent.CountDownLatch;
 
 import javax.annotation.Nonnull;
@@ -50,64 +53,76 @@ class CommitQueue {
         this.dispatcher = dispatcher;
     }
 
-    Commit createCommit(Revision base) {
+    @Nonnull
+    Revision createRevision() {
+        return createRevisions(1).first();
+    }
+
+    @Nonnull
+    SortedSet<Revision> createRevisions(int num) {
+        checkArgument(num > 0);
+        SortedSet<Revision> revs = new TreeSet<Revision>(
+                new StableRevisionComparator());
+        Revision rev = null;
         synchronized (this) {
-            Revision rev = store.newRevision();
-            Commit c = new Commit(store, base, rev);
+            for (int i = 0; i < num; i++) {
+                rev = store.newRevision();
+                revs.add(rev);
+            }
             commits.put(rev, new Entry(rev));
-            LOG.debug("created commit {}", rev);
-            return c;
         }
+        LOG.debug("created commit {}", rev);
+        return revs;
     }
 
-    void done(@Nonnull Commit c, boolean isBranch, @Nullable CommitInfo info) {
-        checkNotNull(c);
+    void done(@Nonnull Revision rev, boolean isBranch, @Nullable CommitInfo info) {
+        checkNotNull(rev);
         if (isBranch) {
-            removeCommit(c);
+            removeCommit(rev);
         } else {
-            afterTrunkCommit(c, info);
+            afterTrunkCommit(rev, info);
         }
     }
 
-    void canceled(@Nonnull Commit c) {
-        removeCommit(c);
+    void canceled(@Nonnull Revision rev) {
+        removeCommit(rev);
     }
 
     //------------------------< internal >--------------------------------------
 
-    private void removeCommit(@Nonnull Commit c) {
+    private void removeCommit(@Nonnull Revision rev) {
         // simply remove and notify next head if any
         synchronized (this) {
-            boolean wasHead = commits.firstKey().equals(c.getRevision());
-            commits.remove(c.getRevision());
-            LOG.debug("removed commit {}, wasHead={}", c.getRevision(), wasHead);
+            boolean wasHead = commits.firstKey().equals(rev);
+            commits.remove(rev);
+            LOG.debug("removed commit {}, wasHead={}", rev, wasHead);
             if (wasHead) {
                 notifyHead();
             }
         }
     }
 
-    private void afterTrunkCommit(Commit c, CommitInfo info) {
+    private void afterTrunkCommit(Revision rev, CommitInfo info) {
         assert !commits.isEmpty();
 
         boolean isHead;
         Entry commitEntry;
         synchronized (this) {
-            isHead = commits.firstKey().equals(c.getRevision());
-            commitEntry = commits.get(c.getRevision());
+            isHead = commits.firstKey().equals(rev);
+            commitEntry = commits.get(rev);
         }
         if (!isHead) {
-            LOG.debug("not head: {}, waiting...", c.getRevision());
+            LOG.debug("not head: {}, waiting...", rev);
             commitEntry.await();
         }
         synchronized (this) {
-            commits.remove(c.getRevision());
+            commits.remove(rev);
             try {
-                LOG.debug("removed {}, head is now {}", c.getRevision(), commits.isEmpty() ? null : commits.firstKey());
+                LOG.debug("removed {}, head is now {}", rev, commits.isEmpty() ? null : commits.firstKey());
                 // remember before revision
                 Revision before = store.getHeadRevision();
                 // update head revision
-                store.setHeadRevision(c.getRevision());
+                store.setHeadRevision(rev);
                 NodeState root = store.getRoot();
                 // TODO: correct?
                 dispatcher.contentChanged(store.getRoot(before), null);

Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MergeCommit.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MergeCommit.java?rev=1549801&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MergeCommit.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MergeCommit.java Tue Dec 10 09:54:38 2013
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.mongomk;
+
+import java.util.SortedSet;
+
+/**
+ * A merge commit containing multiple commit revisions, one for each branch
+ * commit to merge.
+ */
+class MergeCommit extends Commit {
+
+    private final SortedSet<Revision> mergeRevs;
+
+    MergeCommit(MongoNodeStore nodeStore,
+                Revision baseRevision,
+                SortedSet<Revision> revisions) {
+        super(nodeStore, baseRevision, revisions.last());
+        this.mergeRevs = revisions;
+    }
+
+    SortedSet<Revision> getMergeRevisions() {
+        return mergeRevs;
+    }
+}

Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MergeCommit.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MergeCommit.java
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision Rev URL

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoNodeStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoNodeStore.java?rev=1549801&r1=1549800&r2=1549801&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoNodeStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoNodeStore.java Tue Dec 10 09:54:38 2013
@@ -394,7 +394,36 @@ public final class MongoNodeStore
         boolean success = false;
         Commit c;
         try {
-            c = commitQueue.createCommit(base);
+            c = new Commit(this, base, commitQueue.createRevision());
+            success = true;
+        } finally {
+            if (!success) {
+                backgroundOperationLock.readLock().unlock();
+            }
+        }
+        return c;
+    }
+
+    /**
+     * Creates a new merge commit. The caller must acknowledge the commit either with
+     * {@link #done(Commit, boolean, CommitInfo)} or {@link #canceled(Commit)},
+     * depending on the result of the commit.
+     *
+     * @param base the base revision for the commit or <code>null</code> if the
+     *             commit should use the current head revision as base.
+     * @param numBranchCommits the number of branch commits to merge.
+     * @return a new merge commit.
+     */
+    @Nonnull
+    MergeCommit newMergeCommit(@Nullable Revision base, int numBranchCommits) {
+        if (base == null) {
+            base = headRevision;
+        }
+        backgroundOperationLock.readLock().lock();
+        boolean success = false;
+        MergeCommit c;
+        try {
+            c = new MergeCommit(this, base, commitQueue.createRevisions(numBranchCommits));
             success = true;
         } finally {
             if (!success) {
@@ -406,7 +435,7 @@ public final class MongoNodeStore
 
     void done(@Nonnull Commit c, boolean isBranch, @Nullable CommitInfo info) {
         try {
-            commitQueue.done(c, isBranch, info);
+            commitQueue.done(c.getRevision(), isBranch, info);
         } finally {
             backgroundOperationLock.readLock().unlock();
         }
@@ -414,7 +443,7 @@ public final class MongoNodeStore
 
     void canceled(Commit c) {
         try {
-            commitQueue.canceled(c);
+            commitQueue.canceled(c.getRevision());
         } finally {
             backgroundOperationLock.readLock().unlock();
         }
@@ -862,16 +891,18 @@ public final class MongoNodeStore
         if (b != null) {
             base = b.getBase(branchHead);
         }
+        int numBranchCommits = b != null ? b.getCommits().size() : 1;
         boolean success = false;
-        Commit commit = newCommit(base);
+        MergeCommit commit = newMergeCommit(base, numBranchCommits);
         try {
             // make branch commits visible
             UpdateOp op = new UpdateOp(Utils.getIdFromPath("/"), false);
             NodeDocument.setModified(op, commit.getRevision());
             if (b != null) {
-                String commitTag = "c-" + commit.getRevision().toString();
+                Iterator<Revision> mergeCommits = commit.getMergeRevisions().iterator();
                 for (Revision rev : b.getCommits()) {
                     rev = rev.asTrunkRevision();
+                    String commitTag = "c-" + mergeCommits.next();
                     NodeDocument.setRevision(op, rev, commitTag);
                     op.containsMapEntry(NodeDocument.COLLISIONS, rev, false);
                 }

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/NodeDocument.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/NodeDocument.java?rev=1549801&r1=1549800&r2=1549801&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/NodeDocument.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/NodeDocument.java Tue Dec 10 09:54:38 2013
@@ -22,7 +22,6 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
@@ -497,27 +496,27 @@ public class NodeDocument extends Docume
         if (lastModified != null) {
             lastRevs.put(context.getClusterId(), lastModified);
         }
-        // filter out revisions newer than branch base
+        Revision branchBase = null;
         if (branch != null) {
-            Revision base = branch.getBase(readRevision);
-            for (Iterator<Revision> it = lastRevs.values().iterator(); it
-                    .hasNext();) {
-                Revision r = it.next();
-                if (isRevisionNewer(context, r, base)) {
-                    it.remove();
-                }
-            }
+            branchBase = branch.getBase(readRevision);
         }
         for (Revision r : lastRevs.values()) {
             // ignore if newer than readRevision
             if (isRevisionNewer(context, r, readRevision)) {
                 // the node has a _lastRev which is newer than readRevision
-                // this means we don't know when if this node was
+                // this means we don't know when this node was
                 // modified by an operation on a descendant node between
                 // current lastRevision and readRevision. therefore we have
                 // to stay on the safe side and use readRevision
                 lastRevision = readRevision;
                 continue;
+            } else if (branchBase != null && isRevisionNewer(context, r, branchBase)) {
+                // readRevision is on a branch and the node has a
+                // _lastRev which is newer than the base of the branch.
+                // we cannot use this _lastRev because it is not visible
+                // from this branch. the highest possible revision of
+                // visible changes is the base of the branch
+                r = branchBase;
             }
             if (isRevisionNewer(context, r, lastRevision)) {
                 lastRevision = r;
@@ -528,7 +527,7 @@ public class NodeDocument extends Docume
             // -> possibly overlay with unsaved last revs from branch
             Revision r = branch.getUnsavedLastRevision(path, readRevision);
             if (r != null) {
-                lastRevision = r;
+                lastRevision = r.asBranchRevision();
             }
         }
         n.setLastRevision(lastRevision);
@@ -1060,11 +1059,22 @@ public class NodeDocument extends Docume
         Revision latestRev = null;
         for (Map.Entry<Revision, String> entry : valueMap.entrySet()) {
             Revision propRev = entry.getKey();
-            if (min != null && isRevisionNewer(context, min, propRev)) {
+            // resolve revision
+            NodeDocument commitRoot = getCommitRoot(propRev);
+            if (commitRoot == null) {
+                continue;
+            }
+            String commitValue = commitRoot.getCommitValue(propRev);
+            if (commitValue == null) {
+                continue;
+            }
+            if (min != null && isRevisionNewer(context, min,
+                    Utils.resolveCommitRevision(propRev, commitValue))) {
                 continue;
             }
             if (isValidRevision(context, propRev, readRevision, validRevisions)) {
-                latestRev = propRev;
+                // TODO: need to check older revisions as well?
+                latestRev = Utils.resolveCommitRevision(propRev, commitValue);
                 value = entry.getValue();
                 break;
             }

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/ClusterTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/ClusterTest.java?rev=1549801&r1=1549800&r2=1549801&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/ClusterTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/ClusterTest.java Tue Dec 10 09:54:38 2013
@@ -37,7 +37,6 @@ import org.apache.jackrabbit.oak.spi.sta
 import org.apache.jackrabbit.oak.spi.state.NodeState;
 import org.junit.After;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import com.google.common.collect.Sets;
@@ -200,7 +199,6 @@ public class ClusterTest {
     /**
      * Test for OAK-1254
      */
-    @Ignore
     @Test
     public void clusterBranchRebase() throws Exception {
         MongoMK mk1 = createMK(1, 0);
@@ -406,7 +404,6 @@ public class ClusterTest {
     }
 
     private void traverse(NodeState node, String path) {
-        System.out.println("traversing: " + path);
         for (ChildNodeEntry child : node.getChildNodeEntries()) {
             traverse(child.getNodeState(), PathUtils.concat(path, child.getName()));
         }

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueueTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueueTest.java?rev=1549801&r1=1549800&r2=1549801&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueueTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueueTest.java Tue Dec 10 09:54:38 2013
@@ -75,8 +75,7 @@ public class CommitQueueTest {
                 public void run() {
                     try {
                         for (int i = 0; i < COMMITS_PER_WRITER; i++) {
-                            Revision base = store.getHeadRevision();
-                            Commit c = queue.createCommit(base);
+                            Revision rev = queue.createRevision();
                             try {
                                 Thread.sleep(0, random.nextInt(1000));
                             } catch (InterruptedException e) {
@@ -84,10 +83,10 @@ public class CommitQueueTest {
                             }
                             if (random.nextInt(5) == 0) {
                                 // cancel 20% of the commits
-                                queue.canceled(c);
+                                queue.canceled(rev);
                             } else {
                                 boolean isBranch = random.nextInt(5) == 0;
-                                queue.done(c, isBranch, null);
+                                queue.done(rev, isBranch, null);
                             }
                         }
                     } catch (Exception e) {

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMKBranchMergeTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMKBranchMergeTest.java?rev=1549801&r1=1549800&r2=1549801&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMKBranchMergeTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMKBranchMergeTest.java Tue Dec 10 09:54:38 2013
@@ -441,20 +441,6 @@ public class MongoMKBranchMergeTest exte
         }
     }
 
-    @Test
-    public void branchReadAfterMerge() {
-        String branchRev = mk.branch(null);
-        String branchRev1 = mk.commit("/", "+\"foo\":{}", branchRev, null);
-        String branchRev2 = mk.commit("/", "+\"bar\":{}", branchRev1, null);
-        mk.merge(branchRev2, null);
-
-        assertNodesExist(branchRev2, "/foo");
-        assertNodesExist(branchRev2, "/bar");
-
-        assertNodesExist(branchRev1, "/foo");
-        assertNodesNotExist(branchRev1, "/bar");
-    }
-
     //--------------------------< internal >------------------------------------
 
     private String setProp(String rev, String prop, Object value) {

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoUtils.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoUtils.java?rev=1549801&r1=1549800&r2=1549801&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoUtils.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoUtils.java Tue Dec 10 09:54:38 2013
@@ -39,9 +39,7 @@ public class MongoUtils {
     protected static Exception exception;
 
     /**
-     * Get a connection if available. If not available, null is returned. If
-     * called again, the same connection is returned, or null is returned if
-     * there was an error.
+     * Get a connection if available. If not available, null is returned.
      * 
      * @return the connection or null
      */

Modified: jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/ConcurrentAddNodesClusterIT.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/ConcurrentAddNodesClusterIT.java?rev=1549801&r1=1549800&r2=1549801&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/ConcurrentAddNodesClusterIT.java (original)
+++ jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/ConcurrentAddNodesClusterIT.java Tue Dec 10 09:54:38 2013
@@ -45,10 +45,12 @@ import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
 
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
 /**
  * Concurrently add nodes with multiple sessions on multiple cluster nodes.
  */
-@Ignore("OAK-1254")
 public class ConcurrentAddNodesClusterIT {
 
     private static final int NUM_CLUSTER_NODES = 3;
@@ -79,6 +81,7 @@ public class ConcurrentAddNodesClusterIT
         dropDB();
     }
 
+    @Ignore("OAK-1254")
     @Test
     public void addNodesConcurrent() throws Exception {
         for (int i = 0; i < NUM_CLUSTER_NODES; i++) {
@@ -138,6 +141,118 @@ public class ConcurrentAddNodesClusterIT
         }
     }
 
+    @Test
+    public void addNodes2() throws Exception {
+        for (int i = 0; i < 3; i++) {
+            MongoMK mk = new MongoMK.Builder()
+                    .setMongoDB(createConnection().getDB())
+                    .setAsyncDelay(0)
+                    .setClusterId(i + 1).open();
+            mks.add(mk);
+        }
+        final MongoMK mk1 = mks.get(0);
+        final MongoMK mk2 = mks.get(1);
+        final MongoMK mk3 = mks.get(2);
+        Repository r1 = new Jcr(mk1).createRepository();
+        Repository r2 = new Jcr(mk2).createRepository();
+        Repository r3 = new Jcr(mk3).createRepository();
+
+        Session s1 = r1.login(new SimpleCredentials("admin", "admin".toCharArray()));
+        Session s2 = r2.login(new SimpleCredentials("admin", "admin".toCharArray()));
+        Session s3 = r3.login(new SimpleCredentials("admin", "admin".toCharArray()));
+
+        ensureIndex(s1.getRootNode(), PROP_NAME);
+        syncMKs(1);
+        ensureIndex(s2.getRootNode(), PROP_NAME);
+        ensureIndex(s3.getRootNode(), PROP_NAME);
+
+        // begin test
+
+        Node root2 = s2.getRootNode().addNode("testroot-Worker-2", "nt:unstructured");
+        createNodes(root2, "testnode0");
+        s2.save();
+
+        createNodes(root2, "testnode1");
+
+        runBackgroundOps(mk1);
+        runBackgroundOps(mk3);
+        runBackgroundOps(mk2); // publish 'testroot-Worker-2/testnode0'
+
+        Node root3 = s3.getRootNode().addNode("testroot-Worker-3", "nt:unstructured");
+        createNodes(root3, "testnode0");
+
+        s2.save();
+        createNodes(root2, "testnode2");
+
+        runBackgroundOps(mk1); // sees 'testroot-Worker-2/testnode0'
+        runBackgroundOps(mk3); // sees 'testroot-Worker-2/testnode0'
+        runBackgroundOps(mk2); // publish 'testroot-Worker-2/testnode1'
+
+        // subsequent read on mk3 will read already published docs from mk2
+        s3.save();
+        createNodes(root3, "testnode1");
+
+        Node root1 = s1.getRootNode().addNode("testroot-Worker-1", "nt:unstructured");
+        createNodes(root1, "testnode0");
+
+        s2.save();
+        createNodes(root2, "testnode3");
+
+        runBackgroundOps(mk1);
+        runBackgroundOps(mk3);
+        runBackgroundOps(mk2);
+
+        s1.save();
+        createNodes(root1, "testnode1");
+
+        s3.save();
+        createNodes(root3, "testnode2");
+
+        runBackgroundOps(mk1);
+
+        s1.save();
+    }
+
+    @Test
+    public void rebaseVisibility() throws Exception {
+        for (int i = 0; i < 2; i++) {
+            MongoMK mk = new MongoMK.Builder()
+                    .setMongoDB(createConnection().getDB())
+                    .setAsyncDelay(0)
+                    .setClusterId(i + 1).open();
+            mks.add(mk);
+        }
+        final MongoMK mk1 = mks.get(0);
+        final MongoMK mk2 = mks.get(1);
+        Repository r1 = new Jcr(mk1).createRepository();
+        Repository r2 = new Jcr(mk2).createRepository();
+
+        Session s1 = r1.login(new SimpleCredentials("admin", "admin".toCharArray()));
+        Session s2 = r2.login(new SimpleCredentials("admin", "admin".toCharArray()));
+
+        Node root1 = s1.getRootNode().addNode("session-1");
+        s1.save();
+        Node root2 = s2.getRootNode().addNode("session-2");
+        s2.save();
+
+        runBackgroundOps(mk1);
+        runBackgroundOps(mk2);
+        runBackgroundOps(mk1);
+
+        createNodes(root1, "nodes");
+
+        createNodes(root2, "nodes");
+        s2.save();
+
+        runBackgroundOps(mk2);
+        runBackgroundOps(mk1);
+
+        assertFalse(s1.getRootNode().hasNode("session-2/nodes"));
+
+        s1.refresh(true);
+        assertTrue(s1.getRootNode().hasNode("session-2/nodes"));
+    }
+
     private void syncMKs(int delay) {
         EXECUTOR.schedule(new Callable<Object>() {
             @Override
@@ -193,6 +308,7 @@ public class ConcurrentAddNodesClusterIT
             root.getSession().save();
         } catch (RepositoryException e) {
             // created by other thread -> ignore
+            root.getSession().refresh(false);
         }
     }
 
@@ -246,4 +362,13 @@ public class ConcurrentAddNodesClusterIT
             session.save();
         }
     }
+
+    private void createNodes(Node parent, String child)
+            throws RepositoryException {
+        Node node = parent.addNode(child, "nt:unstructured");
+        for (int i = 0; i < NODE_COUNT; i++) {
+            Node c = node.addNode("node" + i, "nt:unstructured");
+            c.setProperty(PROP_NAME, i);
+        }
+    }
 }