You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by mr...@apache.org on 2014/02/20 21:38:13 UTC

svn commit: r1570339 - in /jackrabbit/oak/trunk: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/

Author: mreutegg
Date: Thu Feb 20 20:38:13 2014
New Revision: 1570339

URL: http://svn.apache.org/r1570339
Log:
OAK-1427: Improve DocumentNodeStore scalability

- Batch commit updates on commit root document
- Use Guava cache per default
- Increase cache concurrency level to 16

Added:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/BatchCommit.java   (with props)
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/BatchCommitQueue.java   (with props)
Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Commit.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreBranch.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/PathRev.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UpdateOp.java
    jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/ConcurrentCreateNodesTest.java

Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/BatchCommit.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/BatchCommit.java?rev=1570339&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/BatchCommit.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/BatchCommit.java Thu Feb 20 20:38:13 2014
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
+
+import org.apache.jackrabbit.mk.api.MicroKernelException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.SettableFuture;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+import static org.apache.jackrabbit.oak.plugins.document.Collection.NODES;
+
+/**
+ * Combines multiple {@link UpdateOp} into a single call to the
+ * {@link DocumentStore}.
+ */
+final class BatchCommit {
+
+    private static final Logger LOG = LoggerFactory.getLogger(BatchCommit.class);
+
+    private final CountDownLatch finished = new CountDownLatch(1);
+    private final String id;
+    private final BatchCommitQueue queue;
+
+    private List<UpdateOp> ops;
+    private List<Future<NodeDocument>> results;
+
+    private boolean executing = false;
+
+    BatchCommit(String id, BatchCommitQueue queue, boolean onHold) {
+        this.id = id;
+        this.queue = queue;
+        if (onHold) {
+            ops = Lists.newArrayList();
+            results = Lists.newArrayList();
+        }
+    }
+
+    String getId() {
+        return id;
+    }
+
+    Callable<NodeDocument> enqueue(final UpdateOp op) {
+        checkArgument(op.getId().equals(id),
+                "Cannot add UpdateOp with id %s to BatchCommit with id %s",
+                op.getId(), id);
+        Callable<NodeDocument> result;
+        synchronized (this) {
+            checkState(!executing, "Cannot enqueue when batch is already executing");
+            if (ops != null) {
+                ops.add(op);
+                result = new Callable<NodeDocument>() {
+                    int idx = ops.size() - 1;
+                    @Override
+                    public NodeDocument call() throws Exception {
+                        synchronized (BatchCommit.this) {
+                            while (!executing) {
+                                LOG.debug("Waiting until BatchCommit is executing. {}", id);
+                                BatchCommit.this.wait();
+                            }
+                        }
+                        return execute(idx).get();
+                    }
+                };
+            } else {
+                // not on hold and no other operation in this batch
+                executing = true;
+                result = new Callable<NodeDocument>() {
+                    @Override
+                    public NodeDocument call() throws Exception {
+                        try {
+                            return queue.getStore().findAndUpdate(NODES, op);
+                        } finally {
+                            queue.finished(BatchCommit.this);
+                        }
+                    }
+                };
+            }
+        }
+        return result;
+    }
+
+    void release() {
+        synchronized (this) {
+            executing = true;
+            notifyAll();
+        }
+    }
+
+    Future<NodeDocument> execute(int idx) {
+        if (idx == 0) {
+            UpdateOp combined = UpdateOp.combine(id, ops);
+            NodeDocument before = null;
+            try {
+                before = queue.getStore().findAndUpdate(NODES, combined);
+            } catch (Throwable t) {
+                LOG.warn("BatchCommit failed, will retry individually. " + t.getMessage());
+            }
+            if (before == null) {
+                // batch commit unsuccessful, execute individually
+                executeIndividually();
+            } else {
+                populateResults(before);
+            }
+            queue.finished(this);
+            finished.countDown();
+        } else {
+            try {
+                finished.await();
+            } catch (InterruptedException e) {
+                String msg = "Interrupted while waiting for batch commit to finish";
+                return Futures.immediateFailedFuture(new MicroKernelException(msg));
+            }
+        }
+        return results.get(idx);
+    }
+
+    void executeIndividually() {
+        DocumentStore store = queue.getStore();
+        for (UpdateOp op : ops) {
+            SettableFuture<NodeDocument> result = SettableFuture.create();
+            try {
+                result.set(store.findAndUpdate(NODES, op));
+            } catch (Throwable t) {
+                result.setException(t);
+            }
+            results.add(result);
+        }
+    }
+
+    void populateResults(NodeDocument before) {
+        DocumentStore store = queue.getStore();
+        Comparator<Revision> comparator = queue.getComparator();
+        for (UpdateOp op : ops) {
+            results.add(Futures.immediateFuture(before));
+            NodeDocument after = new NodeDocument(store);
+            before.deepCopy(after);
+            UpdateUtils.applyChanges(after, op, comparator);
+            before = after;
+        }
+    }
+}

Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/BatchCommit.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/BatchCommit.java
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision Rev URL

Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/BatchCommitQueue.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/BatchCommitQueue.java?rev=1570339&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/BatchCommitQueue.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/BatchCommitQueue.java Thu Feb 20 20:38:13 2014
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+import java.util.Comparator;
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+import javax.annotation.Nonnull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Maps;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Queues updates on a commit root document and batches them into a single
+ * call to the {@link DocumentStore}.
+ */
+final class BatchCommitQueue {
+
+    private static final Logger LOG = LoggerFactory.getLogger(BatchCommitQueue.class);
+
+    /**
+     * The pending batch commits.
+     */
+    private final Map<String, BatchCommit> pending = Maps.newHashMap();
+
+    /**
+     * The batch commits in progress.
+     */
+    private final Map<String, BatchCommit> inProgress = Maps.newHashMap();
+
+    private final DocumentStore store;
+
+    private final Comparator<Revision> comparator;
+
+    BatchCommitQueue(@Nonnull DocumentStore store,
+                     @Nonnull Comparator<Revision> comparator) {
+        this.store = checkNotNull(store);
+        this.comparator = checkNotNull(comparator);
+    }
+
+    Callable<NodeDocument> updateDocument(UpdateOp op) {
+        String id = op.getId();
+        // check if there is already a batch commit in progress for
+        // the document
+        synchronized (this) {
+            BatchCommit commit = inProgress.get(id);
+            if (commit != null) {
+                LOG.debug("Commit with id {} in progress", id);
+                // get or create a pending batch commit
+                commit = pending.get(id);
+                if (commit == null) {
+                    LOG.debug("Creating pending BatchCommit for id {}", id);
+                    commit = new BatchCommit(id, this, true);
+                    pending.put(id, commit);
+                }
+            } else {
+                commit = new BatchCommit(id, this, false);
+                LOG.debug("Adding inProgress BatchCommit for id {}", id);
+                inProgress.put(id, commit);
+            }
+            LOG.debug("Enqueueing operation with id {}", id);
+            return commit.enqueue(op);
+        }
+    }
+
+    void finished(BatchCommit commit) {
+        String id = commit.getId();
+        synchronized (this) {
+            LOG.debug("BatchCommit finished with id {}", id);
+            if (inProgress.remove(id) == null) {
+                throw new IllegalStateException("BatchCommit for " +
+                        id + " is not in progress");
+            }
+            commit = pending.remove(id);
+            if (commit != null) {
+                LOG.debug("Moving pending BatchCommit to inProgress with id {}", id);
+                inProgress.put(id, commit);
+                commit.release();
+                LOG.debug("BatchCommit released with id {}", id);
+            }
+        }
+    }
+
+    DocumentStore getStore() {
+        return store;
+    }
+
+    Comparator<Revision> getComparator() {
+        return comparator;
+    }
+}

Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/BatchCommitQueue.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/BatchCommitQueue.java
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision Rev URL

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Commit.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Commit.java?rev=1570339&r1=1570338&r2=1570339&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Commit.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Commit.java Thu Feb 20 20:38:13 2014
@@ -149,16 +149,57 @@ public class Commit {
     }
 
     /**
+     * Applies this commit to the store.
+     *
+     * @return the commit revision.
+     * @throws MicroKernelException if the commit cannot be applied.
+     *              TODO: use non-MK exception type
+     */
+    @Nonnull
+    Revision apply() throws MicroKernelException {
+        boolean success = false;
+        Revision baseRev = getBaseRevision();
+        boolean isBranch = baseRev != null && baseRev.isBranch();
+        Revision rev = getRevision();
+        if (isBranch) {
+            rev = rev.asBranchRevision();
+            // remember branch commit
+            Branch b = nodeStore.getBranches().getBranch(baseRev);
+            if (b == null) {
+                // baseRev is marker for new branch
+                b = nodeStore.getBranches().create(baseRev.asTrunkRevision(), rev);
+            } else {
+                b.addCommit(rev);
+            }
+            try {
+                // prepare commit
+                prepare(baseRev);
+                success = true;
+            } finally {
+                if (!success) {
+                    b.removeCommit(rev);
+                    if (!b.hasCommits()) {
+                        nodeStore.getBranches().remove(b);
+                    }
+                }
+            }
+        } else {
+            applyInternal();
+        }
+        return rev;
+    }
+
+    /**
      * Apply the changes to the document store and the cache.
      */
