You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by md...@apache.org on 2013/07/23 15:08:51 UTC

svn commit: r1506027 - in /jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak: kernel/KernelNodeStore.java kernel/KernelNodeStoreBranch.java plugins/segment/SegmentNodeStoreBranch.java

Author: mduerig
Date: Tue Jul 23 13:08:51 2013
New Revision: 1506027

URL: http://svn.apache.org/r1506027
Log:
OAK-927 Concurrent commits may cause duplicate observation events
- call the commit hook with the correct base revision
- use an explicit lock object from the node store to synchronise concurrent merge calls
Credits to Chetan for spotting these issues.

Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/KernelNodeStore.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/KernelNodeStoreBranch.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreBranch.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/KernelNodeStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/KernelNodeStore.java?rev=1506027&r1=1506026&r2=1506027&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/KernelNodeStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/KernelNodeStore.java Tue Jul 23 13:08:51 2013
@@ -16,9 +16,14 @@
  */
 package org.apache.jackrabbit.oak.kernel;
 
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import javax.annotation.CheckForNull;
 import javax.annotation.Nonnull;
@@ -29,18 +34,14 @@ import com.google.common.cache.LoadingCa
 import com.google.common.cache.Weigher;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
-
 import org.apache.jackrabbit.mk.api.MicroKernel;
 import org.apache.jackrabbit.mk.api.MicroKernelException;
+import org.apache.jackrabbit.oak.cache.CacheStats;
 import org.apache.jackrabbit.oak.spi.commit.EmptyObserver;
 import org.apache.jackrabbit.oak.spi.commit.Observer;
 import org.apache.jackrabbit.oak.spi.state.AbstractNodeStore;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
 import org.apache.jackrabbit.oak.spi.state.NodeStoreBranch;
-import org.apache.jackrabbit.oak.cache.CacheStats;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
 
 /**
  * {@code NodeStore} implementations against {@link MicroKernel}.
@@ -65,6 +66,11 @@ public class KernelNodeStore extends Abs
     private final CacheStats cacheStats;
 
     /**
+     * Lock passed to branches for coordinating merges
+     */
+    private final Lock mergeLock = new ReentrantLock();
+
+    /**
      * State of the current root node.
      */
     private KernelNodeState root;
@@ -140,7 +146,7 @@ public class KernelNodeStore extends Abs
 
     @Override
     public NodeStoreBranch branch() {
-        return new KernelNodeStoreBranch(this, getRoot());
+        return new KernelNodeStoreBranch(this, getRoot(), mergeLock);
     }
 
     /**

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/KernelNodeStoreBranch.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/KernelNodeStoreBranch.java?rev=1506027&r1=1506026&r2=1506027&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/KernelNodeStoreBranch.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/KernelNodeStoreBranch.java Tue Jul 23 13:08:51 2013
@@ -23,6 +23,8 @@ import static org.apache.jackrabbit.oak.
 import static org.apache.jackrabbit.oak.commons.PathUtils.getName;
 import static org.apache.jackrabbit.oak.commons.PathUtils.getParentPath;
 
+import java.util.concurrent.locks.Lock;
+
 import org.apache.jackrabbit.mk.api.MicroKernel;
 import org.apache.jackrabbit.mk.api.MicroKernelException;
 import org.apache.jackrabbit.oak.api.CommitFailedException;
@@ -43,6 +45,11 @@ class KernelNodeStoreBranch extends Abst
     /** The underlying store to which this branch belongs */
     private final KernelNodeStore store;
 
+    /**
+     * Lock for coordinating concurrent merge operations
+     */
+    private final Lock mergeLock;
+
     /** Root state of the base revision of this branch */
     private KernelNodeState base;
 
@@ -55,10 +62,11 @@ class KernelNodeStoreBranch extends Abst
     /** Number of updates to this branch via {@link #setRoot(NodeState)} */
     private int updates = 0;
 
