You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by md...@apache.org on 2014/04/01 14:34:46 UTC
svn commit: r1583648 - in /jackrabbit/oak/trunk:
oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/
oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/observation/
oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/
Author: mduerig
Date: Tue Apr 1 12:34:46 2014
New Revision: 1583648
URL: http://svn.apache.org/r1583648
Log:
OAK-1659: Improve CommitRateLimiter to delay commits
Added:
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/observation/CommitRateLimiterTest.java
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/CommitRateLimiter.java
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/CommitRateLimiter.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/CommitRateLimiter.java?rev=1583648&r1=1583647&r2=1583648&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/CommitRateLimiter.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/CommitRateLimiter.java Tue Apr 1 12:34:46 2014
@@ -27,13 +27,15 @@ import org.apache.jackrabbit.oak.api.Com
import org.apache.jackrabbit.oak.spi.commit.CommitHook;
import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.apache.jackrabbit.oak.stats.Clock;
/**
- * This {@code CommitHook} can be used to block commits for any length of time.
+ * This {@code CommitHook} can be used to block or delay commits for any length of time.
* As long as commits are blocked this hook throws a {@code CommitFailedException}.
*/
public class CommitRateLimiter implements CommitHook {
private volatile boolean blockCommits;
+ private volatile long delay;
/**
* Block any further commits until {@link #unblockCommits()} is called.
@@ -49,14 +51,45 @@ public class CommitRateLimiter implement
blockCommits = false;
}
+ /**
+ * Number of milli seconds to delay commits going through this hook.
+ * If {@code 0}, any currently blocked commit will be unblocked.
+ * @param delay milli seconds
+ */
+ public void setDelay(long delay) {
+ this.delay = delay;
+ if (delay == 0) {
+ synchronized (this) {
+ this.notifyAll();
+ }
+ }
+ }
+
@Nonnull
@Override
public NodeState processCommit(NodeState before, NodeState after, CommitInfo info)
throws CommitFailedException {
if (blockCommits) {
throw new CommitFailedException(OAK, 1, "System busy. Try again later.");
- } else {
- return after;
+ }
+ delay();
+ return after;
+ }
+
+ private void delay() throws CommitFailedException {
+ if (delay > 0) {
+ synchronized (this) {
+ try {
+ long t0 = Clock.ACCURATE.getTime();
+ long dt = delay;
+ do {
+ wait(dt);
+ dt = dt - Clock.ACCURATE.getTime() + t0;
+ } while (delay > 0 && dt > 0);
+ } catch (InterruptedException e) {
+ throw new CommitFailedException(OAK, 2, "Interrupted while waiting to commit", e);
+ }
+ }
}
}
}
Added: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/observation/CommitRateLimiterTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/observation/CommitRateLimiterTest.java?rev=1583648&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/observation/CommitRateLimiterTest.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/observation/CommitRateLimiterTest.java Tue Apr 1 12:34:46 2014
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.observation;
+
+import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.apache.jackrabbit.oak.stats.Clock;
+import org.junit.Before;
+import org.junit.Test;
+
+public class CommitRateLimiterTest {
+ private static final NodeState AFTER = createAfter();
+
+ private static NodeState createAfter() {
+ NodeBuilder builder = EMPTY_NODE.builder();
+ builder.setChildNode("a");
+ builder.setProperty("x", 42);
+ return builder.getNodeState();
+ }
+
+ private CommitRateLimiter limiter;
+
+ @Before
+ public void setup() {
+ limiter = new CommitRateLimiter();
+ }
+
+ @Test
+ public void commit() throws CommitFailedException {
+ assertSame(AFTER, limiter.processCommit(EMPTY_NODE, AFTER, null));
+ }
+
+ @Test(expected = CommitFailedException.class)
+ public void blockCommits() throws CommitFailedException {
+ limiter.blockCommits();
+ limiter.processCommit(EMPTY_NODE, AFTER, null);
+ }
+
+ @Test
+ public void delayCommits() throws CommitFailedException {
+ limiter.setDelay(1000);
+ long t0 = Clock.ACCURATE.getTime();
+ assertSame(AFTER, limiter.processCommit(EMPTY_NODE, AFTER, null));
+ assertTrue(Clock.ACCURATE.getTime() - t0 >= 1000);
+ }
+
+ private final FutureTask<Long> commit = new FutureTask<Long>(new Callable<Long>() {
+ @Override
+ public Long call() throws Exception {
+ long t0 = Clock.ACCURATE.getTime();
+ Clock.ACCURATE.waitUntil(Clock.ACCURATE.getTime() + 100);
+ assertSame(AFTER, limiter.processCommit(EMPTY_NODE, AFTER, null));
+ return Clock.ACCURATE.getTime() - t0;
+ }
+ });
+
+ @Test
+ public void delayCommitsWithReset() throws InterruptedException, ExecutionException, TimeoutException {
+ limiter.setDelay(10000);
+ new Thread(commit).start();
+ limiter.setDelay(0);
+ assertTrue(commit.get(1, TimeUnit.SECONDS) >= 100);
+ }
+
+ @Test(expected = ExecutionException.class)
+ public void delayCommitsWithInterrupt() throws InterruptedException, ExecutionException, TimeoutException {
+ limiter.setDelay(10000);
+ Thread t = new Thread(commit);
+ t.start();
+ t.interrupt();
+ commit.get(1, TimeUnit.SECONDS);
+ }
+}
Modified: jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java?rev=1583648&r1=1583647&r2=1583648&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java (original)
+++ jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java Tue Apr 1 12:34:46 2014
@@ -74,6 +74,18 @@ import org.slf4j.LoggerFactory;
class ChangeProcessor implements Observer {
private static final Logger LOG = LoggerFactory.getLogger(ChangeProcessor.class);
+ /**
+ * Fill ratio of the revision queue at which commits should be delayed
+ * (conditional of {@code commitRateLimiter} being non {@code null}).
+ */
+ public static final double DELAY_THRESHOLD = 0.8;
+
+ /**
+ * Maximal number of milli seconds a commit is delayed once {@code DELAY_THRESHOLD}
+ * kicks in.
+ */
+ public static final int MAX_DELAY = 10000;
+
private final ContentSession contentSession;
private final NamePathMapper namePathMapper;
private final PermissionProvider permissionProvider;
@@ -150,21 +162,50 @@ class ChangeProcessor implements Observe
private BackgroundObserver createObserver(final WhiteboardExecutor executor) {
return new BackgroundObserver(this, executor, queueLength) {
- private volatile boolean warnWhenFull = true;
+ private volatile long delay;
+ private volatile boolean blocking;
@Override
protected void added(int queueSize) {
maxQueueLength.recordValue(queueSize);
- if (warnWhenFull && queueSize == queueLength) {
- warnWhenFull = false;
+
+ if (queueSize == queueLength) {
if (commitRateLimiter != null) {
+ if (!blocking) {
+ LOG.warn("Revision queue is full. Further commits will be blocked.");
+ }
commitRateLimiter.blockCommits();
+ } else if (!blocking) {
+ LOG.warn("Revision queue is full. Further revisions will be compacted.");
}
- LOG.warn("Revision queue is full. Further revisions will be compacted.");
- } else if (queueSize <= 1) {
- warnWhenFull = true;
- if (commitRateLimiter != null) {
- commitRateLimiter.unblockCommits();
+ blocking = true;
+ } else {
+ double fillRatio = (double) queueSize / queueLength;
+ if (fillRatio > DELAY_THRESHOLD) {
+ if (commitRateLimiter != null) {
+ if (delay == 0) {
+ LOG.warn("Revision queue is becoming full. Further commits will be delayed.");
+ }
+
+ // Linear backoff proportional to the number of items exceeding
+ // DELAY_THRESHOLD. Offset by 1 to trigger the log message in the
+ // else branch once the queue falls below DELAY_THRESHOLD again.
+ delay = 1 + (int) ((fillRatio - DELAY_THRESHOLD) / ( 1 - DELAY_THRESHOLD) * MAX_DELAY);
+ commitRateLimiter.setDelay(delay);
+ }
+ } else {
+ if (commitRateLimiter != null) {
+ commitRateLimiter.setDelay(0);
+ commitRateLimiter.unblockCommits();
+ if (delay > 0) {
+ LOG.debug("Revision queue becoming empty. Unblocking commits");
+ }
+ if (blocking) {
+ LOG.debug("Revision queue becoming empty. Stop delaying commits.");
+ }
+ }
+ delay = 0;
+ blocking = false;
}
}
}