You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ju...@apache.org on 2013/07/16 10:43:36 UTC
svn commit: r1503616 - in /jackrabbit/oak/trunk/oak-core/src:
main/java/org/apache/jackrabbit/oak/plugins/segment/
test/java/org/apache/jackrabbit/oak/plugins/segment/
Author: jukka
Date: Tue Jul 16 08:43:35 2013
New Revision: 1503616
URL: http://svn.apache.org/r1503616
Log:
OAK-786: Fall back to pessimism
Enable the pessimistic locking fallback. Add a test case.
Added:
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/MergeTest.java (with props)
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreBranch.java
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java?rev=1503616&r1=1503615&r2=1503616&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java Tue Jul 16 08:43:35 2013
@@ -76,7 +76,7 @@ public class SegmentNodeStore extends Ab
}
@Override @Nonnull
- public NodeStoreBranch branch() {
+ public SegmentNodeStoreBranch branch() {
return new SegmentNodeStoreBranch(
this, new SegmentWriter(store), getHead());
}
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreBranch.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreBranch.java?rev=1503616&r1=1503615&r2=1503616&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreBranch.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreBranch.java Tue Jul 16 08:43:35 2013
@@ -16,6 +16,9 @@
*/
package org.apache.jackrabbit.oak.plugins.segment;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.jackrabbit.oak.plugins.segment.SegmentNodeStore.ROOT;
import java.util.Random;
@@ -43,6 +46,8 @@ class SegmentNodeStoreBranch extends Abs
private SegmentNodeState head;
+ private long maximumBackoff = MILLISECONDS.convert(10, SECONDS);
+
SegmentNodeStoreBranch(
SegmentNodeStore store, SegmentWriter writer,
SegmentNodeState base) {
@@ -52,6 +57,10 @@ class SegmentNodeStoreBranch extends Abs
this.head = base;
}
+ void setMaximumBackoff(long max) {
+ this.maximumBackoff = max;
+ }
+
@Override @Nonnull
public NodeState getBase() {
return base.getChildNode(ROOT);
@@ -84,8 +93,55 @@ class SegmentNodeStoreBranch extends Abs
}
}
- private synchronized NodeState pessimisticMerge(
- CommitHook hook, long timeout) throws CommitFailedException {
+ private synchronized long optimisticMerge(CommitHook hook)
+ throws CommitFailedException, InterruptedException {
+ long timeout = 1;
+
+ SegmentNodeState originalBase = base;
+ SegmentNodeState originalHead = head;
+
+ // use exponential backoff in case of concurrent commits
+ for (long backoff = 1; backoff < maximumBackoff; backoff *= 2) {
+ long start = System.nanoTime();
+
+ // apply commit hooks on the rebased changes
+ NodeBuilder builder = head.builder();
+ builder.setChildNode(ROOT, hook.processCommit(
+ base.getChildNode(ROOT), head.getChildNode(ROOT)));
+ SegmentNodeState newHead = writer.writeNode(builder.getNodeState());
+ writer.flush();
+
+ // use optimistic locking to update the journal
+ if (base.hasProperty("token")
+ && base.getLong("timeout") >= System.currentTimeMillis()) {
+ // someone else has a pessimistic lock on the journal,
+ // so we should not try to commit anything
+ } else if (store.setHead(base, newHead)) {
+ base = newHead;
+ head = newHead;
+ return -1;
+ }
+
+ // someone else was faster, so restore state and retry later
+ base = originalBase;
+ head = originalHead;
+
+ Thread.sleep(backoff, RANDOM.nextInt(1000000));
+
+ // rebase to latest head before trying again
+ rebase();
+
+ long stop = System.nanoTime();
+ if (stop - start > timeout) {
+ timeout = stop - start;
+ }
+ }
+
+ return MILLISECONDS.convert(timeout, NANOSECONDS);
+ }
+
+ private synchronized void pessimisticMerge(CommitHook hook, long timeout)
+ throws CommitFailedException {
while (true) {
SegmentNodeState before = store.getHead();
long now = System.currentTimeMillis();
@@ -120,7 +176,7 @@ class SegmentNodeStoreBranch extends Abs
if (store.setHead(after, newHead)) {
base = newHead;
head = newHead;
- return getHead();
+ return;
} else {
// something else happened, perhaps a timeout, so
// undo the previous rebase and try again
@@ -135,46 +191,15 @@ class SegmentNodeStoreBranch extends Abs
@Override @Nonnull
public synchronized NodeState merge(CommitHook hook)
throws CommitFailedException {
- int backoff = 1;
- SegmentNodeState originalBase = base;
- SegmentNodeState originalHead = head;
- while (base != head) {
- // apply commit hooks on the rebased changes
- NodeBuilder builder = head.builder();
- builder.removeProperty("token");
- builder.removeProperty("timeout");
- builder.setChildNode(ROOT, hook.processCommit(
- base.getChildNode(ROOT), head.getChildNode(ROOT)));
- SegmentNodeState newHead = writer.writeNode(builder.getNodeState());
- writer.flush();
-
- // use optimistic locking to update the journal
- if (store.setHead(base, newHead)) {
- base = newHead;
- head = newHead;
- } else {
- // someone else was faster, so restore state and retry later
- base = originalBase;
- head = originalHead;
-
- // use exponential backoff to reduce contention
- if (backoff < 10000) {
- try {
- Thread.sleep(backoff, RANDOM.nextInt(1000000));
- backoff *= 2;
- } catch (InterruptedException e) {
- // TODO: correct interrupt handling?
- throw new CommitFailedException(
- "Segment", 1, "Commit was interrupted", e);
- }
- } else {
- throw new CommitFailedException(
- "Segment", 2,
- "System overloaded, try again later");
+ if (base != head) {
+ try {
+ long timeout = optimisticMerge(hook);
+ if (timeout >= 0) {
+ pessimisticMerge(hook, timeout);
}
-
- // rebase to latest head before trying again
- rebase();
+ } catch (InterruptedException e) {
+ throw new CommitFailedException(
+ "Segment", 1, "Commit interrupted", e);
}
}
return getHead();
Added: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/MergeTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/MergeTest.java?rev=1503616&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/MergeTest.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/MergeTest.java Tue Jul 16 08:43:35 2013
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.segment;
+
+import static junit.framework.Assert.assertFalse;
+import static junit.framework.Assert.assertTrue;
+import static junit.framework.Assert.fail;
+
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.annotation.Nonnull;
+
+import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.plugins.segment.memory.MemoryStore;
+import org.apache.jackrabbit.oak.spi.commit.CommitHook;
+import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.apache.jackrabbit.oak.spi.state.NodeStore;
+import org.apache.jackrabbit.oak.spi.state.NodeStoreBranch;
+import org.junit.Test;
+
+public class MergeTest {
+
+ @Test
+ public void testSequentialMerge() throws CommitFailedException {
+ NodeStore store = new SegmentNodeStore(new MemoryStore());
+
+ assertFalse(store.getRoot().hasProperty("foo"));
+ assertFalse(store.getRoot().hasProperty("bar"));
+
+ NodeStoreBranch a = store.branch();
+ a.setRoot(a.getHead().builder().setProperty("foo", "abc").getNodeState());
+ a.merge(EmptyHook.INSTANCE);
+
+ assertTrue(store.getRoot().hasProperty("foo"));
+ assertFalse(store.getRoot().hasProperty("bar"));
+
+ NodeStoreBranch b = store.branch();
+ b.setRoot(b.getHead().builder().setProperty("bar", "xyz").getNodeState());
+ b.merge(EmptyHook.INSTANCE);
+
+ assertTrue(store.getRoot().hasProperty("foo"));
+ assertTrue(store.getRoot().hasProperty("bar"));
+ }
+
+ @Test
+ public void testOptimisticMerge() throws CommitFailedException {
+ NodeStore store = new SegmentNodeStore(new MemoryStore());
+
+ NodeStoreBranch a = store.branch();
+ a.setRoot(a.getHead().builder().setProperty("foo", "abc").getNodeState());
+
+ NodeStoreBranch b = store.branch();
+ b.setRoot(b.getHead().builder().setProperty("bar", "xyz").getNodeState());
+
+ assertFalse(store.getRoot().hasProperty("foo"));
+ assertFalse(store.getRoot().hasProperty("bar"));
+
+ a.merge(EmptyHook.INSTANCE);
+
+ assertTrue(store.getRoot().hasProperty("foo"));
+ assertFalse(store.getRoot().hasProperty("bar"));
+
+ b.merge(EmptyHook.INSTANCE);
+
+ assertTrue(store.getRoot().hasProperty("foo"));
+ assertTrue(store.getRoot().hasProperty("bar"));
+ }
+
+ @Test
+ public void testPessimisticMerge() throws Exception {
+ final SegmentNodeStore store = new SegmentNodeStore(new MemoryStore());
+ final Semaphore semaphore = new Semaphore(0);
+ final AtomicBoolean running = new AtomicBoolean(true);
+
+ Thread background = new Thread() {
+ @Override
+ public void run() {
+ for (int i = 0; running.get(); i++) {
+ try {
+ SegmentNodeStoreBranch a = store.branch();
+ NodeBuilder builder = a.getHead().builder();
+ builder.setProperty("foo", "abc" + i);
+ a.setRoot(builder.getNodeState());
+ a.merge(EmptyHook.INSTANCE);
+ semaphore.release();
+ } catch (CommitFailedException e) {
+ fail();
+ }
+ }
+ }
+ };
+ background.start();
+
+ // wait for the first commit
+ semaphore.acquire();
+
+ assertTrue(store.getRoot().hasProperty("foo"));
+ assertFalse(store.getRoot().hasProperty("bar"));
+
+ SegmentNodeStoreBranch b = store.branch();
+ b.setMaximumBackoff(100);
+ b.setRoot(b.getHead().builder().setProperty("bar", "xyz").getNodeState());
+ b.merge(new CommitHook() {
+ @Override @Nonnull
+ public NodeState processCommit(NodeState before, NodeState after) {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ fail();
+ }
+ return after;
+ }
+ });
+
+ assertTrue(store.getRoot().hasProperty("foo"));
+ assertTrue(store.getRoot().hasProperty("bar"));
+
+ running.set(false);
+ background.join();
+ }
+
+}
Propchange: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/MergeTest.java
------------------------------------------------------------------------------
svn:eol-style = native