You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by md...@apache.org on 2014/04/01 14:34:46 UTC

svn commit: r1583648 - in /jackrabbit/oak/trunk: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/observation/ oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/

Author: mduerig
Date: Tue Apr  1 12:34:46 2014
New Revision: 1583648

URL: http://svn.apache.org/r1583648
Log:
OAK-1659: Improve CommitRateLimiter to delay commits

Added:
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/observation/CommitRateLimiterTest.java
Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/CommitRateLimiter.java
    jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/CommitRateLimiter.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/CommitRateLimiter.java?rev=1583648&r1=1583647&r2=1583648&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/CommitRateLimiter.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/CommitRateLimiter.java Tue Apr  1 12:34:46 2014
@@ -27,13 +27,15 @@ import org.apache.jackrabbit.oak.api.Com
 import org.apache.jackrabbit.oak.spi.commit.CommitHook;
 import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.apache.jackrabbit.oak.stats.Clock;
 
 /**
- * This {@code CommitHook} can be used to block commits for any length of time.
+ * This {@code CommitHook} can be used to block or delay commits for any length of time.
  * As long as commits are blocked this hook throws a {@code CommitFailedException}.
  */
 public class CommitRateLimiter implements CommitHook {
     private volatile boolean blockCommits;
+    private volatile long delay;
 
     /**
      * Block any further commits until {@link #unblockCommits()} is called.
@@ -49,14 +51,45 @@ public class CommitRateLimiter implement
         blockCommits = false;
     }
 
+    /**
+     * Number of milli seconds to delay commits going through this hook.
+     * If {@code 0}, any currently blocked commit will be unblocked.
+     * @param delay  milli seconds
+     */
+    public void setDelay(long delay) {
+        this.delay = delay;
+        if (delay == 0) {
+            synchronized (this) {
+                this.notifyAll();
+            }
+        }
+    }
+
     @Nonnull
     @Override
     public NodeState processCommit(NodeState before, NodeState after, CommitInfo info)
             throws CommitFailedException {
         if (blockCommits) {
             throw new CommitFailedException(OAK, 1, "System busy. Try again later.");
-        } else {
-            return after;
+        }
+        delay();
+        return after;
+    }
+
+    private void delay() throws CommitFailedException {
+        if (delay > 0) {
+            synchronized (this) {
+                try {
+                    long t0 = Clock.ACCURATE.getTime();
+                    long dt = delay;
+                    do {
+                        wait(dt);
+                        dt = dt - Clock.ACCURATE.getTime() + t0;
+                    } while (delay > 0 && dt > 0);
+                } catch (InterruptedException e) {
+                    throw new CommitFailedException(OAK, 2, "Interrupted while waiting to commit", e);
+                }
+            }
         }
     }
 }

Added: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/observation/CommitRateLimiterTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/observation/CommitRateLimiterTest.java?rev=1583648&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/observation/CommitRateLimiterTest.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/observation/CommitRateLimiterTest.java Tue Apr  1 12:34:46 2014
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.observation;
+
+import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.apache.jackrabbit.oak.stats.Clock;
+import org.junit.Before;
+import org.junit.Test;
+
+public class CommitRateLimiterTest {
+    private static final NodeState AFTER = createAfter();
+
+    private static NodeState createAfter() {
+        NodeBuilder builder = EMPTY_NODE.builder();
+        builder.setChildNode("a");
+        builder.setProperty("x", 42);
+        return builder.getNodeState();
+    }
+
+    private CommitRateLimiter limiter;
+
+    @Before
+    public void setup() {
+        limiter = new CommitRateLimiter();
+    }
+
+    @Test
+    public void commit() throws CommitFailedException {
+        assertSame(AFTER, limiter.processCommit(EMPTY_NODE, AFTER, null));
+    }
+
+    @Test(expected = CommitFailedException.class)
+    public void blockCommits() throws CommitFailedException {
+        limiter.blockCommits();
+        limiter.processCommit(EMPTY_NODE, AFTER, null);
+    }
+
+    @Test
+    public void delayCommits() throws CommitFailedException {
+        limiter.setDelay(1000);
+        long t0 = Clock.ACCURATE.getTime();
+        assertSame(AFTER, limiter.processCommit(EMPTY_NODE, AFTER, null));
+        assertTrue(Clock.ACCURATE.getTime() - t0 >= 1000);
+    }
+
+    private final FutureTask<Long> commit = new FutureTask<Long>(new Callable<Long>() {
+        @Override
+        public Long call() throws Exception {
+            long t0 = Clock.ACCURATE.getTime();
+            Clock.ACCURATE.waitUntil(Clock.ACCURATE.getTime() + 100);
+            assertSame(AFTER, limiter.processCommit(EMPTY_NODE, AFTER, null));
+            return Clock.ACCURATE.getTime() - t0;
+        }
+    });
+
+    @Test
+    public void delayCommitsWithReset() throws InterruptedException, ExecutionException, TimeoutException {
+        limiter.setDelay(10000);
+        new Thread(commit).start();
+        limiter.setDelay(0);
+        assertTrue(commit.get(1, TimeUnit.SECONDS) >= 100);
+    }
+
+    @Test(expected = ExecutionException.class)
+    public void delayCommitsWithInterrupt() throws InterruptedException, ExecutionException, TimeoutException {
+        limiter.setDelay(10000);
+        Thread t = new Thread(commit);
+        t.start();
+        t.interrupt();
+        commit.get(1, TimeUnit.SECONDS);
+    }
+}