-    KernelNodeStoreBranch(KernelNodeStore store, KernelNodeState root) {
+    KernelNodeStoreBranch(KernelNodeStore store, KernelNodeState root, Lock mergeLock) {
         this.store = store;
         this.base = root;
         this.head = root;
+        this.mergeLock = mergeLock;
     }
 
     @Override
@@ -138,46 +146,46 @@ class KernelNodeStoreBranch extends Abst
     @Override
     public NodeState merge(CommitHook hook, PostCommitHook committed) throws CommitFailedException {
         checkNotMerged();
-        synchronized (this) {
+        NodeState oldRoot = head;
+        mergeLock.lock();
+        try {
             rebase();
             NodeState toCommit = checkNotNull(hook).processCommit(base, head);
-            NodeState oldRoot = head;
             head = toCommit;
-
-            try {
-                if (head.equals(base)) {
-                    // Nothing was written to this branch: return base state
-                    head = null;  // Mark as merged
-                    committed.contentChanged(base, base);
-                    return base;
+            if (head.equals(base)) {
+                // Nothing was written to this branch: return base state
+                head = null;  // Mark as merged
+                committed.contentChanged(base, base);
+                return base;
+            } else {
+                NodeState newRoot;
+                JsopDiff diff = new JsopDiff(store);
+                if (headRevision == null) {
+                    // no branch created yet, commit directly
+                    head.compareAgainstBaseState(base, diff);
+                    newRoot = store.commit(diff.toString(), base.getRevision());
                 } else {
-                    NodeState newRoot;
-                    JsopDiff diff = new JsopDiff(store);
-                    if (headRevision == null) {
-                        // no branch created yet, commit directly
-                        head.compareAgainstBaseState(base, diff);
-                        newRoot = store.commit(diff.toString(), base.getRevision());
-                    } else {
-                        // commit into branch and merge
-                        head.compareAgainstBaseState(store.getRootState(headRevision), diff);
-                        String jsop = diff.toString();
-                        if (!jsop.isEmpty()) {
-                            headRevision = store.getKernel().commit(
-                                    "", jsop, headRevision, null);
-                        }
-                        newRoot = store.merge(headRevision);
-                        headRevision = null;
+                    // commit into branch and merge
+                    head.compareAgainstBaseState(store.getRootState(headRevision), diff);
+                    String jsop = diff.toString();
+                    if (!jsop.isEmpty()) {
+                        headRevision = store.getKernel().commit(
+                                "", jsop, headRevision, null);
                     }
-                    head = null;  // Mark as merged
-                    committed.contentChanged(base, newRoot);
-                    return newRoot;
+                    newRoot = store.merge(headRevision);
+                    headRevision = null;
                 }
-            } catch (MicroKernelException e) {
-                head = oldRoot;
-                throw new CommitFailedException(
-                        "Kernel", 1,
-                        "Failed to merge changes to the underlying MicroKernel", e);
+                head = null;  // Mark as merged
+                committed.contentChanged(base, newRoot);
+                return newRoot;
             }
+        } catch (MicroKernelException e) {
+            head = oldRoot;
+            throw new CommitFailedException(
+                    "Kernel", 1,
+                    "Failed to merge changes to the underlying MicroKernel", e);
+        } finally {
+            mergeLock.unlock();
         }
     }
 

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreBranch.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreBranch.java?rev=1506027&r1=1506026&r2=1506027&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreBranch.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreBranch.java Tue Jul 23 13:08:51 2013
@@ -118,9 +118,10 @@ class SegmentNodeStoreBranch extends Abs
                 // someone else has a pessimistic lock on the journal,
                 // so we should not try to commit anything
             } else if (store.setHead(base, newHead)) {
+                NodeState previousBase = base;
                 base = newHead;
                 head = newHead;
-                committed.contentChanged(originalBase.getChildNode(ROOT), newHead.getChildNode(ROOT));
+                committed.contentChanged(previousBase.getChildNode(ROOT), newHead.getChildNode(ROOT));
                 return -1;
             }
 
@@ -176,9 +177,10 @@ class SegmentNodeStoreBranch extends Abs
                             writer.writeNode(builder.getNodeState());
                     writer.flush();
                     if (store.setHead(after, newHead)) {
+                        NodeState previousBase = base;
                         base = newHead;
                         head = newHead;
-                        committed.contentChanged(originalBase.getChildNode(ROOT), newHead.getChildNode(ROOT));
+                        committed.contentChanged(previousBase.getChildNode(ROOT), newHead.getChildNode(ROOT));
                         return;
                     } else {
                         // something else happened, perhaps a timeout, so