You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ca...@apache.org on 2017/02/15 13:52:50 UTC

svn commit: r1783104 - in /jackrabbit/oak/trunk/oak-jcr/src: main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationQueueFullWarnTest.java

Author: catholicon
Date: Wed Feb 15 13:52:50 2017
New Revision: 1783104

URL: http://svn.apache.org/viewvc?rev=1783104&view=rev
Log:
OAK-5626: ChangeProcessor doesn't reset 'blocking' flag when items from queue gets removed and commit-rate-limiter is null

ChangeProcessor would now log WARN each time queue gets full. To avoid flooding of logs, consecutive WARNs would be avoided a breathing period (Default 10 minutes. Can be configured by JVM command line param: "oak.observation.full-queue.warn.interval"). During the breathing period, queue full logs would still be logged at DEBUG level.

Also, add test to check the behavior.

Modified:
    jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java
    jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationQueueFullWarnTest.java

Modified: jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java?rev=1783104&r1=1783103&r2=1783104&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java (original)
+++ jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java Wed Feb 15 13:52:50 2017
@@ -62,6 +62,7 @@ import org.apache.jackrabbit.oak.spi.whi
 import org.apache.jackrabbit.oak.spi.whiteboard.Registration;
 import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard;
 import org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardExecutor;
+import org.apache.jackrabbit.oak.stats.Clock;
 import org.apache.jackrabbit.oak.stats.MeterStats;
 import org.apache.jackrabbit.oak.stats.StatisticManager;
 import org.apache.jackrabbit.oak.stats.TimerStats;
@@ -113,7 +114,19 @@ class ChangeProcessor implements Filteri
      * kicks in.
      */
     public static final int MAX_DELAY;
-    
+
+    //It'd would have been more useful to have following 2 properties as instance variables
+    //which got set by tests. But, the tests won't get a handle to the actual instance, so
+    //static-members it is.
+    /**
+     * Number of milliseconds to wait before issuing consecutive queue full warn messages
+     * Controlled by command line property "oak.observation.full-queue.warn.interval".
+     * Note, the command line parameter is wait interval in minutes.
+     */
+    static long QUEUE_FULL_WARN_INTERVAL = TimeUnit.MINUTES.toMillis(Integer
+            .getInteger("oak.observation.full-queue.warn.interval", 10));
+    static Clock clock = Clock.SIMPLE;
+
     // OAK-4533: make DELAY_THRESHOLD and MAX_DELAY adjustable - using System.properties for now
     static {
         final String delayThresholdStr = System.getProperty("oak.commitRateLimiter.delayThreshold");
@@ -294,6 +307,8 @@ class ChangeProcessor implements Filteri
             private volatile long delay;
             private volatile boolean blocking;
 
+            private long lastQueueFullWarnTimestamp = -1;
+
             @Override
             protected void added(int newQueueSize) {
                 queueSizeChanged(newQueueSize);
@@ -310,11 +325,11 @@ class ChangeProcessor implements Filteri
                 if (newQueueSize >= queueLength) {
                     if (commitRateLimiter != null) {
                         if (!blocking) {
-                            LOG.warn("Revision queue is full. Further commits will be blocked.");
+                            logQueueFullWarning("Revision queue is full. Further commits will be blocked.");
                         }
                         commitRateLimiter.blockCommits();
                     } else if (!blocking) {
-                        LOG.warn("Revision queue is full. Further revisions will be compacted.");
+                        logQueueFullWarning("Revision queue is full. Further revisions will be compacted.");
                     }
                     blocking = true;
                 } else {
@@ -346,11 +361,24 @@ class ChangeProcessor implements Filteri
                                 commitRateLimiter.unblockCommits();
                                 blocking = false;
                             }
+                        } else {
+                            blocking = false;
                         }
                     }
                 }
             }
 
