Posted to oak-commits@jackrabbit.apache.org by mr...@apache.org on 2013/10/22 15:12:01 UTC

svn commit: r1534626 - in /jackrabbit/oak/trunk/oak-core/src: main/java/org/apache/jackrabbit/oak/plugins/mongomk/ test/java/org/apache/jackrabbit/oak/plugins/mongomk/

Author: mreutegg
Date: Tue Oct 22 13:12:01 2013
New Revision: 1534626

URL: http://svn.apache.org/r1534626
Log:
OAK-1080: MongoMK: improved concurrency
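
For illustration, the effect of this change as seen from the MicroKernel API: MongoMK.commit() no longer serializes all writers on a single monitor; each commit now goes through MongoNodeStore.newCommit()/done()/canceled() and the new CommitQueue. Below is a minimal sketch of two writers committing non-conflicting changes concurrently, modeled on the new ConcurrentUpdatesTest; it assumes MongoMK.Builder falls back to an in-memory DocumentStore when no MongoDB connection is configured (as the new CommitQueueTest relies on):

    import org.apache.jackrabbit.oak.plugins.mongomk.MongoMK;

    // Sketch only: two threads updating independent nodes through the public API.
    public class ConcurrentCommitSketch {
        public static void main(String[] args) throws InterruptedException {
            final MongoMK mk = new MongoMK.Builder().open();
            mk.commit("/", "+\"a\":{}+\"b\":{}", null, null);       // create two nodes
            Thread t1 = new Thread(new Runnable() {
                public void run() {
                    for (int i = 0; i < 100; i++) {
                        mk.commit("/a", "^\"p\":" + i, null, null); // update /a/p
                    }
                }
            });
            Thread t2 = new Thread(new Runnable() {
                public void run() {
                    for (int i = 0; i < 100; i++) {
                        mk.commit("/b", "^\"p\":" + i, null, null); // update /b/p
                    }
                }
            });
            t1.start(); t2.start();
            t1.join(); t2.join();
            mk.dispose();
        }
    }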

Added:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueue.java   (with props)
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoNodeState.java   (with props)
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueueTest.java   (with props)
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/ConcurrentUpdatesTest.java   (with props)
Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMK.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoNodeStore.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/UnsavedModifications.java

Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueue.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueue.java?rev=1534626&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueue.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueue.java Tue Oct 22 13:12:01 2013
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.mongomk;
+
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.CountDownLatch;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import org.apache.jackrabbit.oak.plugins.observation.ChangeDispatcher;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * <code>CommitQueue</code> ensures commits become visible in the order of their
+ * commit revisions, even when they do not complete in that order.
+ */
+class CommitQueue {
+
+    private static final Logger LOG = LoggerFactory.getLogger(CommitQueue.class);
+
+    private final MongoNodeStore store;
+
+    private final SortedMap<Revision, Entry> commits = new TreeMap<Revision, Entry>(new StableRevisionComparator());
+
+    private final ChangeDispatcher dispatcher;
+
+    CommitQueue(MongoNodeStore store, ChangeDispatcher dispatcher) {
+        this.store = store;
+        this.dispatcher = dispatcher;
+    }
+
+    Commit createCommit(Revision base) {
+        synchronized (this) {
+            Commit c = new Commit(store, base, store.newRevision());
+            commits.put(c.getRevision(), new Entry(c));
+            LOG.debug("created commit {}", c.getRevision());
+            return c;
+        }
+    }
+
+    void done(@Nonnull Commit c, boolean isBranch, @Nullable CommitInfo info) {
+        checkNotNull(c);
+        if (isBranch) {
+            removeCommit(c);
+        } else {
+            checkArgument(info != null, "Trunk commit must have a CommitInfo");
+            afterTrunkCommit(c, info);
+        }
+    }
+
+    void canceled(@Nonnull Commit c) {
+        removeCommit(c);
+    }
+
+    //------------------------< internal >--------------------------------------
+
+    private void removeCommit(@Nonnull Commit c) {
+        // simply remove and notify next head if any
+        synchronized (this) {
+            boolean wasHead = commits.firstKey().equals(c.getRevision());
+            commits.remove(c.getRevision());
+            LOG.debug("removed commit {}, wasHead={}", c.getRevision(), wasHead);
+            if (wasHead) {
+                notifyHead();
+            }
+        }
+    }
+
+    private void afterTrunkCommit(Commit c, CommitInfo info) {
+        assert(!commits.isEmpty());
+
+        boolean isHead;
+        Entry commitEntry;
+        synchronized (this) {
+            isHead = commits.firstKey().equals(c.getRevision());
+            commitEntry = commits.get(c.getRevision());
+        }
+        if (!isHead) {
+            LOG.debug("not head: {}, waiting...", c.getRevision());
+            commitEntry.await();
+        }
+        synchronized (this) {
+            commits.remove(c.getRevision());
+            try {
+                LOG.debug("removed {}, head is now {}", c.getRevision(), commits.isEmpty() ? null : commits.firstKey());
+                // remember before revision
+                Revision before = store.getHeadRevision();
+                // update head revision
+                store.setHeadRevision(c.getRevision());
+                NodeState root = store.getRoot();
+                // TODO: correct?
+                dispatcher.beforeCommit(store.getRoot(before));
+                try {
+                    dispatcher.localCommit(root, info);
+                } finally {
+                    dispatcher.afterCommit(root);
+                }
+            } finally {
+                // notify next if there is any
+                notifyHead();
+            }
+        }
+    }
+
+    private void notifyHead() {
+        if (!commits.isEmpty()) {
+            LOG.debug("count down for {}", commits.firstKey());
+            commits.get(commits.firstKey()).latch.countDown();
+        }
+    }
+
+    private static final class Entry {
+
+        final Commit commit;
+        final CountDownLatch latch;
+
+        Entry(Commit commit) {
+            this.commit = commit;
+            this.latch = new CountDownLatch(1);
+        }
+
+        void await() {
+            for (;;) {
+                try {
+                    LOG.debug("awaiting {}", commit.getRevision());
+                    latch.await();
+                    break;
+                } catch (InterruptedException e) {
+                    // retry
+                }
+            }
+        }
+    }
+}
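
The ordering protocol above (a revision-sorted map with one CountDownLatch per pending commit, where the head counts down the latch of the next entry when it leaves) can be shown in isolation. The following stand-alone sketch demonstrates the same hand-off technique; it is not Oak code, and all names (OrderedPublishDemo, register, publish) are invented for the example:

    import java.util.SortedMap;
    import java.util.TreeMap;
    import java.util.concurrent.CountDownLatch;

    // Workers may finish in any order, but each one publishes only once every
    // lower sequence number has left the queue.
    public class OrderedPublishDemo {

        private final SortedMap<Long, CountDownLatch> queue =
                new TreeMap<Long, CountDownLatch>();
        private long nextSeq = 0;

        synchronized long register() {
            long seq = nextSeq++;
            queue.put(seq, new CountDownLatch(1));
            return seq;
        }

        void publish(long seq) throws InterruptedException {
            CountDownLatch latch;
            synchronized (this) {
                // wait only if this sequence number is not the current head
                latch = (queue.firstKey() == seq) ? null : queue.get(seq);
            }
            if (latch != null) {
                latch.await();
            }
            synchronized (this) {
                queue.remove(seq);
                System.out.println("published " + seq);        // always ascending
                if (!queue.isEmpty()) {
                    // wake up the new head, if it is already waiting
                    queue.get(queue.firstKey()).countDown();
                }
            }
        }

        public static void main(String[] args) throws InterruptedException {
            final OrderedPublishDemo demo = new OrderedPublishDemo();
            Thread[] workers = new Thread[5];
            for (int i = 0; i < workers.length; i++) {
                final long seq = demo.register();
                workers[i] = new Thread(new Runnable() {
                    public void run() {
                        try {
                            Thread.sleep((long) (Math.random() * 50)); // random finish order
                            demo.publish(seq);
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    }
                });
            }
            for (Thread t : workers) {
                t.start();
            }
            for (Thread t : workers) {
                t.join();
            }
        }
    }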

Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueue.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueue.java
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision Rev URL

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMK.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMK.java?rev=1534626&r1=1534625&r2=1534626&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMK.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMK.java Tue Oct 22 13:12:01 2013
@@ -42,8 +42,6 @@ import org.apache.jackrabbit.oak.commons
 import org.apache.jackrabbit.oak.plugins.mongomk.Node.Children;
 import org.apache.jackrabbit.oak.plugins.mongomk.blob.MongoBlobStore;
 import org.apache.jackrabbit.oak.plugins.mongomk.util.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
@@ -66,8 +64,6 @@ public class MongoMK implements MicroKer
      */
     static final boolean LIRS_CACHE = Boolean.parseBoolean(System.getProperty("oak.mongoMK.lirsCache", "false"));
 
-    private static final Logger LOG = LoggerFactory.getLogger(MongoMK.class);
-
     /**
      * Enable fast diff operations.
      */
@@ -96,7 +92,6 @@ public class MongoMK implements MicroKer
     private final CacheStats diffCacheStats;
 
     MongoMK(Builder builder) {
-
         this.nodeStore = builder.getNodeStore();
         this.store = nodeStore.getDocumentStore();
         this.blobStore = builder.getBlobStore();
@@ -104,11 +99,8 @@ public class MongoMK implements MicroKer
         diffCache = builder.buildCache(builder.getDiffCacheSize());
         diffCacheStats = new CacheStats(diffCache, "MongoMk-DiffCache",
                 builder.getWeigher(), builder.getDiffCacheSize());
-
-        LOG.info("Initialized MongoMK with clusterNodeId: {}", nodeStore.getClusterId());
     }
 
-
     public void dispose() {
         nodeStore.dispose();
     }
@@ -380,88 +372,18 @@ public class MongoMK implements MicroKer
     }
 
     @Override
-    public String commit(String rootPath, String json, String baseRevId,
+    public String commit(String rootPath, String jsonDiff, String baseRevId,
             String message) throws MicroKernelException {
-        synchronized (nodeStore) {
-            Revision baseRev;
-            if (baseRevId == null) {
-                baseRev = nodeStore.getHeadRevision();
-                baseRevId = baseRev.toString();
-            } else {
-                baseRev = Revision.fromString(baseRevId);
-            }
-            JsopReader t = new JsopTokenizer(json);
-            Revision rev = nodeStore.newRevision();
-            Commit commit = new Commit(nodeStore, baseRev, rev);
-            while (true) {
-                int r = t.read();
-                if (r == JsopReader.END) {
-                    break;
-                }
-                String path = PathUtils.concat(rootPath, t.readString());
-                switch (r) {
-                    case '+':
-                        t.read(':');
-                        t.read('{');
-                        parseAddNode(commit, t, path);
-                        break;
-                    case '-':
-                        commit.removeNode(path);
-                        nodeStore.markAsDeleted(path, commit, true);
-                        commit.removeNodeDiff(path);
-                        break;
-                    case '^':
-                        t.read(':');
-                        String value;
-                        if (t.matches(JsopReader.NULL)) {
-                            value = null;
-                        } else {
-                            value = t.readRawValue().trim();
-                        }
-                        String p = PathUtils.getParentPath(path);
-                        String propertyName = PathUtils.getName(path);
-                        commit.updateProperty(p, propertyName, value);
-                        commit.updatePropertyDiff(p, propertyName, value);
-                        break;
-                    case '>': {
-                        // TODO support moving nodes that were modified within this commit
-                        t.read(':');
-                        String sourcePath = path;
-                        String targetPath = t.readString();
-                        if (!PathUtils.isAbsolute(targetPath)) {
-                            targetPath = PathUtils.concat(rootPath, targetPath);
-                        }
-                        if (!nodeExists(sourcePath, baseRevId)) {
-                            throw new MicroKernelException("Node not found: " + sourcePath + " in revision " + baseRevId);
-                        } else if (nodeExists(targetPath, baseRevId)) {
-                            throw new MicroKernelException("Node already exists: " + targetPath + " in revision " + baseRevId);
-                        }
-                        commit.moveNode(sourcePath, targetPath);
-                        nodeStore.moveNode(sourcePath, targetPath, commit);
-                        break;
-                    }
-                    case '*': {
-                        // TODO support copying nodes that were modified within this commit
-                        t.read(':');
-                        String sourcePath = path;
-                        String targetPath = t.readString();
-                        if (!PathUtils.isAbsolute(targetPath)) {
-                            targetPath = PathUtils.concat(rootPath, targetPath);
-                        }
-                        if (!nodeExists(sourcePath, baseRevId)) {
-                            throw new MicroKernelException("Node not found: " + sourcePath + " in revision " + baseRevId);
-                        } else if (nodeExists(targetPath, baseRevId)) {
-                            throw new MicroKernelException("Node already exists: " + targetPath + " in revision " + baseRevId);
-                        }
-                        commit.copyNode(sourcePath, targetPath);
-                        nodeStore.copyNode(sourcePath, targetPath, commit);
-                        break;
-                    }
-                    default:
-                        throw new MicroKernelException("token: " + (char) t.getTokenType());
-                }
-            }
-            if (baseRev.isBranch()) {
+        boolean success = false;
+        boolean isBranch = false;
+        Revision rev;
+        Commit commit = nodeStore.newCommit(baseRevId != null ? Revision.fromString(baseRevId) : null);
+        try {
+            Revision baseRev = commit.getBaseRevision();
+            isBranch = baseRev != null && baseRev.isBranch();
+            rev = commit.getRevision();
+            parseJsonDiff(commit, jsonDiff, rootPath);
+            if (isBranch) {
                 rev = rev.asBranchRevision();
                 // remember branch commit
                 Branch b = nodeStore.getBranches().getBranch(baseRev);
@@ -471,7 +393,6 @@ public class MongoMK implements MicroKer
                 } else {
                     b.addCommit(rev);
                 }
-                boolean success = false;
                 try {
                     // prepare commit
                     commit.prepare(baseRev);
@@ -484,34 +405,19 @@ public class MongoMK implements MicroKer
                         }
                     }
                 }
-
-                return rev.toString();
             } else {
                 commit.apply();
                 nodeStore.setHeadRevision(commit.getRevision());
-                return rev.toString();
+                success = true;
+            }
+        } finally {
+            if (!success) {
+                nodeStore.canceled(commit);
+            } else {
+                nodeStore.done(commit, isBranch);
             }
         }
-    }
-
-    public static void parseAddNode(Commit commit, JsopReader t, String path) {
-        Node n = new Node(path, commit.getRevision());
-        if (!t.matches('}')) {
-            do {
-                String key = t.readString();
-                t.read(':');
-                if (t.matches('{')) {
-                    String childPath = PathUtils.concat(path, key);
-                    parseAddNode(commit, t, childPath);
-                } else {
-                    String value = t.readRawValue().trim();
-                    n.setProperty(key, value);
-                }
-            } while (t.matches(','));
-            t.read('}');
-        }
-        commit.addNode(n);
-        commit.addNodeDiff(n);
+        return rev.toString();
     }
 
     @Override
@@ -526,27 +432,32 @@ public class MongoMK implements MicroKer
     @Override
     public String merge(String branchRevisionId, String message)
             throws MicroKernelException {
-        synchronized (nodeStore) {
-            // TODO improve implementation if needed
-            Revision revision = Revision.fromString(branchRevisionId);
-            if (!revision.isBranch()) {
-                throw new MicroKernelException("Not a branch: " + branchRevisionId);
-            }
-
+        // TODO improve implementation if needed
+        Revision revision = Revision.fromString(branchRevisionId);
+        if (!revision.isBranch()) {
+            throw new MicroKernelException("Not a branch: " + branchRevisionId);
+        }
+        Branch b = nodeStore.getBranches().getBranch(revision);
+        Revision base = revision;
+        if (b != null) {
+            base = b.getBase(revision);
+        }
+        boolean success = false;
+        Commit commit = nodeStore.newCommit(base);
+        try {
             // make branch commits visible
             UpdateOp op = new UpdateOp(Utils.getIdFromPath("/"), false);
-            Branch b = nodeStore.getBranches().getBranch(revision);
-            Revision mergeCommit = nodeStore.newRevision();
-            NodeDocument.setModified(op, mergeCommit);
+            NodeDocument.setModified(op, commit.getRevision());
             if (b != null) {
+                String commitTag = "c-" + commit.getRevision().toString();
                 for (Revision rev : b.getCommits()) {
                     rev = rev.asTrunkRevision();
-                    NodeDocument.setRevision(op, rev, "c-" + mergeCommit.toString());
+                    NodeDocument.setRevision(op, rev, commitTag);
                     op.containsMapEntry(NodeDocument.COLLISIONS, rev, false);
                 }
                 if (store.findAndUpdate(Collection.NODES, op) != null) {
                     // remove from branchCommits map after successful update
-                    b.applyTo(nodeStore.getPendingModifications(), mergeCommit);
+                    b.applyTo(nodeStore.getPendingModifications(), commit.getRevision());
                     nodeStore.getBranches().remove(b);
                 } else {
                     throw new MicroKernelException("Conflicting concurrent change. Update operation failed: " + op);
@@ -554,9 +465,15 @@ public class MongoMK implements MicroKer
             } else {
                 // no commits in this branch -> do nothing
             }
-            nodeStore.setHeadRevision(mergeCommit);
-            return mergeCommit.toString();
+            success = true;
+        } finally {
+            if (!success) {
+                nodeStore.canceled(commit);
+            } else {
+                nodeStore.done(commit, false);
+            }
         }
+        return commit.getRevision().toString();
     }
 
     @Override
@@ -612,6 +529,8 @@ public class MongoMK implements MicroKer
         }
     }
 
+    //-------------------------< accessors >------------------------------------
+
     public DocumentStore getDocumentStore() {
         return store;
     }
@@ -635,7 +554,103 @@ public class MongoMK implements MicroKer
     public boolean isCached(String path) {
         return store.isCached(Collection.NODES, Utils.getIdFromPath(path));
     }
-    
+
+    //------------------------------< internal >--------------------------------
+
+    private void parseJsonDiff(Commit commit, String json, String rootPath) {
+        String baseRevId = commit.getBaseRevision() != null ?
+                commit.getBaseRevision().toString() : null;
+        JsopReader t = new JsopTokenizer(json);
+        while (true) {
+            int r = t.read();
+            if (r == JsopReader.END) {
+                break;
+            }
+            String path = PathUtils.concat(rootPath, t.readString());
+            switch (r) {
+                case '+':
+                    t.read(':');
+                    t.read('{');
+                    parseAddNode(commit, t, path);
+                    break;
+                case '-':
+                    commit.removeNode(path);
+                    nodeStore.markAsDeleted(path, commit, true);
+                    commit.removeNodeDiff(path);
+                    break;
+                case '^':
+                    t.read(':');
+                    String value;
+                    if (t.matches(JsopReader.NULL)) {
+                        value = null;
+                    } else {
+                        value = t.readRawValue().trim();
+                    }
+                    String p = PathUtils.getParentPath(path);
+                    String propertyName = PathUtils.getName(path);
+                    commit.updateProperty(p, propertyName, value);
+                    commit.updatePropertyDiff(p, propertyName, value);
+                    break;
+                case '>': {
+                    // TODO support moving nodes that were modified within this commit
+                    t.read(':');
+                    String sourcePath = path;
+                    String targetPath = t.readString();
+                    if (!PathUtils.isAbsolute(targetPath)) {
+                        targetPath = PathUtils.concat(rootPath, targetPath);
+                    }
+                    if (!nodeExists(sourcePath, baseRevId)) {
+                        throw new MicroKernelException("Node not found: " + sourcePath + " in revision " + baseRevId);
+                    } else if (nodeExists(targetPath, baseRevId)) {
+                        throw new MicroKernelException("Node already exists: " + targetPath + " in revision " + baseRevId);
+                    }
+                    commit.moveNode(sourcePath, targetPath);
+                    nodeStore.moveNode(sourcePath, targetPath, commit);
+                    break;
+                }
+                case '*': {
+                    // TODO support copying nodes that were modified within this commit
+                    t.read(':');
+                    String sourcePath = path;
+                    String targetPath = t.readString();
+                    if (!PathUtils.isAbsolute(targetPath)) {
+                        targetPath = PathUtils.concat(rootPath, targetPath);
+                    }
+                    if (!nodeExists(sourcePath, baseRevId)) {
+                        throw new MicroKernelException("Node not found: " + sourcePath + " in revision " + baseRevId);
+                    } else if (nodeExists(targetPath, baseRevId)) {
+                        throw new MicroKernelException("Node already exists: " + targetPath + " in revision " + baseRevId);
+                    }
+                    commit.copyNode(sourcePath, targetPath);
+                    nodeStore.copyNode(sourcePath, targetPath, commit);
+                    break;
+                }
+                default:
+                    throw new MicroKernelException("token: " + (char) t.getTokenType());
+            }
+        }
+    }
+
+    private static void parseAddNode(Commit commit, JsopReader t, String path) {
+        Node n = new Node(path, commit.getRevision());
+        if (!t.matches('}')) {
+            do {
+                String key = t.readString();
+                t.read(':');
+                if (t.matches('{')) {
+                    String childPath = PathUtils.concat(path, key);
+                    parseAddNode(commit, t, childPath);
+                } else {
+                    String value = t.readRawValue().trim();
+                    n.setProperty(key, value);
+                }
+            } while (t.matches(','));
+            t.read('}');
+        }
+        commit.addNode(n);
+        commit.addNodeDiff(n);
+    }
+
     /**
      * A (cached) result of the diff operation.
      */
@@ -654,6 +669,8 @@ public class MongoMK implements MicroKer
 
     }
 
+    //----------------------------< Builder >-----------------------------------
+
     /**
      * A builder for a MongoMK instance.
      */
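
The commit() method above now only drives the commit lifecycle (newCommit, parse, apply, done/canceled); the jsop parsing itself moved into the private parseJsonDiff(). For reference, the five jsop operations the parser handles can still be exercised through the public API. A sketch with invented node and property names; 'mk' stands for any initialized MongoMK instance:

    // '+' add, '^' set property, '>' move, '*' copy, '-' remove, all against
    // the current head (baseRevId == null).
    static void exerciseJsopOperations(MongoMK mk) {
        mk.commit("/", "+\"a\":{\"x\":1}", null, null);   // add /a with property x=1
        mk.commit("/", "^\"a/x\":2",       null, null);   // set /a/x to 2
        mk.commit("/", ">\"a\":\"b\"",     null, null);   // move /a to /b
        mk.commit("/", "*\"b\":\"c\"",     null, null);   // copy /b to /c
        mk.commit("/", "-\"c\"",           null, null);   // remove /c
    }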

Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoNodeState.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoNodeState.java?rev=1534626&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoNodeState.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoNodeState.java Tue Oct 22 13:12:01 2013
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.mongomk;
+
+import java.util.Collections;
+
+import javax.annotation.Nonnull;
+
+import org.apache.jackrabbit.oak.api.PropertyState;
+import org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState;
+import org.apache.jackrabbit.oak.plugins.memory.MemoryNodeBuilder;
+import org.apache.jackrabbit.oak.spi.state.AbstractNodeState;
+import org.apache.jackrabbit.oak.spi.state.ChildNodeEntry;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A {@link NodeState} implementation for the {@link MongoNodeStore}.
+ */
+class MongoNodeState extends AbstractNodeState {
+
+    private final MongoNodeStore store;
+
+    private final String path;
+
+    private final Revision revision;
+
+    MongoNodeState(@Nonnull MongoNodeStore store,
+                   @Nonnull String path,
+                   @Nonnull Revision revision) {
+        this.store = checkNotNull(store);
+        this.path = checkNotNull(path);
+        this.revision = checkNotNull(revision);
+    }
+
+    String getPath() {
+        return path;
+    }
+
+    Revision getRevision() {
+        return revision;
+    }
+
+    //--------------------------< NodeState >-----------------------------------
+
+
+    @Override
+    public boolean equals(Object that) {
+        if (this == that) {
+            return true;
+        } else if (that instanceof MongoNodeState) {
+            MongoNodeState other = (MongoNodeState) that;
+            if (revision.equals(other.revision) && path.equals(other.path)) {
+                return true;
+            } else {
+                // TODO: optimize equals check for this case
+            }
+        }
+        if (that instanceof NodeState) {
+            return AbstractNodeState.equals(this, (NodeState) that);
+        } else {
+            return false;
+        }
+    }
+
+    @Override
+    public boolean exists() {
+        // TODO: implement
+        return true;
+    }
+
+    @Nonnull
+    @Override
+    public Iterable<? extends PropertyState> getProperties() {
+        // TODO: implement
+        return Collections.emptyList();
+    }
+
+    @Nonnull
+    @Override
+    public NodeState getChildNode(@Nonnull String name) {
+        // TODO: implement
+        return EmptyNodeState.MISSING_NODE;
+    }
+
+    @Nonnull
+    @Override
+    public Iterable<? extends ChildNodeEntry> getChildNodeEntries() {
+        // TODO: implement
+        return Collections.emptyList();
+    }
+
+    @Nonnull
+    @Override
+    public NodeBuilder builder() {
+        // TODO: implement
+        return new MemoryNodeBuilder(this);
+    }
+}

Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoNodeState.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoNodeState.java
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision Rev URL

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoNodeStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoNodeStore.java?rev=1534626&r1=1534625&r2=1534626&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoNodeStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoNodeStore.java Tue Oct 22 13:12:01 2013
@@ -36,9 +36,13 @@ import java.util.concurrent.ConcurrentHa
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import javax.annotation.CheckForNull;
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 import com.google.common.base.Function;
 import com.google.common.cache.Cache;
@@ -52,6 +56,8 @@ import org.apache.jackrabbit.oak.commons
 import org.apache.jackrabbit.oak.plugins.mongomk.util.LoggingDocumentStoreWrapper;
 import org.apache.jackrabbit.oak.plugins.mongomk.util.TimingDocumentStoreWrapper;
 import org.apache.jackrabbit.oak.plugins.mongomk.util.Utils;
+import org.apache.jackrabbit.oak.plugins.observation.ChangeDispatcher;
+import org.apache.jackrabbit.oak.plugins.observation.Observable;
 import org.apache.jackrabbit.oak.spi.commit.CommitHook;
 import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
@@ -63,7 +69,8 @@ import org.slf4j.LoggerFactory;
 /**
  * Implementation of a NodeStore on MongoDB.
  */
-public final class MongoNodeStore implements NodeStore, RevisionContext {
+public final class MongoNodeStore
+        implements NodeStore, RevisionContext, Observable {
 
     private static final Logger LOG = LoggerFactory.getLogger(MongoNodeStore.class);
 
@@ -96,6 +103,16 @@ public final class MongoNodeStore implem
     protected final DocumentStore store;
 
     /**
+     * The commit queue to coordinate the commits.
+     */
+    protected final CommitQueue commitQueue;
+
+    /**
+     * The change dispatcher for this node store.
+     */
+    protected final ChangeDispatcher dispatcher;
+
+    /**
      * Whether this instance is disposed.
      */
     private final AtomicBoolean isDisposed = new AtomicBoolean();
@@ -168,6 +185,12 @@ public final class MongoNodeStore implem
     private Thread backgroundThread;
 
     /**
+     * Read/Write lock for background operations. Regular commits will acquire
+     * a shared lock, while a background write acquires an exclusive lock.
+     */
+    private final ReadWriteLock backgroundOperationLock = new ReentrantReadWriteLock();
+
+    /**
      * Enable using simple revisions (just a counter). This feature is useful
      * for testing.
      */
@@ -245,6 +268,10 @@ public final class MongoNodeStore implem
         backgroundRead();
         getRevisionComparator().add(headRevision, Revision.newRevision(0));
         headRevision = newRevision();
+        dispatcher = new ChangeDispatcher(this);
+        commitQueue = new CommitQueue(this, dispatcher);
+
+        LOG.info("Initialized MongoNodeStore with clusterNodeId: {}", clusterId);
     }
 
     void init() {
@@ -262,7 +289,7 @@ public final class MongoNodeStore implem
         }
         backgroundThread = new Thread(
                 new BackgroundOperation(this, isDisposed),
-                "MongoMK background thread");
+                "MongoNodeStore background thread");
         backgroundThread.setDaemon(true);
         backgroundThread.start();
     }
@@ -286,14 +313,10 @@ public final class MongoNodeStore implem
                 clusterNodeInfo.dispose();
             }
             store.dispose();
-            LOG.info("Disposed MongoMK with clusterNodeId: {}", clusterId);
+            LOG.info("Disposed MongoNodeStore with clusterNodeId: {}", clusterId);
         }
     }
 
-    public void stopBackground() {
-        stopBackground = true;
-    }
-
     @Nonnull
     Revision getHeadRevision() {
         return headRevision;
@@ -323,6 +346,34 @@ public final class MongoNodeStore implem
         return Revision.newRevision(clusterId);
     }
 
+    /**
+     * Creates a new commit. The caller must acknowledge the commit either with
+     * {@link #done(Commit, boolean)} or {@link #canceled(Commit)}, depending
+     * on the result of the commit.
+     *
+     * @param base the base revision for the commit or <code>null</code> if the
+     *             commit should use the current head revision as base.
+     * @return a new commit.
+     */
+    @Nonnull
+    Commit newCommit(@Nullable Revision base) {
+        if (base == null) {
+            base = headRevision;
+        }
+        backgroundOperationLock.readLock().lock();
+        return commitQueue.createCommit(base);
+    }
+
+    void done(@Nonnull Commit c, boolean isBranch) {
+        backgroundOperationLock.readLock().unlock();
+        commitQueue.done(c, isBranch, CommitInfo.EMPTY);
+    }
+
+    void canceled(Commit c) {
+        backgroundOperationLock.readLock().unlock();
+        commitQueue.canceled(c);
+    }
+
     public void setAsyncDelay(int delay) {
         this.asyncDelay = delay;
     }
@@ -351,10 +402,6 @@ public final class MongoNodeStore implem
         return unsavedLastRevisions.getPaths().size();
     }
 
-    public long getSplitDocumentAgeMillis() {
-        return this.splitDocumentAgeMillis;
-    }
-
     /**
      * Checks that revision x is newer than another revision.
      *
@@ -533,7 +580,7 @@ public final class MongoNodeStore implem
             docChildrenCache.put(path, clone);
             c = clone;
         }
-        Iterable<NodeDocument> it = Iterables.transform(c.childNames, 
+        Iterable<NodeDocument> it = Iterables.transform(c.childNames,
                 new Function<String, NodeDocument>() {
             @Override
             public NodeDocument apply(String name) {
@@ -656,13 +703,30 @@ public final class MongoNodeStore implem
         }
     }
 
+    /**
+     * Returns the root node state at the given revision.
+     *
+     * @param revision a revision.
+     * @return the root node state at the given revision.
+     */
+    @Nonnull
+    NodeState getRoot(@Nonnull Revision revision) {
+        return new MongoNodeState(this, "/", revision);
+    }
+
+    //------------------------< Observable >------------------------------------
+
+    @Override
+    public ChangeDispatcher.Listener newListener() {
+        return dispatcher.newListener();
+    }
+
     //-------------------------< NodeStore >------------------------------------
 
     @Nonnull
     @Override
     public NodeState getRoot() {
-        // TODO: implement
-        return null;
+        return getRoot(headRevision);
     }
 
     @Nonnull
@@ -865,61 +929,67 @@ public final class MongoNodeStore implem
         if (unsavedLastRevisions.getPaths().size() == 0) {
             return;
         }
-        ArrayList<String> paths = new ArrayList<String>(unsavedLastRevisions.getPaths());
-        // sort by depth (high depth first), then path
-        Collections.sort(paths, new Comparator<String>() {
+        Lock writeLock = backgroundOperationLock.writeLock();
+        writeLock.lock();
+        try {
+            ArrayList<String> paths = new ArrayList<String>(unsavedLastRevisions.getPaths());
+            // sort by depth (high depth first), then path
+            Collections.sort(paths, new Comparator<String>() {
 
-            @Override
-            public int compare(String o1, String o2) {
-                int d1 = Utils.pathDepth(o1);
-                int d2 = Utils.pathDepth(o1);
-                if (d1 != d2) {
-                    return Integer.signum(d1 - d2);
+                @Override
+                public int compare(String o1, String o2) {
+                    int d1 = Utils.pathDepth(o1);
+                    int d2 = Utils.pathDepth(o2);
+                    if (d1 != d2) {
+                        return Integer.signum(d1 - d2);
+                    }
+                    return o1.compareTo(o2);
                 }
-                return o1.compareTo(o2);
-            }
 
-        });
+            });
 
-        long now = Revision.getCurrentTimestamp();
-        UpdateOp updateOp = null;
-        Revision lastRev = null;
-        List<String> ids = new ArrayList<String>();
-        for (int i = 0; i < paths.size(); i++) {
-            String p = paths.get(i);
-            Revision r = unsavedLastRevisions.get(p);
-            if (r == null) {
-                continue;
-            }
-            // FIXME: with below code fragment the root (and other nodes
-            // 'close' to the root) will not be updated in MongoDB when there
-            // are frequent changes.
-            if (Revision.getTimestampDifference(now, r.getTimestamp()) < asyncDelay) {
-                continue;
-            }
-            int size = ids.size();
-            if (updateOp == null) {
-                // create UpdateOp
-                Commit commit = new Commit(this, null, r);
-                commit.touchNode(p);
-                updateOp = commit.getUpdateOperationForNode(p);
-                lastRev = r;
-                ids.add(Utils.getIdFromPath(p));
-            } else if (r.equals(lastRev)) {
-                // use multi update when possible
-                ids.add(Utils.getIdFromPath(p));
-            }
-            // update if this is the last path or
-            // revision is not equal to last revision
-            if (i + 1 >= paths.size() || size == ids.size()) {
-                store.update(Collection.NODES, ids, updateOp);
-                for (String id : ids) {
-                    unsavedLastRevisions.remove(Utils.getPathFromId(id));
-                }
-                ids.clear();
-                updateOp = null;
-                lastRev = null;
+            long now = Revision.getCurrentTimestamp();
+            UpdateOp updateOp = null;
+            Revision lastRev = null;
+            List<String> ids = new ArrayList<String>();
+            for (int i = 0; i < paths.size(); i++) {
+                String p = paths.get(i);
+                Revision r = unsavedLastRevisions.get(p);
+                if (r == null) {
+                    continue;
+                }
+                // FIXME: with below code fragment the root (and other nodes
+                // 'close' to the root) will not be updated in MongoDB when there
+                // are frequent changes.
+                if (Revision.getTimestampDifference(now, r.getTimestamp()) < asyncDelay) {
+                    continue;
+                }
+                int size = ids.size();
+                if (updateOp == null) {
+                    // create UpdateOp
+                    Commit commit = new Commit(this, null, r);
+                    commit.touchNode(p);
+                    updateOp = commit.getUpdateOperationForNode(p);
+                    lastRev = r;
+                    ids.add(Utils.getIdFromPath(p));
+                } else if (r.equals(lastRev)) {
+                    // use multi update when possible
+                    ids.add(Utils.getIdFromPath(p));
+                }
+                // update if this is the last path or
+                // revision is not equal to last revision
+                if (i + 1 >= paths.size() || size == ids.size()) {
+                    store.update(Collection.NODES, ids, updateOp);
+                    for (String id : ids) {
+                        unsavedLastRevisions.remove(Utils.getPathFromId(id));
+                    }
+                    ids.clear();
+                    updateOp = null;
+                    lastRev = null;
+                }
             }
+        } finally {
+            writeLock.unlock();
         }
     }
 

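The new newCommit()/done()/canceled() methods above take and release the read half of backgroundOperationLock, while backgroundWrite() now takes the write half: commits run concurrently with each other, but a background flush of the unsaved last revisions (unsavedLastRevisions) waits until all in-flight commits have finished. A stand-alone sketch of that split (not Oak code, names invented):

    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReadWriteLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    // Many commits share the read lock; the background flush takes the write
    // lock and therefore waits for all in-flight commits.
    public class BackgroundLockSketch {

        private final ReadWriteLock lock = new ReentrantReadWriteLock();

        void commitSomething() {
            lock.readLock().lock();          // corresponds to newCommit()
            try {
                // ... perform the commit ...
            } finally {
                lock.readLock().unlock();    // corresponds to done() or canceled()
            }
        }

        void backgroundWrite() {
            Lock writeLock = lock.writeLock();
            writeLock.lock();                // blocks while any commit is in flight
            try {
                // ... flush pending last revision updates ...
            } finally {
                writeLock.unlock();
            }
        }
    }
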
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/UnsavedModifications.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/UnsavedModifications.java?rev=1534626&r1=1534625&r2=1534626&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/UnsavedModifications.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/UnsavedModifications.java Tue Oct 22 13:12:01 2013
@@ -33,9 +33,37 @@ class UnsavedModifications {
 
     private final ConcurrentHashMap<String, Revision> map = new ConcurrentHashMap<String, Revision>();
 
+    /**
+     * Puts a revision for the given path. The revision is only stored if there
+     * is no entry for the given path yet, or if the revision currently stored
+     * for the path is older than the passed revision.
+     *
+     * @param path the path of the modified node.
+     * @param revision the revision of the modification.
+     * @return the revision that was replaced, or <code>null</code> if there was
+     *          no entry for the path or the existing revision is not older.
+     */
     @CheckForNull
     public Revision put(@Nonnull String path, @Nonnull Revision revision) {
-        return map.put(checkNotNull(path), checkNotNull(revision));
+        checkNotNull(path);
+        checkNotNull(revision);
+        for (;;) {
+            Revision previous = map.get(path);
+            if (previous == null) {
+                if (map.putIfAbsent(path, revision) == null) {
+                    return null;
+                }
+            } else {
+                if (previous.compareRevisionTime(revision) < 0) {
+                    if (map.replace(path, previous, revision)) {
+                        return previous;
+                    }
+                } else {
+                    // revision is earlier, do not update
+                    return null;
+                }
+            }
+        }
     }
 
     @CheckForNull
@@ -64,12 +92,7 @@ class UnsavedModifications {
      */
     public void applyTo(UnsavedModifications other, Revision mergeCommit) {
         for (Map.Entry<String, Revision> entry : map.entrySet()) {
-            Revision r = other.map.putIfAbsent(entry.getKey(), mergeCommit);
-            if (r != null) {
-                if (r.compareRevisionTime(mergeCommit) < 0) {
-                    other.map.put(entry.getKey(), mergeCommit);
-                }
-            }
+            other.put(entry.getKey(), mergeCommit);
         }
     }
 }
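
The new put() above replaces a plain ConcurrentHashMap.put() with a compare-and-set loop: putIfAbsent() for the first entry, replace() when the stored revision is older, and a retry whenever another thread wins the race. The same pattern, reduced to a generic "keep the largest value per key" map, is sketched below; it is not Oak code and the names are invented:

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    // Lock-free "keep the maximum" map using the same retry loop as
    // UnsavedModifications.put().
    public class KeepMaxMap {

        private final ConcurrentMap<String, Long> map =
                new ConcurrentHashMap<String, Long>();

        /**
         * Stores the value if it is larger than the currently stored one.
         * @return the replaced value, or null if nothing was replaced.
         */
        Long putIfLarger(String key, long value) {
            for (;;) {
                Long previous = map.get(key);
                if (previous == null) {
                    if (map.putIfAbsent(key, value) == null) {
                        return null;            // first writer for this key
                    }
                    // another thread inserted concurrently, retry
                } else if (previous < value) {
                    if (map.replace(key, previous, value)) {
                        return previous;        // replaced a smaller value
                    }
                    // value changed concurrently, retry
                } else {
                    return null;                // existing value is not smaller, keep it
                }
            }
        }
    }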

Added: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueueTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueueTest.java?rev=1534626&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueueTest.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueueTest.java Tue Oct 22 13:12:01 2013
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.mongomk;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.jackrabbit.oak.plugins.observation.ChangeDispatcher;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.junit.Test;
+
+/**
+ * Tests for {@link CommitQueue}.
+ */
+public class CommitQueueTest {
+
+    private static final int NUM_WRITERS = 10;
+
+    private static final int COMMITS_PER_WRITER = 100;
+
+    @Test
+    public void concurrentCommits() throws Exception {
+        final MongoNodeStore store = new MongoMK.Builder().getNodeStore();
+        ChangeDispatcher dispatcher = new ChangeDispatcher(store);
+        final ChangeDispatcher.Listener listener = dispatcher.newListener();
+        final AtomicBoolean running = new AtomicBoolean(true);
+        final CommitQueue queue = new CommitQueue(store, dispatcher);
+        final List<Exception> exceptions = Collections.synchronizedList(
+                new ArrayList<Exception>());
+        Thread reader = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    Revision before = new Revision(0, 0, store.getClusterId());
+                    while (running.get()) {
+                        ChangeDispatcher.ChangeSet changes = listener.getChanges();
+                        if (changes != null) {
+                            MongoNodeState after = (MongoNodeState) changes.getAfterState();
+                            Revision r = after.getRevision();
+                            // System.out.println("seen: " + r);
+                            if (r.compareRevisionTime(before) < 1) {
+                                exceptions.add(new Exception(
+                                        "Inconsistent revision sequence. Before: " +
+                                                before + ", after: " + r));
+                                break;
+                            }
+                            before = r;
+                        } else {
+                            // avoid busy wait
+                            try {
+                                Thread.sleep(10);
+                            } catch (InterruptedException e) {
+                                // ignore
+                            }
+                        }
+                    }
+                } catch (Exception e) {
+                    exceptions.add(e);
+                }
+            }
+        });
+        reader.start();
+
+        // perform commits with multiple threads
+        List<Thread> writers = new ArrayList<Thread>();
+        for (int i = 0; i < NUM_WRITERS; i++) {
+            final Random random = new Random(i);
+            writers.add(new Thread(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        for (int i = 0; i < COMMITS_PER_WRITER; i++) {
+                            Revision base = store.getHeadRevision();
+                            Commit c = queue.createCommit(base);
+                            try {
+                                Thread.sleep(0, random.nextInt(1000));
+                            } catch (InterruptedException e) {
+                                // ignore
+                            }
+                            if (random.nextInt(5) == 0) {
+                                // cancel 20% of the commits
+                                queue.canceled(c);
+                            } else {
+                                boolean isBranch = random.nextInt(5) == 0;
+                                queue.done(c, isBranch, CommitInfo.create(
+                                        null, null, System.currentTimeMillis()));
+                            }
+                        }
+                    } catch (Exception e) {
+                        exceptions.add(e);
+                    }
+                }
+            }));
+        }
+        for (Thread t : writers) {
+            t.start();
+        }
+        for (Thread t : writers) {
+            t.join();
+        }
+        running.set(false);
+        reader.join();
+        for (Exception e : exceptions) {
+            throw e;
+        }
+    }
+
+}

Propchange: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueueTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueueTest.java
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision Rev URL

Added: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/ConcurrentUpdatesTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/ConcurrentUpdatesTest.java?rev=1534626&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/ConcurrentUpdatesTest.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/ConcurrentUpdatesTest.java Tue Oct 22 13:12:01 2013
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.mongomk;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.junit.Test;
+
+/**
+ * Performs non-conflicting property updates on MongoMK with multiple threads.
+ */
+public class ConcurrentUpdatesTest extends AbstractMongoConnectionTest {
+
+    private static final int NUM_WRITERS = 10;
+
+    private static final int NUM_OPS = 100;
+
+    @Test
+    public void test() throws Exception {
+        final List<Exception> exceptions = Collections.synchronizedList(
+                new ArrayList<Exception>());
+        List<Thread> writers = new ArrayList<Thread>();
+        StringBuilder sb = new StringBuilder();
+        for (int i = 0; i < NUM_WRITERS; i++) {
+            final String nodeName = "test-" + i;
+            sb.append("+\"").append(nodeName).append("\":{}");
+            writers.add(new Thread(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        for (int i = 0; i < NUM_OPS; i++) {
+                            mk.commit("/" + nodeName, "^\"prop\":" + i, null, null);
+                        }
+                    } catch (Exception e) {
+                        exceptions.add(e);
+                    }
+                }
+            }));
+        }
+        mk.commit("/", sb.toString(), null, null);
+        long time = System.currentTimeMillis();
+        for (Thread t : writers) {
+            t.start();
+        }
+        for (Thread t : writers) {
+            t.join();
+        }
+        time = System.currentTimeMillis() - time;
+        // System.out.println(time);
+        for (Exception e : exceptions) {
+            throw e;
+        }
+    }
+}

Propchange: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/ConcurrentUpdatesTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/ConcurrentUpdatesTest.java
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision Rev URL