Modified: jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java?rev=1583648&r1=1583647&r2=1583648&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java (original)
+++ jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java Tue Apr  1 12:34:46 2014
@@ -74,6 +74,18 @@ import org.slf4j.LoggerFactory;
 class ChangeProcessor implements Observer {
     private static final Logger LOG = LoggerFactory.getLogger(ChangeProcessor.class);
 
+    /**
+     * Fill ratio of the revision queue at which commits should be delayed
+     * (conditional of {@code commitRateLimiter} being non {@code null}).
+     */
+    public static final double DELAY_THRESHOLD = 0.8;
+
+    /**
+     * Maximal number of milli seconds a commit is delayed once {@code DELAY_THRESHOLD}
+     * kicks in.
+     */
+    public static final int MAX_DELAY = 10000;
+
     private final ContentSession contentSession;
     private final NamePathMapper namePathMapper;
     private final PermissionProvider permissionProvider;
@@ -150,21 +162,50 @@ class ChangeProcessor implements Observe
 
     private BackgroundObserver createObserver(final WhiteboardExecutor executor) {
         return new BackgroundObserver(this, executor, queueLength) {
-            private volatile boolean warnWhenFull = true;
+            private volatile long delay;
+            private volatile boolean blocking;
 
             @Override
             protected void added(int queueSize) {
                 maxQueueLength.recordValue(queueSize);
-                if (warnWhenFull && queueSize == queueLength) {
-                    warnWhenFull = false;
+
+                if (queueSize == queueLength) {
                     if (commitRateLimiter != null) {
+                        if (!blocking) {
+                            LOG.warn("Revision queue is full. Further commits will be blocked.");
+                        }
                         commitRateLimiter.blockCommits();
+                    } else if (!blocking) {
+                        LOG.warn("Revision queue is full. Further revisions will be compacted.");
                     }
-                    LOG.warn("Revision queue is full. Further revisions will be compacted.");
-                } else if (queueSize <= 1) {
-                    warnWhenFull = true;
-                    if (commitRateLimiter != null) {
-                        commitRateLimiter.unblockCommits();
+                    blocking = true;
+                } else {
+                    double fillRatio = (double) queueSize / queueLength;
+                    if (fillRatio > DELAY_THRESHOLD) {
+                        if (commitRateLimiter != null) {
+                            if (delay == 0) {
+                                LOG.warn("Revision queue is becoming full. Further commits will be delayed.");
+                            }
+
+                            // Linear backoff proportional to the number of items exceeding
+                            // DELAY_THRESHOLD. Offset by 1 to trigger the log message in the
+                            // else branch once the queue falls below DELAY_THRESHOLD again.
+                            delay = 1 + (int) ((fillRatio - DELAY_THRESHOLD) / ( 1 - DELAY_THRESHOLD) * MAX_DELAY);
+                            commitRateLimiter.setDelay(delay);
+                        }
+                    } else {
+                        if (commitRateLimiter != null) {
+                            commitRateLimiter.setDelay(0);
+                            commitRateLimiter.unblockCommits();
+                            if (delay > 0) {
+                                LOG.debug("Revision queue becoming empty. Unblocking commits");
+                            }
+                            if (blocking) {
+                                LOG.debug("Revision queue becoming empty. Stop delaying commits.");
+                            }
+                        }
+                        delay = 0;
+                        blocking = false;
                     }
                 }
             }