You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ju...@apache.org on 2013/12/02 16:17:34 UTC

svn commit: r1547054 - in /jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment: SegmentNodeStore.java SegmentNodeStoreBranch.java

Author: jukka
Date: Mon Dec  2 15:17:33 2013
New Revision: 1547054

URL: http://svn.apache.org/r1547054
Log:
OAK-1249: Fine-grained locking in SegmentMK commits

Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreBranch.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java?rev=1547054&r1=1547053&r2=1547054&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java Mon Dec  2 15:17:33 2013
@@ -18,6 +18,7 @@ package org.apache.jackrabbit.oak.plugin
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.SECONDS;
 
@@ -53,7 +54,9 @@ public class SegmentNodeStore implements
 
     private final ChangeDispatcher changeDispatcher;
 
-    private SegmentNodeState head;
+    volatile SegmentNodeState head;
+
+    private boolean inLocalCommit = false;
 
     private long maximumBackoff = MILLISECONDS.convert(10, SECONDS);
 
@@ -73,23 +76,58 @@ public class SegmentNodeStore implements
         this.maximumBackoff = max;
     }
 
-    synchronized SegmentNodeState getHead() {
-        head = new SegmentNodeState(
-                store.getWriter().getDummySegment(), journal.getHead());
-        return head;
+    /**
+     * Refreshes the head state. Does nothing if a concurrent local commit is
+     * in progress, as that commit will automatically refresh the head state.
+     *
+     * @param commit whether this refresh is a part of a local commit
+     */
+    private synchronized void refreshHead(boolean commit) {
+        if (commit || !inLocalCommit) {
+            RecordId id = journal.getHead();
+            if (!id.equals(head.getRecordId())) {
+                head = new SegmentNodeState(
+                        store.getWriter().getDummySegment(), id);
+                changeDispatcher.contentChanged(head.getChildNode(ROOT), null);
+            }
+        }
+    }
+
+    private synchronized void refreshHeadInCommit(boolean start)
+            throws InterruptedException {
+        if (start) {
+            while (inLocalCommit) {
+                wait();
+            }
+            inLocalCommit = true;
+        } else {
+            checkState(inLocalCommit);
+        }
+
+        try {
+            refreshHead(true);
+        } finally {
+            if (!start) {
+                inLocalCommit = false;
+                notifyAll();
+            }
+        }
     }
 
-    boolean setHead(SegmentNodeState base, SegmentNodeState head, CommitInfo info) {
-        changeDispatcher.contentChanged(base.getChildNode(ROOT), null);
+    boolean setHead(
+            SegmentNodeState base, SegmentNodeState head, CommitInfo info)
+            throws InterruptedException {
+        refreshHeadInCommit(true);
         try {
             if (journal.setHead(base.getRecordId(), head.getRecordId())) {
+                this.head = head;
                 changeDispatcher.contentChanged(head.getChildNode(ROOT), info);
                 return true;
             } else {
                 return false;
             }
         } finally {
-            changeDispatcher.contentChanged(getRoot(), null);
+            refreshHeadInCommit(false);
         }
     }
 
@@ -99,22 +137,21 @@ public class SegmentNodeStore implements
     }
 
     @Override @Nonnull
-    public synchronized NodeState getRoot() {
-        return getHead().getChildNode(ROOT);
+    public NodeState getRoot() {
+        refreshHead(false);
+        return head.getChildNode(ROOT);
     }
 
     @Override
-    public synchronized NodeState merge(
-            @Nonnull NodeBuilder builder,
-            @Nonnull CommitHook commitHook,
-            @Nullable CommitInfo info)
-            throws CommitFailedException {
+    public NodeState merge(
+            @Nonnull NodeBuilder builder, @Nonnull CommitHook commitHook,
+            @Nullable CommitInfo info) throws CommitFailedException {
         checkArgument(builder instanceof SegmentNodeBuilder);
         checkNotNull(commitHook);
-        SegmentNodeState head = getHead();
-        rebase(builder, head.getChildNode(ROOT)); // TODO: can we avoid this?
+        SegmentNodeState base = head;
+        rebase(builder, base.getChildNode(ROOT)); // TODO: can we avoid this?
         SegmentNodeStoreBranch branch = new SegmentNodeStoreBranch(
-                this, store.getWriter(), head, maximumBackoff);
+                this, store.getWriter(), base, maximumBackoff);
         branch.setRoot(builder.getNodeState());
         NodeState merged = branch.merge(commitHook, info);
         ((SegmentNodeBuilder) builder).reset(merged);
@@ -163,7 +200,7 @@ public class SegmentNodeStore implements
     public synchronized String checkpoint(long lifetime) {
         checkArgument(lifetime > 0);
         // TODO: Guard the checkpoint from garbage collection
-        return getHead().getRecordId().toString();
+        return head.getRecordId().toString();
     }
 
     @Override @CheckForNull

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreBranch.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreBranch.java?rev=1547054&r1=1547053&r2=1547054&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreBranch.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreBranch.java Mon Dec  2 15:17:33 2013
@@ -79,7 +79,7 @@ class SegmentNodeStoreBranch implements 
 
     @Override
     public synchronized void rebase() {
-        SegmentNodeState newBase = store.getHead();
+        SegmentNodeState newBase = store.head;
         if (!base.getRecordId().equals(newBase.getRecordId())) {
             NodeBuilder builder = newBase.builder();
             head.getChildNode(ROOT).compareAgainstBaseState(
@@ -137,9 +137,9 @@ class SegmentNodeStoreBranch implements 
     }
 
     private synchronized void pessimisticMerge(CommitHook hook, long timeout, CommitInfo info)
-            throws CommitFailedException {
+            throws CommitFailedException, InterruptedException {
         while (true) {
-            SegmentNodeState before = store.getHead();
+            SegmentNodeState before = store.head;
             long now = System.currentTimeMillis();
             if (before.hasProperty("token")
                     && before.getLong("timeout") >= now) {