You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ju...@apache.org on 2013/07/16 10:43:36 UTC

svn commit: r1503616 - in /jackrabbit/oak/trunk/oak-core/src: main/java/org/apache/jackrabbit/oak/plugins/segment/ test/java/org/apache/jackrabbit/oak/plugins/segment/

Author: jukka
Date: Tue Jul 16 08:43:35 2013
New Revision: 1503616

URL: http://svn.apache.org/r1503616
Log:
OAK-786: Fall back to pessimism

Enable the pessimistic locking fallback. Add a test case.

Added:
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/MergeTest.java   (with props)
Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreBranch.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java?rev=1503616&r1=1503615&r2=1503616&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java Tue Jul 16 08:43:35 2013
@@ -76,7 +76,7 @@ public class SegmentNodeStore extends Ab
     }
 
     @Override @Nonnull
-    public NodeStoreBranch branch() {
+    public SegmentNodeStoreBranch branch() {
         return new SegmentNodeStoreBranch(
                 this, new SegmentWriter(store), getHead());
     }

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreBranch.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreBranch.java?rev=1503616&r1=1503615&r2=1503616&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreBranch.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreBranch.java Tue Jul 16 08:43:35 2013
@@ -16,6 +16,9 @@
  */
 package org.apache.jackrabbit.oak.plugins.segment;
 
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.jackrabbit.oak.plugins.segment.SegmentNodeStore.ROOT;
 
 import java.util.Random;
@@ -43,6 +46,8 @@ class SegmentNodeStoreBranch extends Abs
 
     private SegmentNodeState head;
 
+    private long maximumBackoff = MILLISECONDS.convert(10, SECONDS);
+
     SegmentNodeStoreBranch(
             SegmentNodeStore store, SegmentWriter writer,
             SegmentNodeState base) {
@@ -52,6 +57,10 @@ class SegmentNodeStoreBranch extends Abs
         this.head = base;
     }
 
+    void setMaximumBackoff(long max) {
+        this.maximumBackoff = max;
+    }
+
     @Override @Nonnull
     public NodeState getBase() {
         return base.getChildNode(ROOT);
@@ -84,8 +93,55 @@ class SegmentNodeStoreBranch extends Abs
         }
     }
 
