You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ju...@apache.org on 2013/12/02 17:11:44 UTC

svn commit: r1547081 - in /jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment: Record.java SegmentNodeState.java SegmentNodeStoreBranch.java

Author: jukka
Date: Mon Dec  2 16:11:44 2013
New Revision: 1547081

URL: http://svn.apache.org/r1547081
Log:
OAK-1249: Fine-grained locking in SegmentMK commits

Stick with a global lock for local commits for now until the merge feature gets optimized
Reduce the amount of synchronization in SegmentNodeState and Record

Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Record.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeState.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreBranch.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Record.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Record.java?rev=1547081&r1=1547080&r2=1547081&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Record.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Record.java Mon Dec  2 16:11:44 2013
@@ -35,7 +35,7 @@ class Record {
      * {@link #getSegment()} method is first called to prevent the potentially
      * costly pre-loading of segments that might actually not be needed.
      */
-    private Segment segment;
+    private volatile Segment segment;
 
     /**
      * Identifier of the segment that contains this record. The value of
@@ -43,7 +43,7 @@ class Record {
      * get updated by the {@link #getSegment()} method to indicate that
      * lazy initialization has happened.
      */
-    private UUID uuid;
+    private volatile UUID uuid;
 
     /**
      * Segment offset of this record.
@@ -106,7 +106,7 @@ class Record {
      *
      * @return record identifier
      */
-    public synchronized RecordId getRecordId() {
+    public RecordId getRecordId() {
         return new RecordId(uuid, offset);
     }
 

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeState.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeState.java?rev=1547081&r1=1547080&r2=1547081&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeState.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeState.java Mon Dec  2 16:11:44 2013
@@ -38,24 +38,28 @@ import static org.apache.jackrabbit.oak.
 
 public class SegmentNodeState extends Record implements NodeState {
 
-    private RecordId templateId = null;
+    private volatile RecordId templateId = null;
 
-    private Template template = null;
+    private volatile Template template = null;
 
     public SegmentNodeState(Segment segment, RecordId id) {
         super(segment, id);
     }
 
     RecordId getTemplateId() {
-        getTemplate(); // force loading of the template
+        if (templateId == null) {
+            // no problem if updated concurrently,
+            // as each concurrent thread will just get the same value
+            templateId = getSegment().readRecordId(getOffset(0));
+        }
         return templateId;
     }
 
-    synchronized Template getTemplate() {
+    Template getTemplate() {
         if (template == null) {
-            Segment segment = getSegment();
-            templateId = segment.readRecordId(getOffset(0));
-            template = segment.readTemplate(templateId);
+            // no problem if updated concurrently,
+            // as each concurrent thread will just get the same value
+            template = getSegment().readTemplate(getTemplateId());
         }
         return template;
     }

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreBranch.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreBranch.java?rev=1547081&r1=1547080&r2=1547081&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreBranch.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreBranch.java Mon Dec  2 16:11:44 2013
@@ -99,33 +99,34 @@ class SegmentNodeStoreBranch implements 
 
         // use exponential backoff in case of concurrent commits
         for (long backoff = 1; backoff < maximumBackoff; backoff *= 2) {
-            long start = System.nanoTime();
+            rebase(); // rebase to latest head, a no-op if already there
 
-            // apply commit hooks on the rebased changes
-            NodeBuilder builder = head.builder();
-            builder.setChildNode(ROOT, hook.processCommit(
-                    base.getChildNode(ROOT), head.getChildNode(ROOT)));
-            SegmentNodeState newHead = writer.writeNode(builder.getNodeState());
+            long start = System.nanoTime();
 
-            // use optimistic locking to update the journal
             if (base.hasProperty("token")
                     && base.getLong("timeout") >= System.currentTimeMillis()) {
                 // someone else has a pessimistic lock on the journal,
                 // so we should not try to commit anything
-            } else if (store.setHead(base, newHead, info)) {
-                base = newHead;
-                head = newHead;
-                return -1;
+            } else {
+                // apply commit hooks on the rebased changes
+                NodeBuilder builder = head.builder();
+                builder.setChildNode(ROOT, hook.processCommit(
+                        base.getChildNode(ROOT), head.getChildNode(ROOT)));
+                SegmentNodeState newHead = writer.writeNode(builder.getNodeState());
+
+                // use optimistic locking to update the journal
+                if (store.setHead(base, newHead, info)) {
+                    base = newHead;
+                    head = newHead;
+                    return -1;
+                }
             }
 
             // someone else was faster, so restore state and retry later
             base = originalBase;
             head = originalHead;
 
-            Thread.sleep(backoff, RANDOM.nextInt(1000000));
-
-            // rebase to latest head before trying again
-            rebase();
+            RANDOM.wait(backoff, RANDOM.nextInt(1000000));
 
             long stop = System.nanoTime();
             if (stop - start > timeout) {
@@ -136,7 +137,8 @@ class SegmentNodeStoreBranch implements 
         return MILLISECONDS.convert(timeout, NANOSECONDS);
     }
 
-    private synchronized void pessimisticMerge(CommitHook hook, long timeout, CommitInfo info)
+    private synchronized void pessimisticMerge(
+            CommitHook hook, long timeout, CommitInfo info)
             throws CommitFailedException, InterruptedException {
         while (true) {
             SegmentNodeState before = store.head;
@@ -144,7 +146,9 @@ class SegmentNodeStoreBranch implements 
             if (before.hasProperty("token")
                     && before.getLong("timeout") >= now) {
                 // locked by someone else, wait until unlocked or expired
-                // TODO: explicit sleep needed to avoid spinning?
+                RANDOM.wait(
+                        Math.min(before.getLong("timeout") - now, 1000),
+                        RANDOM.nextInt(1000000));
             } else {
                 // attempt to acquire the lock
                 NodeBuilder builder = before.builder();
@@ -188,14 +192,18 @@ class SegmentNodeStoreBranch implements 
             throws CommitFailedException {
         checkNotNull(hook);
         if (base != head) {
-            try {
-                long timeout = optimisticMerge(hook, info);
-                if (timeout >= 0) {
-                    pessimisticMerge(hook, timeout, info);
+            synchronized (RANDOM) {
+                try {
+                    long timeout = optimisticMerge(hook, info);
+                    if (timeout >= 0) {
+                        pessimisticMerge(hook, timeout, info);
+                    }
+                } catch (InterruptedException e) {
+                    throw new CommitFailedException(
+                            "Segment", 1, "Commit interrupted", e);
+                } finally {
+                    RANDOM.notifyAll();
                 }
-            } catch (InterruptedException e) {
-                throw new CommitFailedException(
-                        "Segment", 1, "Commit interrupted", e);
             }
         }
         return getHead();