You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ad...@apache.org on 2018/02/05 11:21:28 UTC

svn commit: r1823154 - /jackrabbit/oak/branches/1.8/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/scheduler/LockBasedScheduler.java

Author: adulceanu
Date: Mon Feb  5 11:21:27 2018
New Revision: 1823154

URL: http://svn.apache.org/viewvc?rev=1823154&view=rev
Log:
OAK-7162 - Race condition on revisions head between compaction and scheduler could result in skipped commit
Moved head state update into a loop
Added upper bound, backoff and logging

Modified:
    jackrabbit/oak/branches/1.8/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/scheduler/LockBasedScheduler.java

Modified: jackrabbit/oak/branches/1.8/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/scheduler/LockBasedScheduler.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.8/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/scheduler/LockBasedScheduler.java?rev=1823154&r1=1823153&r2=1823154&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.8/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/scheduler/LockBasedScheduler.java (original)
+++ jackrabbit/oak/branches/1.8/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/scheduler/LockBasedScheduler.java Mon Feb  5 11:21:27 2018
@@ -19,20 +19,25 @@ package org.apache.jackrabbit.oak.segmen
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static java.lang.Thread.currentThread;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.jackrabbit.oak.api.Type.LONG;
 
 import java.io.Closeable;
+import java.text.MessageFormat;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Random;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.SlidingWindowReservoir;
 import javax.annotation.Nonnull;
-
 import org.apache.jackrabbit.oak.api.CommitFailedException;
 import org.apache.jackrabbit.oak.api.PropertyState;
 import org.apache.jackrabbit.oak.segment.Revisions;
@@ -51,9 +56,6 @@ import org.apache.jackrabbit.oak.stats.S
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.codahale.metrics.Histogram;
-import com.codahale.metrics.SlidingWindowReservoir;
-
 public class LockBasedScheduler implements Scheduler {
 
     public static class LockBasedSchedulerBuilder {
@@ -123,6 +125,12 @@ public class LockBasedScheduler implemen
             .parseDouble(System.getProperty("oak.scheduler.fetch.commitDelayQuantile", "0.5"));
     
     /**
+     * Exclusive upper bound, in milliseconds, on the exponential backoff between attempts to
+     * re-apply a commit whose head state update lost the race against a concurrent update.
+     */
+    private static final long MAXIMUM_BACKOFF = MILLISECONDS.convert(10, SECONDS);
+    
+    /**
      * Sets the number of seconds to wait for the attempt to grab the lock to
      * create a checkpoint
      */
@@ -149,6 +157,7 @@ public class LockBasedScheduler implemen
     
     private final Histogram commitTimeHistogram = new Histogram(new SlidingWindowReservoir(1000));
     
+    private final Random random = new Random(); // sub-millisecond jitter for the commit retry backoff in execute()
 
     public LockBasedScheduler(LockBasedSchedulerBuilder builder) {
         if (COMMIT_FAIR_LOCK) {
@@ -244,19 +253,39 @@ public class LockBasedScheduler implemen
         }
     }
 
-    private NodeState execute(Commit commit) throws CommitFailedException {
+    private NodeState execute(Commit commit) throws CommitFailedException, InterruptedException {
         // only do the merge if there are some changes to commit
         if (commit.hasChanges()) {
-            refreshHead(true);
-            SegmentNodeState before = head.get();
-            SegmentNodeState after = commit.apply(before);
-            if (revisions.setHead(before.getRecordId(), after.getRecordId())) {
-                head.set(after);
-                contentChanged(after.getChildNode(ROOT), commit.info());
+            long start = System.nanoTime();
+            // Re-apply the commit on a fresh head, backing off exponentially (plus random jitter) between attempts.
+            int count = 0;
+            for (long backoff = 1; backoff < MAXIMUM_BACKOFF; backoff *= 2) {
                 refreshHead(true);
+                SegmentNodeState before = head.get();
+                SegmentNodeState after = commit.apply(before);
+                // A false setHead is treated as a concurrent head update (e.g. by compaction): retry.
+                if (revisions.setHead(before.getRecordId(), after.getRecordId())) {
+                    head.set(after);
+                    contentChanged(after.getChildNode(ROOT), commit.info());
+                    refreshHead(true);
+                    
+                    return head.get().getChildNode(ROOT);
+                } 
+                
+                count++;
+                int randNs = random.nextInt(1_000_000);
+                log.info("Scheduler detected concurrent commits. Retrying after {} ms and {} ns", backoff, randNs);
+                Thread.sleep(backoff, randNs);
             }
+            // Every attempt lost the race: fail the commit instead of silently skipping it.
+            long finish = System.nanoTime();
+            // MessageFormat needs indexed placeholders ({0}, {1}); a bare "{}" is rejected as an invalid pattern.
+            String message = MessageFormat.format(
+                    "The commit could not be executed after {0} attempts. Total wait time: {1} ms",
+                    count, NANOSECONDS.toMillis(finish - start));
+            throw new CommitFailedException("Segment", 3, message);
         }
-
+        
         return head.get().getChildNode(ROOT);
     }