+            private void logQueueFullWarning(String message) {
+                long currTime = clock.getTime();
+                if (lastQueueFullWarnTimestamp + QUEUE_FULL_WARN_INTERVAL < currTime) {
+                    LOG.warn("{} Suppressing further such cases for {} seconds.",
+                            message,
+                            TimeUnit.MILLISECONDS.toSeconds(QUEUE_FULL_WARN_INTERVAL));
+                    lastQueueFullWarnTimestamp = currTime;
+                } else {
+                    LOG.debug(message);
+                }
+            }
             
             @Override
             public String toString() {

Modified: jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationQueueFullWarnTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationQueueFullWarnTest.java?rev=1783104&r1=1783103&r2=1783104&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationQueueFullWarnTest.java (original)
+++ jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationQueueFullWarnTest.java Wed Feb 15 13:52:50 2017
@@ -25,6 +25,7 @@ import org.apache.jackrabbit.oak.fixture
 import org.apache.jackrabbit.oak.jcr.AbstractRepositoryTest;
 import org.apache.jackrabbit.oak.jcr.Jcr;
 import org.apache.jackrabbit.oak.jcr.repository.RepositoryImpl;
+import org.apache.jackrabbit.oak.stats.Clock;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -42,6 +43,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 
 import static javax.jcr.observation.Event.NODE_ADDED;
 import static org.junit.Assert.assertTrue;
@@ -49,11 +51,14 @@ import static org.junit.Assert.assertTru
 @RunWith(Parameterized.class)
 public class ObservationQueueFullWarnTest extends AbstractRepositoryTest {
     static final int OBS_QUEUE_LENGTH = 5;
+    static final String OBS_QUEUE_FULL_WARN = "Revision queue is full. Further revisions will be compacted.";
 
     private static final String TEST_NODE = "test_node";
     private static final String TEST_NODE_TYPE = "oak:Unstructured";
     private static final String TEST_PATH = '/' + TEST_NODE;
 
+    private static final long CONDITION_TIMEOUT = 10*1000;
+
     private Session observingSession;
     private ObservationManager observationManager;
 
@@ -88,6 +93,7 @@ public class ObservationQueueFullWarnTes
     public void warnOnQueueFull() throws RepositoryException, InterruptedException, ExecutionException {
         LogCustomizer customLogs = LogCustomizer.forLogger(ChangeProcessor.class.getName())
                 .filter(Level.WARN)
+                .contains(OBS_QUEUE_FULL_WARN)
                 .create();
 
         final LoggingListener listener = new LoggingListener();
@@ -105,6 +111,88 @@ public class ObservationQueueFullWarnTes
         }
     }
 
+    @Test
+    public void warnOnRepeatedQueueFull() throws RepositoryException, InterruptedException, ExecutionException {
+        LogCustomizer warnLogs = LogCustomizer.forLogger(ChangeProcessor.class.getName())
+                .filter(Level.WARN)
+                .contains(OBS_QUEUE_FULL_WARN)
+                .create();
+        LogCustomizer debugLogs = LogCustomizer.forLogger(ChangeProcessor.class.getName())
+                .filter(Level.DEBUG)
+                .contains(OBS_QUEUE_FULL_WARN)
+                .create();
+        LogCustomizer logLevelSetting = LogCustomizer.forLogger(ChangeProcessor.class.getName())
+                .enable(Level.DEBUG)
+                .create();
+        logLevelSetting.starting();
+
+        long oldWarnLogInterval = ChangeProcessor.QUEUE_FULL_WARN_INTERVAL;
+        //Assumption is that 10 (virtual) minutes won't pass by the time we move from one stage of queue fill to next.
+        ChangeProcessor.QUEUE_FULL_WARN_INTERVAL = TimeUnit.MINUTES.toMillis(10);
+
+        Clock oldClockInstance = ChangeProcessor.clock;
+        Clock virtualClock = new Clock.Virtual();
+        ChangeProcessor.clock = virtualClock;
+        virtualClock.waitUntil(System.currentTimeMillis());
+
+        final LoggingListener listener = new LoggingListener();
+        observationManager.addEventListener(listener, NODE_ADDED, TEST_PATH, true, null, null, false);
+        try {
+            Node n = getAdminSession().getNode(TEST_PATH);
+            int nodeNameCounter = 0;
+
+            //Create first level WARN message
+            nodeNameCounter = addNodeToFillObsQueue(n, nodeNameCounter, listener);
+            emptyObsQueueABit(listener);
+
+            //Don't wait, fill up the queue again
+            warnLogs.starting();
+            debugLogs.starting();
+            nodeNameCounter = addNodeToFillObsQueue(n, nodeNameCounter, listener);
+            assertTrue("Observation queue full warning must not logged until some time has past since last log",
+                    warnLogs.getLogs().size() == 0);
+            assertTrue("Observation queue full warning should get logged on debug though in the mean time",
+                    debugLogs.getLogs().size() > 0);
+            warnLogs.finished();
+            debugLogs.finished();
+            emptyObsQueueABit(listener);
+
+            //Wait some time so reach WARN level again
+            virtualClock.waitUntil(virtualClock.getTime() + ChangeProcessor.QUEUE_FULL_WARN_INTERVAL);
+
+            warnLogs.starting();
+            debugLogs.starting();
+            addNodeToFillObsQueue(n, nodeNameCounter, listener);
+            assertTrue("Observation queue full warning must get logged after some time has past since last log",
+                    warnLogs.getLogs().size() > 0);
+            warnLogs.finished();
+            debugLogs.finished();
+        }
+        finally {
+            observationManager.removeEventListener(listener);
+            ChangeProcessor.clock = oldClockInstance;
+            ChangeProcessor.QUEUE_FULL_WARN_INTERVAL = oldWarnLogInterval;
+
+            logLevelSetting.finished();
+        }
+    }
+
+    private void emptyObsQueueABit(final LoggingListener listener) throws InterruptedException {
+        //Let queue empty up a bit.
+        boolean notTimedOut = listener.waitFor(CONDITION_TIMEOUT, new Condition() {
+            @Override
+            public boolean evaluate() {
+                return listener.numAdded >= 2;
+            }
+        });
+        listener.numAdded = 0;
+        assertTrue("Listener didn't process events within time-out", notTimedOut);
+    }
+
+    private interface Condition {
+        boolean evaluate();
+    }
+
     private static int addNodeToFillObsQueue(Node parent, int nodeNameCounter, LoggingListener listener)
             throws RepositoryException {
         listener.blockObservation.acquireUninterruptibly();
@@ -121,15 +209,34 @@ public class ObservationQueueFullWarnTes
 
     private class LoggingListener implements EventListener {
 
+        private volatile int numAdded = 0;
+
         Semaphore blockObservation = new Semaphore(1);
 
         @Override
-        public void onEvent(EventIterator events) {
+        public synchronized void onEvent(EventIterator events) {
             blockObservation.acquireUninterruptibly();
             while (events.hasNext()) {
                 events.nextEvent();
+                numAdded++;
             }
             blockObservation.release();
+
+            notifyAll();
+        }
+
+        synchronized boolean waitFor(long timeout, Condition c)
+                throws InterruptedException {
+            long end = System.currentTimeMillis() + timeout;
+            long remaining = end - System.currentTimeMillis();
+            while (remaining > 0) {
+                if (c.evaluate()) {
+                    return true;
+                }
+                wait(remaining);
+                remaining = end - System.currentTimeMillis();
+            }
+            return false;
         }
     }
 }