You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ju...@apache.org on 2013/12/03 02:03:01 UTC

svn commit: r1547253 - in /jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment: SegmentNodeStore.java SegmentNodeStoreBranch.java

Author: jukka
Date: Tue Dec  3 01:03:01 2013
New Revision: 1547253

URL: http://svn.apache.org/r1547253
Log:
OAK-593: Segment-based MK

Inline SegmentNodeStoreBranch into SegmentNodeStore in order to further simplify the code

Removed:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreBranch.java
Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java?rev=1547253&r1=1547252&r2=1547253&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java Tue Dec  3 01:03:01 2013
@@ -19,11 +19,14 @@ package org.apache.jackrabbit.oak.plugin
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
 import static java.util.concurrent.TimeUnit.SECONDS;
 
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.Random;
+import java.util.UUID;
 import java.util.concurrent.Semaphore;
 
 import javax.annotation.CheckForNull;
@@ -94,25 +97,6 @@ public class SegmentNodeStore implements
         }
     }
 
-    boolean setHead(
-            SegmentNodeState base, SegmentNodeState head, CommitInfo info)
-            throws InterruptedException {
-        commitSemaphore.acquire();
-        try {
-            refreshHead();
-            if (journal.setHead(base.getRecordId(), head.getRecordId())) {
-                this.head = head;
-                changeDispatcher.contentChanged(head.getChildNode(ROOT), info);
-                refreshHead();
-                return true;
-            } else {
-                return false;
-            }
-        } finally {
-            commitSemaphore.release();
-        }
-    }
-
     @Override
     public Closeable addObserver(Observer observer) {
         return changeDispatcher.addObserver(observer);
@@ -140,9 +124,8 @@ public class SegmentNodeStore implements
         checkArgument(store.isInstance(base, SegmentRootState.class));
         SegmentNodeState root = ((SegmentRootState) base).getRootState();
 
-        SegmentNodeStoreBranch branch = new SegmentNodeStoreBranch(
-                this, store.getWriter(), root, maximumBackoff);
-        branch.setRoot(builder.getNodeState());
+        SegmentNodeStoreBranch branch =
+                new SegmentNodeStoreBranch(root, builder.getNodeState());
         NodeState merged = branch.merge(commitHook, info);
         ((SegmentNodeBuilder) builder).reset(merged);
         return merged;
@@ -172,7 +155,6 @@ public class SegmentNodeStore implements
                         ((Record) b).getRecordId());
     }
 
-
     @Override @Nonnull
     public NodeState reset(@Nonnull NodeBuilder builder) {
         checkArgument(builder instanceof SegmentRootBuilder);
@@ -201,4 +183,172 @@ public class SegmentNodeStore implements
                 new SegmentNodeState(store.getWriter().getDummySegment(), id);
         return root.getChildNode(ROOT);
     }