-    void apply() {
+    private void applyInternal() {
         if (!operations.isEmpty()) {
             updateParentChildStatus();
             applyToDocumentStore();
         }
     }
 
-    void prepare(Revision baseRevision) {
+    private void prepare(Revision baseRevision) {
         if (!operations.isEmpty()) {
             updateParentChildStatus();
             applyToDocumentStore(baseRevision);
@@ -178,7 +219,7 @@ public class Commit {
      * @param baseBranchRevision the base revision of this commit. Currently only
      *                     used for branch commits.
      */
-    void applyToDocumentStore(Revision baseBranchRevision) {
+    private void applyToDocumentStore(Revision baseBranchRevision) {
         // the value in _revisions.<revision> property of the commit root node
         // regular commits use "c", which makes the commit visible to
         // other readers. branch commits use the base revision to indicate
@@ -288,7 +329,7 @@ public class Commit {
                     // only set revision on commit root when there is
                     // no collision for this commit revision
                     commit.containsMapEntry(COLLISIONS, revision, false);
-                    NodeDocument before = store.findAndUpdate(NODES, commit);
+                    NodeDocument before = nodeStore.updateCommitRoot(commit);
                     if (before == null) {
                         String msg = "Conflicting concurrent change. " +
                                 "Update operation failed: " + commitRoot;

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java?rev=1570339&r1=1570338&r2=1570339&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java Thu Feb 20 20:38:13 2014
@@ -61,13 +61,19 @@ public class DocumentMK implements Micro
      * Enable the LIRS cache.
      */
     static final boolean LIRS_CACHE = Boolean.parseBoolean(
-            System.getProperty("oak.documentMK.lirsCache", "true"));
+            System.getProperty("oak.documentMK.lirsCache", "false"));
 
     /**
      * Enable fast diff operations.
      */
     static final boolean FAST_DIFF = Boolean.parseBoolean(
             System.getProperty("oak.documentMK.fastDiff", "true"));
+
+    /**
+     * The guava cache concurrency level.
+     */
+    static final int CACHE_CONCURRENCY = Integer.getInteger(
+            "oak.documentMK.cacheConcurrency", 16);
         
     /**
      * The node store.
@@ -232,7 +238,7 @@ public class DocumentMK implements Micro
             Revision baseRev = commit.getBaseRevision();
             isBranch = baseRev != null && baseRev.isBranch();
             parseJsonDiff(commit, jsonDiff, rootPath);
-            rev = nodeStore.apply(commit);
+            rev = commit.apply();
             success = true;
         } finally {
             if (!success) {
@@ -710,6 +716,7 @@ public class DocumentMK implements Micro
                         build();
             }
             return CacheBuilder.newBuilder().
+                    concurrencyLevel(CACHE_CONCURRENCY).
                     weigher(weigher).
                     maximumWeight(maxWeight).
                     recordStats().

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java?rev=1570339&r1=1570338&r2=1570339&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java Thu Feb 20 20:38:13 2014
@@ -19,8 +19,11 @@ package org.apache.jackrabbit.oak.plugin
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.jackrabbit.oak.api.CommitFailedException.MERGE;
+import static org.apache.jackrabbit.oak.plugins.document.Collection.NODES;
 import static org.apache.jackrabbit.oak.plugins.document.DocumentMK.FAST_DIFF;
 import static org.apache.jackrabbit.oak.plugins.document.DocumentMK.MANY_CHILDREN_THRESHOLD;
+import static org.apache.jackrabbit.oak.plugins.document.UpdateOp.Key;
+import static org.apache.jackrabbit.oak.plugins.document.UpdateOp.Operation;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -118,6 +121,11 @@ public final class DocumentNodeStore
     protected final CommitQueue commitQueue;
 
     /**
+     * Commit queue for batch updates.
+     */
+    protected final BatchCommitQueue batchCommitQueue;
+
+    /**
      * The change dispatcher for this node store.
      */
     protected final ChangeDispatcher dispatcher;
@@ -336,6 +344,7 @@ public final class DocumentNodeStore
         getRevisionComparator().add(headRevision, Revision.newRevision(0));
         dispatcher = new ChangeDispatcher(getRoot());
         commitQueue = new CommitQueue(this, dispatcher);
+        batchCommitQueue = new BatchCommitQueue(store, revisionComparator);
         backgroundThread = new Thread(
                 new BackgroundOperation(this, isDisposed),
                 "DocumentNodeStore background thread");
@@ -866,6 +875,59 @@ public final class DocumentNodeStore
     }
 
     /**
+     * Updates a commit root document.
+     *
+     * @param commit the updates to apply on the commit root document.
+     * @return the document before the update was applied or <code>null</code>
+     *          if the update failed because of a collision.
+     * @throws MicroKernelException if the update fails with an error.
+     */
+    @CheckForNull
+    NodeDocument updateCommitRoot(UpdateOp commit) throws MicroKernelException {
+        // use batch commit when there are only revision and modified updates
+        // and collision checks
+        boolean batch = true;
+        for (Map.Entry<Key, Operation> op : commit.getChanges().entrySet()) {
+            String name = op.getKey().getName();
+            if (NodeDocument.isRevisionsEntry(name)
+                    || NodeDocument.MODIFIED.equals(name)
+                    || NodeDocument.COLLISIONS.equals(name)) {
+                continue;
+            }
+            batch = false;
+            break;
+        }
+        if (batch) {
+            return batchUpdateCommitRoot(commit);
+        } else {
+            return store.findAndUpdate(NODES, commit);
+        }
+    }
+
+    private NodeDocument batchUpdateCommitRoot(UpdateOp commit)
+            throws MicroKernelException {
+        try {
+            return batchCommitQueue.updateDocument(commit).call();
+        } catch (InterruptedException e) {
+            throw new MicroKernelException("Interrupted while updating commit root document");
+        } catch (ExecutionException e) {
+            if (e.getCause() instanceof MicroKernelException) {
+                throw (MicroKernelException) e.getCause();
+            } else {
+                String msg = "Update of commit root document failed";
+                throw new MicroKernelException(msg, e.getCause());
+            }
+        } catch (Exception e) {
+            if (e instanceof MicroKernelException) {
+                throw (MicroKernelException) e;
+            } else {
+                String msg = "Update of commit root document failed";
+                throw new MicroKernelException(msg, e);
+            }
+        }
+    }
+
+    /**
      * Returns the root node state at the given revision.
      *
      * @param revision a revision.
@@ -1021,49 +1083,6 @@ public final class DocumentNodeStore
     }
 
     /**
-     * Applies a commit to the store and updates the caches accordingly.
-     *
-     * @param commit the commit to apply.
-     * @return the commit revision.
-     * @throws MicroKernelException if the commit cannot be applied.
-     *              TODO: use non-MK exception type
-     */
-    @Nonnull
-    Revision apply(@Nonnull Commit commit) throws MicroKernelException {
-        checkNotNull(commit);
-        boolean success = false;
-        Revision baseRev = commit.getBaseRevision();
-        boolean isBranch = baseRev != null && baseRev.isBranch();
-        Revision rev = commit.getRevision();
-        if (isBranch) {
-            rev = rev.asBranchRevision();
-            // remember branch commit
-            Branch b = getBranches().getBranch(baseRev);
-            if (b == null) {
-                // baseRev is marker for new branch
-                b = getBranches().create(baseRev.asTrunkRevision(), rev);
-            } else {
-                b.addCommit(rev);
-            }
-            try {
-                // prepare commit
-                commit.prepare(baseRev);
-                success = true;
-            } finally {
-                if (!success) {
-                    b.removeCommit(rev);
-                    if (!b.hasCommits()) {
-                        getBranches().remove(b);
-                    }
-                }
-            }
-        } else {
-            commit.apply();
-        }
-        return rev;
-    }
-
-    /**
      * Compares the given {@code node} against the {@code base} state and
      * reports the differences on the children as a json diff string. This
      * method does not report any property changes between the two nodes.

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreBranch.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreBranch.java?rev=1570339&r1=1570338&r2=1570339&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreBranch.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreBranch.java Thu Feb 20 20:38:13 2014
@@ -169,7 +169,7 @@ public class DocumentNodeStoreBranch
                 // finally clause cancel the commit
                 return base;
             }
-            rev = store.apply(c);
+            rev = c.apply();
             success = true;
         } finally {
             if (success) {

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java?rev=1570339&r1=1570338&r2=1570339&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java Thu Feb 20 20:38:13 2014
@@ -848,6 +848,10 @@ public class NodeDocument extends Docume
         checkNotNull(op).unsetMapEntry(REVISIONS, checkNotNull(revision));
     }
 
+    public static boolean isRevisionsEntry(String name) {
+        return REVISIONS.equals(name);
+    }
+
     public static void removeRevision(@Nonnull UpdateOp op,
                                       @Nonnull Revision revision) {
         checkNotNull(op).removeMapEntry(REVISIONS, checkNotNull(revision));

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/PathRev.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/PathRev.java?rev=1570339&r1=1570338&r2=1570339&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/PathRev.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/PathRev.java Thu Feb 20 20:38:13 2014
@@ -64,4 +64,9 @@ final public class PathRev implements Ca
         }
         return false;
     }
+
+    @Override
+    public String toString() {
+        return path + "@" + revision;
+    }
 }

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UpdateOp.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UpdateOp.java?rev=1570339&r1=1570338&r2=1570339&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UpdateOp.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UpdateOp.java Thu Feb 20 20:38:13 2014
@@ -24,6 +24,8 @@ import javax.annotation.CheckForNull;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
+import com.google.common.collect.Maps;
+
 import static com.google.common.base.Preconditions.checkNotNull;
 
 /**
@@ -57,6 +59,14 @@ public final class UpdateOp {
         this.changes = changes;
     }
 
+    static UpdateOp combine(String id, Iterable<UpdateOp> ops) {
+        Map<Key, Operation> changes = Maps.newHashMap();
+        for (UpdateOp op : ops) {
+            changes.putAll(op.getChanges());
+        }
+        return new UpdateOp(id, false, false, changes);
+    }
+
     /**
      * Creates an update operation for the document with the given id. The
      * changes are shared with the this update operation.

Modified: jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/ConcurrentCreateNodesTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/ConcurrentCreateNodesTest.java?rev=1570339&r1=1570338&r2=1570339&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/ConcurrentCreateNodesTest.java (original)
+++ jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/ConcurrentCreateNodesTest.java Thu Feb 20 20:38:13 2014
@@ -40,7 +40,7 @@ public class ConcurrentCreateNodesTest e
     public static final int EVENT_TYPES = NODE_ADDED | NODE_REMOVED | NODE_MOVED |
             PROPERTY_ADDED | PROPERTY_REMOVED | PROPERTY_CHANGED | PERSIST;
     protected static final String ROOT_NODE_NAME = "test" + TEST_ID;
-    private static final int WORKER_COUNT = 20;
+    private static final int WORKER_COUNT = Integer.getInteger("workerCount", 20);
     private static final int LISTENER_COUNT = Integer.getInteger("listenerCount", 0);
     private static final int NODE_COUNT_LEVEL2 = 50;
     private static final String NODE_TYPE = System.getProperty("nodeType", "nt:unstructured");