You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ju...@apache.org on 2013/12/03 02:03:01 UTC
svn commit: r1547253 - in
/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment:
SegmentNodeStore.java SegmentNodeStoreBranch.java
Author: jukka
Date: Tue Dec 3 01:03:01 2013
New Revision: 1547253
URL: http://svn.apache.org/r1547253
Log:
OAK-593: Segment-based MK
Inline SegmentNodeStoreBranch into SegmentNodeStore in order to further simplify the code
Removed:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreBranch.java
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java?rev=1547253&r1=1547252&r2=1547253&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java Tue Dec 3 01:03:01 2013
@@ -19,11 +19,14 @@ package org.apache.jackrabbit.oak.plugin
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
+import java.util.Random;
+import java.util.UUID;
import java.util.concurrent.Semaphore;
import javax.annotation.CheckForNull;
@@ -94,25 +97,6 @@ public class SegmentNodeStore implements
}
}
- boolean setHead(
- SegmentNodeState base, SegmentNodeState head, CommitInfo info)
- throws InterruptedException {
- commitSemaphore.acquire();
- try {
- refreshHead();
- if (journal.setHead(base.getRecordId(), head.getRecordId())) {
- this.head = head;
- changeDispatcher.contentChanged(head.getChildNode(ROOT), info);
- refreshHead();
- return true;
- } else {
- return false;
- }
- } finally {
- commitSemaphore.release();
- }
- }
-
@Override
public Closeable addObserver(Observer observer) {
return changeDispatcher.addObserver(observer);
@@ -140,9 +124,8 @@ public class SegmentNodeStore implements
checkArgument(store.isInstance(base, SegmentRootState.class));
SegmentNodeState root = ((SegmentRootState) base).getRootState();
- SegmentNodeStoreBranch branch = new SegmentNodeStoreBranch(
- this, store.getWriter(), root, maximumBackoff);
- branch.setRoot(builder.getNodeState());
+ SegmentNodeStoreBranch branch =
+ new SegmentNodeStoreBranch(root, builder.getNodeState());
NodeState merged = branch.merge(commitHook, info);
((SegmentNodeBuilder) builder).reset(merged);
return merged;
@@ -172,7 +155,6 @@ public class SegmentNodeStore implements
((Record) b).getRecordId());
}
-
@Override @Nonnull
public NodeState reset(@Nonnull NodeBuilder builder) {
checkArgument(builder instanceof SegmentRootBuilder);
@@ -201,4 +183,172 @@ public class SegmentNodeStore implements
new SegmentNodeState(store.getWriter().getDummySegment(), id);
return root.getChildNode(ROOT);
}
+
+ private static final Random RANDOM = new Random();
+
+ private class SegmentNodeStoreBranch {
+
+ private SegmentNodeState base;
+
+ private SegmentNodeState head;
+
+ SegmentNodeStoreBranch(SegmentNodeState base, NodeState head) {
+ this.base = base;
+ SegmentRootBuilder builder = base.builder();
+ builder.setChildNode(ROOT, head);
+ this.head = builder.getNodeState();
+ }
+
+ private boolean setHead(
+ SegmentNodeState base, SegmentNodeState head, CommitInfo info)
+ throws InterruptedException {
+ commitSemaphore.acquire();
+ try {
+ refreshHead();
+ if (journal.setHead(base.getRecordId(), head.getRecordId())) {
+ this.head = head;
+ changeDispatcher.contentChanged(head.getChildNode(ROOT), info);
+ refreshHead();
+ return true;
+ } else {
+ return false;
+ }
+ } finally {
+ commitSemaphore.release();
+ }
+ }
+
+ private void rebase() {
+ SegmentNodeState newBase = SegmentNodeStore.this.head;
+ if (!base.getRecordId().equals(newBase.getRecordId())) {
+ NodeBuilder builder = newBase.builder();
+ head.getChildNode(ROOT).compareAgainstBaseState(
+ base.getChildNode(ROOT),
+ new ConflictAnnotatingRebaseDiff(builder.child(ROOT)));
+ base = newBase;
+ head = store.getWriter().writeNode(builder.getNodeState());
+ }
+ }
+
+ private long optimisticMerge(CommitHook hook, CommitInfo info)
+ throws CommitFailedException, InterruptedException {
+ long timeout = 1;
+
+ SegmentNodeState originalBase = base;
+ SegmentNodeState originalHead = head;
+
+ // use exponential backoff in case of concurrent commits
+ for (long backoff = 1; backoff < maximumBackoff; backoff *= 2) {
+ rebase(); // rebase to latest head, a no-op if already there
+
+ long start = System.nanoTime();
+
+ if (base.hasProperty("token")
+ && base.getLong("timeout") >= System.currentTimeMillis()) {
+ // someone else has a pessimistic lock on the journal,
+ // so we should not try to commit anything
+ } else {
+ // apply commit hooks on the rebased changes
+ NodeBuilder builder = head.builder();
+ builder.setChildNode(ROOT, hook.processCommit(
+ base.getChildNode(ROOT), head.getChildNode(ROOT)));
+ SegmentNodeState newHead =
+ store.getWriter().writeNode(builder.getNodeState());
+
+ // use optimistic locking to update the journal
+ if (setHead(base, newHead, info)) {
+ base = newHead;
+ head = newHead;
+ return -1;
+ }
+ }
+
+ // someone else was faster, so restore state and retry later
+ base = originalBase;
+ head = originalHead;
+
+ RANDOM.wait(backoff, RANDOM.nextInt(1000000));
+
+ long stop = System.nanoTime();
+ if (stop - start > timeout) {
+ timeout = stop - start;
+ }
+ }
+
+ return MILLISECONDS.convert(timeout, NANOSECONDS);
+ }
+
+ private void pessimisticMerge(
+ CommitHook hook, long timeout, CommitInfo info)
+ throws CommitFailedException, InterruptedException {
+ while (true) {
+ SegmentNodeState before = head;
+ long now = System.currentTimeMillis();
+ if (before.hasProperty("token")
+ && before.getLong("timeout") >= now) {
+ // locked by someone else, wait until unlocked or expired
+ RANDOM.wait(
+ Math.min(before.getLong("timeout") - now, 1000),
+ RANDOM.nextInt(1000000));
+ } else {
+ // attempt to acquire the lock
+ NodeBuilder builder = before.builder();
+ builder.setProperty("token", UUID.randomUUID().toString());
+ builder.setProperty("timeout", now + timeout);
+
+ SegmentNodeState after =
+ store.getWriter().writeNode(builder.getNodeState());
+ if (setHead(before, after, info)) {
+ SegmentNodeState originalBase = base;
+ SegmentNodeState originalHead = head;
+
+ // lock acquired; rebase, apply commit hooks, and unlock
+ rebase();
+ builder.setChildNode(ROOT, hook.processCommit(
+ base.getChildNode(ROOT), head.getChildNode(ROOT)));
+ builder.removeProperty("token");
+ builder.removeProperty("timeout");
+
+ // complete the commit
+ SegmentNodeState newHead =
+ store.getWriter().writeNode(builder.getNodeState());
+ if (setHead(after, newHead, info)) {
+ base = newHead;
+ head = newHead;
+ return;
+ } else {
+ // something else happened, perhaps a timeout, so
+ // undo the previous rebase and try again
+ base = originalBase;
+ head = originalHead;
+ }
+ }
+ }
+ }
+ }
+
+ @Nonnull
+ SegmentRootState merge(@Nonnull CommitHook hook, @Nullable CommitInfo info)
+ throws CommitFailedException {
+ checkNotNull(hook);
+ if (base != head) {
+ synchronized (RANDOM) {
+ try {
+ long timeout = optimisticMerge(hook, info);
+ if (timeout >= 0) {
+ pessimisticMerge(hook, timeout, info);
+ }
+ } catch (InterruptedException e) {
+ throw new CommitFailedException(
+ "Segment", 1, "Commit interrupted", e);
+ } finally {
+ RANDOM.notifyAll();
+ }
+ }
+ }
+ return new SegmentRootState(head);
+ }
+
+ }
+
}