+
+    private static final Random RANDOM = new Random();
+
+    private class SegmentNodeStoreBranch {
+
+        private SegmentNodeState base;
+
+        private SegmentNodeState head;
+
+        SegmentNodeStoreBranch(SegmentNodeState base, NodeState head) {
+            this.base = base;
+            SegmentRootBuilder builder = base.builder();
+            builder.setChildNode(ROOT, head);
+            this.head = builder.getNodeState();
+        }
+
+        private boolean setHead(
+                SegmentNodeState base, SegmentNodeState head, CommitInfo info)
+                throws InterruptedException {
+            commitSemaphore.acquire();
+            try {
+                refreshHead();
+                if (journal.setHead(base.getRecordId(), head.getRecordId())) {
+                    this.head = head;
+                    changeDispatcher.contentChanged(head.getChildNode(ROOT), info);
+                    refreshHead();
+                    return true;
+                } else {
+                    return false;
+                }
+            } finally {
+                commitSemaphore.release();
+            }
+        }
+
+        private void rebase() {
+            SegmentNodeState newBase = SegmentNodeStore.this.head;
+            if (!base.getRecordId().equals(newBase.getRecordId())) {
+                NodeBuilder builder = newBase.builder();
+                head.getChildNode(ROOT).compareAgainstBaseState(
+                        base.getChildNode(ROOT),
+                        new ConflictAnnotatingRebaseDiff(builder.child(ROOT)));
+                base = newBase;
+                head = store.getWriter().writeNode(builder.getNodeState());
+            }
+        }
+
+        private long optimisticMerge(CommitHook hook, CommitInfo info)
+                throws CommitFailedException, InterruptedException {
+            long timeout = 1;
+
+            SegmentNodeState originalBase = base;
+            SegmentNodeState originalHead = head;
+
+            // use exponential backoff in case of concurrent commits
+            for (long backoff = 1; backoff < maximumBackoff; backoff *= 2) {
+                rebase(); // rebase to latest head, a no-op if already there
+
+                long start = System.nanoTime();
+
+                if (base.hasProperty("token")
+                        && base.getLong("timeout") >= System.currentTimeMillis()) {
+                    // someone else has a pessimistic lock on the journal,
+                    // so we should not try to commit anything
+                } else {
+                    // apply commit hooks on the rebased changes
+                    NodeBuilder builder = head.builder();
+                    builder.setChildNode(ROOT, hook.processCommit(
+                            base.getChildNode(ROOT), head.getChildNode(ROOT)));
+                    SegmentNodeState newHead =
+                            store.getWriter().writeNode(builder.getNodeState());
+
+                    // use optimistic locking to update the journal
+                    if (setHead(base, newHead, info)) {
+                        base = newHead;
+                        head = newHead;
+                        return -1;
+                    }
+                }
+
+                // someone else was faster, so restore state and retry later
+                base = originalBase;
+                head = originalHead;
+
+                RANDOM.wait(backoff, RANDOM.nextInt(1000000));
+
+                long stop = System.nanoTime();
+                if (stop - start > timeout) {
+                    timeout = stop - start;
+                }
+            }
+
+            return MILLISECONDS.convert(timeout, NANOSECONDS);
+        }
+
+        private void pessimisticMerge(
+                CommitHook hook, long timeout, CommitInfo info)
+                throws CommitFailedException, InterruptedException {
+            while (true) {
+                SegmentNodeState before = head;
+                long now = System.currentTimeMillis();
+                if (before.hasProperty("token")
+                        && before.getLong("timeout") >= now) {
+                    // locked by someone else, wait until unlocked or expired
+                    RANDOM.wait(
+                            Math.min(before.getLong("timeout") - now, 1000),
+                            RANDOM.nextInt(1000000));
+                } else {
+                    // attempt to acquire the lock
+                    NodeBuilder builder = before.builder();
+                    builder.setProperty("token", UUID.randomUUID().toString());
+                    builder.setProperty("timeout", now + timeout);
+
+                    SegmentNodeState after =
+                            store.getWriter().writeNode(builder.getNodeState());
+                    if (setHead(before, after, info)) {
+                        SegmentNodeState originalBase = base;
+                        SegmentNodeState originalHead = head;
+
+                        // lock acquired; rebase, apply commit hooks, and unlock
+                        rebase();
+                        builder.setChildNode(ROOT, hook.processCommit(
+                                base.getChildNode(ROOT), head.getChildNode(ROOT)));
+                        builder.removeProperty("token");
+                        builder.removeProperty("timeout");
+
+                        // complete the commit
+                        SegmentNodeState newHead =
+                                store.getWriter().writeNode(builder.getNodeState());
+                        if (setHead(after, newHead, info)) {
+                            base = newHead;
+                            head = newHead;
+                            return;
+                        } else {
+                            // something else happened, perhaps a timeout, so
+                            // undo the previous rebase and try again
+                            base = originalBase;
+                            head = originalHead;
+                        }
+                    }
+                }
+            }
+        }
+
+        @Nonnull
+        SegmentRootState merge(@Nonnull CommitHook hook, @Nullable CommitInfo info)
+                throws CommitFailedException {
+            checkNotNull(hook);
+            if (base != head) {
+                synchronized (RANDOM) {
+                    try {
+                        long timeout = optimisticMerge(hook, info);
+                        if (timeout >= 0) {
+                            pessimisticMerge(hook, timeout, info);
+                        }
+                    } catch (InterruptedException e) {
+                        throw new CommitFailedException(
+                                "Segment", 1, "Commit interrupted", e);
+                    } finally {
+                        RANDOM.notifyAll();
+                    }
+                }
+            }
+            return new SegmentRootState(head);
+        }
+
+    }
+
 }