You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ad...@apache.org on 2018/02/05 11:21:28 UTC
svn commit: r1823154 -
/jackrabbit/oak/branches/1.8/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/scheduler/LockBasedScheduler.java
Author: adulceanu
Date: Mon Feb 5 11:21:27 2018
New Revision: 1823154
URL: http://svn.apache.org/viewvc?rev=1823154&view=rev
Log:
OAK-7162 - Race condition on revisions head between compaction and scheduler could result in skipped commit
Moved the head state update into a loop
Added upper bound, backoff and logging
Modified:
jackrabbit/oak/branches/1.8/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/scheduler/LockBasedScheduler.java
Modified: jackrabbit/oak/branches/1.8/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/scheduler/LockBasedScheduler.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.8/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/scheduler/LockBasedScheduler.java?rev=1823154&r1=1823153&r2=1823154&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.8/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/scheduler/LockBasedScheduler.java (original)
+++ jackrabbit/oak/branches/1.8/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/scheduler/LockBasedScheduler.java Mon Feb 5 11:21:27 2018
@@ -19,20 +19,25 @@ package org.apache.jackrabbit.oak.segmen
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static java.lang.Thread.currentThread;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.jackrabbit.oak.api.Type.LONG;
import java.io.Closeable;
+import java.text.MessageFormat;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Random;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.SlidingWindowReservoir;
import javax.annotation.Nonnull;
-
import org.apache.jackrabbit.oak.api.CommitFailedException;
import org.apache.jackrabbit.oak.api.PropertyState;
import org.apache.jackrabbit.oak.segment.Revisions;
@@ -51,9 +56,6 @@ import org.apache.jackrabbit.oak.stats.S
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.codahale.metrics.Histogram;
-import com.codahale.metrics.SlidingWindowReservoir;
-
public class LockBasedScheduler implements Scheduler {
public static class LockBasedSchedulerBuilder {
@@ -123,6 +125,12 @@ public class LockBasedScheduler implemen
.parseDouble(System.getProperty("oak.scheduler.fetch.commitDelayQuantile", "0.5"));
/**
+ * Exclusive upper bound, in milliseconds, for the doubling backoff that is slept between
+ * attempts to set the new head state when a concurrent head state update makes one fail.
+ */
+ private static final long MAXIMUM_BACKOFF = MILLISECONDS.convert(10, SECONDS);
+
+ /**
* Sets the number of seconds to wait for the attempt to grab the lock to
* create a checkpoint
*/
@@ -149,6 +157,7 @@ public class LockBasedScheduler implemen
private final Histogram commitTimeHistogram = new Histogram(new SlidingWindowReservoir(1000));
+ private final Random random = new Random();
public LockBasedScheduler(LockBasedSchedulerBuilder builder) {
if (COMMIT_FAIR_LOCK) {
@@ -244,19 +253,52 @@ public class LockBasedScheduler implemen
}
}
- private NodeState execute(Commit commit) throws CommitFailedException {
+ /**
+ * Applies the commit to the current head state and publishes the result as the new head.
+ * When {@code revisions.setHead} returns false the attempt is repeated after a
+ * randomized, doubling backoff that stays below {@code MAXIMUM_BACKOFF} milliseconds.
+ *
+ * @param commit the commit to execute
+ * @return the root of the head state once the commit is applied, or of the current
+ *         head state when the commit has no changes
+ * @throws CommitFailedException if every attempt to set the new head fails
+ * @throws InterruptedException if the thread is interrupted while backing off
+ */
+ private NodeState execute(Commit commit) throws CommitFailedException, InterruptedException {
// only do the merge if there are some changes to commit
if (commit.hasChanges()) {
- refreshHead(true);
- SegmentNodeState before = head.get();
- SegmentNodeState after = commit.apply(before);
- if (revisions.setHead(before.getRecordId(), after.getRecordId())) {
- head.set(after);
- contentChanged(after.getChildNode(ROOT), commit.info());
+ long start = System.nanoTime();
+
+ int count = 0;
+ for (long backoff = 1; backoff < MAXIMUM_BACKOFF; backoff *= 2) {
refreshHead(true);
+ SegmentNodeState before = head.get();
+ SegmentNodeState after = commit.apply(before);
+
+ if (revisions.setHead(before.getRecordId(), after.getRecordId())) {
+ head.set(after);
+ contentChanged(after.getChildNode(ROOT), commit.info());
+ refreshHead(true);
+
+ return head.get().getChildNode(ROOT);
+ }
+
+ count++;
+ int randNs = random.nextInt(1_000_000);
+ log.info("Scheduler detected concurrent commits. Retrying after {} ms and {} ns", backoff, randNs);
+ Thread.sleep(backoff, randNs);
}
+
+ long finish = System.nanoTime();
+
+ // MessageFormat only understands indexed placeholders; SLF4J-style "{}" makes
+ // format() throw IllegalArgumentException instead of producing the message.
+ String message = MessageFormat.format(
+ "The commit could not be executed after {0} attempts. Total wait time: {1} ms",
+ count, NANOSECONDS.toMillis(finish - start));
+ throw new CommitFailedException("Segment", 3, message);
}
-
+
return head.get().getChildNode(ROOT);
}