You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by md...@apache.org on 2014/04/02 10:23:36 UTC

svn commit: r1583922 - /jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java

Author: mduerig
Date: Wed Apr  2 08:23:36 2014
New Revision: 1583922

URL: http://svn.apache.org/r1583922
Log:
OAK-1659: Improve CommitRateLimiter to delay commits
Improve recovery when listeners have caught up

Modified:
    jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java

Modified: jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java?rev=1583922&r1=1583921&r2=1583922&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java (original)
+++ jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java Wed Apr  2 08:23:36 2014
@@ -190,22 +190,25 @@ class ChangeProcessor implements Observe
                             // Linear backoff proportional to how far the queue's fill ratio
                             // exceeds DELAY_THRESHOLD. Offset by 1 to trigger the log message in
                             // the else branch once the queue falls below DELAY_THRESHOLD again.
-                            delay = 1 + (int) ((fillRatio - DELAY_THRESHOLD) / ( 1 - DELAY_THRESHOLD) * MAX_DELAY);
-                            commitRateLimiter.setDelay(delay);
+                            int newDelay = 1 + (int) ((fillRatio - DELAY_THRESHOLD) / (1 - DELAY_THRESHOLD) * MAX_DELAY);
+                            if (newDelay > delay) {
+                                delay = newDelay;
+                                commitRateLimiter.setDelay(delay);
+                            }
                         }
                     } else {
                         if (commitRateLimiter != null) {
-                            commitRateLimiter.setDelay(0);
-                            commitRateLimiter.unblockCommits();
                             if (delay > 0) {
                                 LOG.debug("Revision queue becoming empty. Unblocking commits");
+                                commitRateLimiter.setDelay(0);
+                                delay = 0;
                             }
                             if (blocking) {
                                 LOG.debug("Revision queue becoming empty. Stop delaying commits.");
+                                commitRateLimiter.unblockCommits();
+                                blocking = false;
                             }
                         }
-                        delay = 0;
-                        blocking = false;
                     }
                 }
             }