-    private synchronized NodeState pessimisticMerge(
-            CommitHook hook, long timeout) throws CommitFailedException {
+    private synchronized long optimisticMerge(CommitHook hook)
+            throws CommitFailedException, InterruptedException {
+        long timeout = 1;
+
+        SegmentNodeState originalBase = base;
+        SegmentNodeState originalHead = head;
+
+        // use exponential backoff in case of concurrent commits
+        for (long backoff = 1; backoff < maximumBackoff; backoff *= 2) {
+            long start = System.nanoTime();
+
+            // apply commit hooks on the rebased changes
+            NodeBuilder builder = head.builder();
+            builder.setChildNode(ROOT, hook.processCommit(
+                    base.getChildNode(ROOT), head.getChildNode(ROOT)));
+            SegmentNodeState newHead = writer.writeNode(builder.getNodeState());
+            writer.flush();
+
+            // use optimistic locking to update the journal
+            if (base.hasProperty("token")
+                    && base.getLong("timeout") >= System.currentTimeMillis()) {
+                // someone else has a pessimistic lock on the journal,
+                // so we should not try to commit anything
+            } else if (store.setHead(base, newHead)) {
+                base = newHead;
+                head = newHead;
+                return -1;
+            }
+
+            // someone else was faster, so restore state and retry later
+            base = originalBase;
+            head = originalHead;
+
+            Thread.sleep(backoff, RANDOM.nextInt(1000000));
+
+            // rebase to latest head before trying again
+            rebase();
+
+            long stop = System.nanoTime();
+            if (stop - start > timeout) {
+                timeout = stop - start;
+            }
+        }
+
+        return MILLISECONDS.convert(timeout, NANOSECONDS);
+    }
+
+    private synchronized void pessimisticMerge(CommitHook hook, long timeout)
+            throws CommitFailedException {
         while (true) {
             SegmentNodeState before = store.getHead();
             long now = System.currentTimeMillis();
@@ -120,7 +176,7 @@ class SegmentNodeStoreBranch extends Abs
                     if (store.setHead(after, newHead)) {
                         base = newHead;
                         head = newHead;
-                        return getHead();
+                        return;
                     } else {
                         // something else happened, perhaps a timeout, so
                         // undo the previous rebase and try again
@@ -135,46 +191,15 @@ class SegmentNodeStoreBranch extends Abs
     @Override @Nonnull
     public synchronized NodeState merge(CommitHook hook)
             throws CommitFailedException {
-        int backoff = 1;
-        SegmentNodeState originalBase = base;
-        SegmentNodeState originalHead = head;
-        while (base != head) {
-            // apply commit hooks on the rebased changes
-            NodeBuilder builder = head.builder();
-            builder.removeProperty("token");
-            builder.removeProperty("timeout");
-            builder.setChildNode(ROOT, hook.processCommit(
-                    base.getChildNode(ROOT), head.getChildNode(ROOT)));
-            SegmentNodeState newHead = writer.writeNode(builder.getNodeState());
-            writer.flush();
-
-            // use optimistic locking to update the journal
-            if (store.setHead(base, newHead)) {
-                base = newHead;
-                head = newHead;
-            } else {
-                // someone else was faster, so restore state and retry later
-                base = originalBase;
-                head = originalHead;
-
-                // use exponential backoff to reduce contention
-                if (backoff < 10000) {
-                    try {
-                        Thread.sleep(backoff, RANDOM.nextInt(1000000));
-                        backoff *= 2;
-                    } catch (InterruptedException e) {
-                        // TODO: correct interrupt handling?
-                        throw new CommitFailedException(
-                                "Segment", 1, "Commit was interrupted", e);
-                    }
-                } else {
-                    throw new CommitFailedException(
-                            "Segment", 2,
-                            "System overloaded, try again later");
+        if (base != head) {
+            try {
+                long timeout = optimisticMerge(hook);
+                if (timeout >= 0) {
+                    pessimisticMerge(hook, timeout);
                 }
-
-                // rebase to latest head before trying again
-                rebase();
+            } catch (InterruptedException e) {
+                throw new CommitFailedException(
+                        "Segment", 1, "Commit interrupted", e);
             }
         }
         return getHead();

Added: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/MergeTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/MergeTest.java?rev=1503616&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/MergeTest.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/MergeTest.java Tue Jul 16 08:43:35 2013
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.segment;
+
+import static junit.framework.Assert.assertFalse;
+import static junit.framework.Assert.assertTrue;
+import static junit.framework.Assert.fail;
+
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.annotation.Nonnull;
+
+import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.plugins.segment.memory.MemoryStore;
+import org.apache.jackrabbit.oak.spi.commit.CommitHook;
+import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.apache.jackrabbit.oak.spi.state.NodeStore;
+import org.apache.jackrabbit.oak.spi.state.NodeStoreBranch;
+import org.junit.Test;
+
+public class MergeTest {
+
+    @Test
+    public void testSequentialMerge() throws CommitFailedException {
+        NodeStore store = new SegmentNodeStore(new MemoryStore());
+
+        assertFalse(store.getRoot().hasProperty("foo"));
+        assertFalse(store.getRoot().hasProperty("bar"));
+
+        NodeStoreBranch a = store.branch();
+        a.setRoot(a.getHead().builder().setProperty("foo", "abc").getNodeState());
+        a.merge(EmptyHook.INSTANCE);
+
+        assertTrue(store.getRoot().hasProperty("foo"));
+        assertFalse(store.getRoot().hasProperty("bar"));
+
+        NodeStoreBranch b = store.branch();
+        b.setRoot(b.getHead().builder().setProperty("bar", "xyz").getNodeState());
+        b.merge(EmptyHook.INSTANCE);
+
+        assertTrue(store.getRoot().hasProperty("foo"));
+        assertTrue(store.getRoot().hasProperty("bar"));
+    }
+
+    @Test
+    public void testOptimisticMerge() throws CommitFailedException {
+        NodeStore store = new SegmentNodeStore(new MemoryStore());
+
+        NodeStoreBranch a = store.branch();
+        a.setRoot(a.getHead().builder().setProperty("foo", "abc").getNodeState());
+
+        NodeStoreBranch b = store.branch();
+        b.setRoot(b.getHead().builder().setProperty("bar", "xyz").getNodeState());
+
+        assertFalse(store.getRoot().hasProperty("foo"));
+        assertFalse(store.getRoot().hasProperty("bar"));
+
+        a.merge(EmptyHook.INSTANCE);
+
+        assertTrue(store.getRoot().hasProperty("foo"));
+        assertFalse(store.getRoot().hasProperty("bar"));
+
+        b.merge(EmptyHook.INSTANCE);
+
+        assertTrue(store.getRoot().hasProperty("foo"));
+        assertTrue(store.getRoot().hasProperty("bar"));
+    }
+
+    @Test
+    public void testPessimisticMerge() throws Exception {
+        final SegmentNodeStore store = new SegmentNodeStore(new MemoryStore());
+        final Semaphore semaphore = new Semaphore(0);
+        final AtomicBoolean running = new AtomicBoolean(true);
+
+        Thread background = new Thread() {
+            @Override
+            public void run() {
+                for (int i = 0; running.get(); i++) {
+                    try {
+                        SegmentNodeStoreBranch a = store.branch();
+                        NodeBuilder builder = a.getHead().builder();
+                        builder.setProperty("foo", "abc" + i);
+                        a.setRoot(builder.getNodeState());
+                        a.merge(EmptyHook.INSTANCE);
+                        semaphore.release();
+                    } catch (CommitFailedException e) {
+                        fail();
+                    }
+                }
+            }
+        };
+        background.start();
+
+        // wait for the first commit
+        semaphore.acquire();
+
+        assertTrue(store.getRoot().hasProperty("foo"));
+        assertFalse(store.getRoot().hasProperty("bar"));
+
+        SegmentNodeStoreBranch b = store.branch();
+        b.setMaximumBackoff(100);
+        b.setRoot(b.getHead().builder().setProperty("bar", "xyz").getNodeState());
+        b.merge(new CommitHook() {
+            @Override @Nonnull
+            public NodeState processCommit(NodeState before, NodeState after) {
+                try {
+                    Thread.sleep(100);
+                } catch (InterruptedException e) {
+                    fail();
+                }
+                return after;
+            }
+        });
+
+        assertTrue(store.getRoot().hasProperty("foo"));
+        assertTrue(store.getRoot().hasProperty("bar"));
+
+        running.set(false);
+        background.join();
+    }
+
+}

Propchange: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/MergeTest.java
------------------------------------------------------------------------------
    svn:eol-style = native