You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ca...@apache.org on 2017/02/15 16:36:31 UTC

svn commit: r1783121 - in /jackrabbit/oak/branches/1.6: ./ oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationQueueFullWarnTest.java

Author: catholicon
Date: Wed Feb 15 16:36:31 2017
New Revision: 1783121

URL: http://svn.apache.org/viewvc?rev=1783121&view=rev
Log:
OAK-5626: ChangeProcessor doesn't reset 'blocking' flag when items get removed from the queue and commit-rate-limiter is null

(backport r1783066, r1783104 and r1783105 from trunk)

r1783066: Add a test to check that we log a warning when the observation queue gets full. A test for subsequent re-fills, and the fix, are to come later.

r1783104: ChangeProcessor now logs a WARN each time the queue gets full. To avoid flooding the logs, consecutive WARNs are suppressed for a breathing period (default 10 minutes; configurable via the JVM command-line parameter "oak.observation.full-queue.warn.interval"). During the breathing period, queue-full messages are still logged at DEBUG level.
Also, add test to check the behavior.

r1783105: Changed default warn interval to 30 minutes. Also update warn log to state interval in minutes.

Added:
    jackrabbit/oak/branches/1.6/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationQueueFullWarnTest.java
      - copied, changed from r1783066, jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationQueueFullWarnTest.java
Modified:
    jackrabbit/oak/branches/1.6/   (props changed)
    jackrabbit/oak/branches/1.6/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java

Propchange: jackrabbit/oak/branches/1.6/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Feb 15 16:36:31 2017
@@ -1,3 +1,3 @@
 /jackrabbit/oak/branches/1.0:1665962
-/jackrabbit/oak/trunk:1781068,1781075,1781248,1781386,1781846,1781907,1782000,1782029,1782196,1782447,1782770,1782945,1782990,1783061,1783089
+/jackrabbit/oak/trunk:1781068,1781075,1781248,1781386,1781846,1781907,1782000,1782029,1782196,1782447,1782770,1782945,1782990,1783061,1783066,1783089,1783104-1783105
 /jackrabbit/trunk:1345480

Modified: jackrabbit/oak/branches/1.6/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.6/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java?rev=1783121&r1=1783120&r2=1783121&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.6/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java (original)
+++ jackrabbit/oak/branches/1.6/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java Wed Feb 15 16:36:31 2017
@@ -62,6 +62,7 @@ import org.apache.jackrabbit.oak.spi.whi
 import org.apache.jackrabbit.oak.spi.whiteboard.Registration;
 import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard;
 import org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardExecutor;
+import org.apache.jackrabbit.oak.stats.Clock;
 import org.apache.jackrabbit.oak.stats.MeterStats;
 import org.apache.jackrabbit.oak.stats.StatisticManager;
 import org.apache.jackrabbit.oak.stats.TimerStats;
@@ -113,7 +114,19 @@ class ChangeProcessor implements Filteri
      * kicks in.
      */
     public static final int MAX_DELAY;
-    
+
+    //It'd would have been more useful to have following 2 properties as instance variables
+    //which got set by tests. But, the tests won't get a handle to the actual instance, so
+    //static-members it is.
+    /**
+     * Number of milliseconds to wait before issuing consecutive queue full warn messages
+     * Controlled by command line property "oak.observation.full-queue.warn.interval".
+     * Note, the command line parameter is wait interval in minutes.
+     */
+    static long QUEUE_FULL_WARN_INTERVAL = TimeUnit.MINUTES.toMillis(Integer
+            .getInteger("oak.observation.full-queue.warn.interval", 30));
+    static Clock clock = Clock.SIMPLE;
+
     // OAK-4533: make DELAY_THRESHOLD and MAX_DELAY adjustable - using System.properties for now
     static {
         final String delayThresholdStr = System.getProperty("oak.commitRateLimiter.delayThreshold");
@@ -294,6 +307,8 @@ class ChangeProcessor implements Filteri
             private volatile long delay;
             private volatile boolean blocking;
 
+            private long lastQueueFullWarnTimestamp = -1;
+
             @Override
             protected void added(int newQueueSize) {
                 queueSizeChanged(newQueueSize);
@@ -310,11 +325,11 @@ class ChangeProcessor implements Filteri
                 if (newQueueSize >= queueLength) {
                     if (commitRateLimiter != null) {
                         if (!blocking) {
-                            LOG.warn("Revision queue is full. Further commits will be blocked.");
+                            logQueueFullWarning("Revision queue is full. Further commits will be blocked.");
                         }
                         commitRateLimiter.blockCommits();
                     } else if (!blocking) {
-                        LOG.warn("Revision queue is full. Further revisions will be compacted.");
+                        logQueueFullWarning("Revision queue is full. Further revisions will be compacted.");
                     }
                     blocking = true;
                 } else {
@@ -346,11 +361,24 @@ class ChangeProcessor implements Filteri
                                 commitRateLimiter.unblockCommits();
                                 blocking = false;
                             }
+                        } else {
+                            blocking = false;
                         }
                     }
                 }
             }
 
+            private void logQueueFullWarning(String message) {
+                long currTime = clock.getTime();
+                if (lastQueueFullWarnTimestamp + QUEUE_FULL_WARN_INTERVAL < currTime) {
+                    LOG.warn("{} Suppressing further such cases for {} minutes.",
+                            message,
+                            TimeUnit.MILLISECONDS.toMinutes(QUEUE_FULL_WARN_INTERVAL));
+                    lastQueueFullWarnTimestamp = currTime;
+                } else {
+                    LOG.debug(message);
+                }
+            }
             
             @Override
             public String toString() {

Copied: jackrabbit/oak/branches/1.6/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationQueueFullWarnTest.java (from r1783066, jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationQueueFullWarnTest.java)
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.6/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationQueueFullWarnTest.java?p2=jackrabbit/oak/branches/1.6/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationQueueFullWarnTest.java&p1=jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationQueueFullWarnTest.java&r1=1783066&r2=1783121&rev=1783121&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationQueueFullWarnTest.java (original)
+++ jackrabbit/oak/branches/1.6/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationQueueFullWarnTest.java Wed Feb 15 16:36:31 2017
@@ -1,135 +1,243 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.jackrabbit.oak.jcr.observation;
-
-import ch.qos.logback.classic.Level;
-import org.apache.jackrabbit.api.JackrabbitRepository;
-import org.apache.jackrabbit.oak.commons.junit.LogCustomizer;
-import org.apache.jackrabbit.oak.fixture.NodeStoreFixture;
-import org.apache.jackrabbit.oak.jcr.AbstractRepositoryTest;
-import org.apache.jackrabbit.oak.jcr.Jcr;
-import org.apache.jackrabbit.oak.jcr.repository.RepositoryImpl;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import javax.jcr.Node;
-import javax.jcr.RepositoryException;
-import javax.jcr.Session;
-import javax.jcr.SimpleCredentials;
-import javax.jcr.observation.EventIterator;
-import javax.jcr.observation.EventListener;
-import javax.jcr.observation.ObservationManager;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Semaphore;
-
-import static javax.jcr.observation.Event.NODE_ADDED;
-import static org.junit.Assert.assertTrue;
-
-@RunWith(Parameterized.class)
-public class ObservationQueueFullWarnTest extends AbstractRepositoryTest {
-    static final int OBS_QUEUE_LENGTH = 5;
-
-    private static final String TEST_NODE = "test_node";
-    private static final String TEST_NODE_TYPE = "oak:Unstructured";
-    private static final String TEST_PATH = '/' + TEST_NODE;
-
-    private Session observingSession;
-    private ObservationManager observationManager;
-
-    public ObservationQueueFullWarnTest(NodeStoreFixture fixture) {
-        super(fixture);
-    }
-
-    @Override
-    protected Jcr initJcr(Jcr jcr) {
-        return jcr.withObservationQueueLength(OBS_QUEUE_LENGTH);
-    }
-
-    @Before
-    public void setup() throws RepositoryException {
-        Session session = getAdminSession();
-
-        session.getRootNode().addNode(TEST_NODE, TEST_NODE_TYPE);
-        session.save();
-
-        Map<String,Object> attrs = new HashMap<>();
-        attrs.put(RepositoryImpl.REFRESH_INTERVAL, 0);
-        observingSession = ((JackrabbitRepository) getRepository()).login(new SimpleCredentials("admin", "admin".toCharArray()), null, attrs);
-        observationManager = observingSession.getWorkspace().getObservationManager();
-    }
-
-    @After
-    public void tearDown() {
-        observingSession.logout();
-    }
-
-    @Test
-    public void warnOnQueueFull() throws RepositoryException, InterruptedException, ExecutionException {
-        LogCustomizer customLogs = LogCustomizer.forLogger(ChangeProcessor.class.getName())
-                .filter(Level.WARN)
-                .create();
-
-        final LoggingListener listener = new LoggingListener();
-        observationManager.addEventListener(listener, NODE_ADDED, TEST_PATH, true, null, null, false);
-        try {
-            Node n = getAdminSession().getNode(TEST_PATH);
-
-            customLogs.starting();
-            addNodeToFillObsQueue(n, 0, listener);
-            assertTrue("Observation queue full warning must gets logged", customLogs.getLogs().size() > 0);
-            customLogs.finished();
-        }
-        finally {
-            observationManager.removeEventListener(listener);
-        }
-    }
-
-    private static int addNodeToFillObsQueue(Node parent, int nodeNameCounter, LoggingListener listener)
-            throws RepositoryException {
-        listener.blockObservation.acquireUninterruptibly();
-        try {
-            for (int i = 0; i <= OBS_QUEUE_LENGTH; i++, nodeNameCounter++) {
-                parent.addNode("n" + nodeNameCounter);
-                parent.getSession().save();
-            }
-            return nodeNameCounter;
-        } finally {
-            listener.blockObservation.release();
-        }
-    }
-
-    private class LoggingListener implements EventListener {
-
-        Semaphore blockObservation = new Semaphore(1);
-
-        @Override
-        public void onEvent(EventIterator events) {
-            blockObservation.acquireUninterruptibly();
-            while (events.hasNext()) {
-                events.nextEvent();
-            }
-            blockObservation.release();
-        }
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.jcr.observation;
+
+import ch.qos.logback.classic.Level;
+import org.apache.jackrabbit.api.JackrabbitRepository;
+import org.apache.jackrabbit.oak.commons.junit.LogCustomizer;
+import org.apache.jackrabbit.oak.fixture.NodeStoreFixture;
+import org.apache.jackrabbit.oak.jcr.AbstractRepositoryTest;
+import org.apache.jackrabbit.oak.jcr.Jcr;
+import org.apache.jackrabbit.oak.jcr.repository.RepositoryImpl;
+import org.apache.jackrabbit.oak.stats.Clock;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import javax.jcr.Node;
+import javax.jcr.RepositoryException;
+import javax.jcr.Session;
+import javax.jcr.SimpleCredentials;
+import javax.jcr.observation.EventIterator;
+import javax.jcr.observation.EventListener;
+import javax.jcr.observation.ObservationManager;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+import static javax.jcr.observation.Event.NODE_ADDED;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(Parameterized.class)
+public class ObservationQueueFullWarnTest extends AbstractRepositoryTest {
+    static final int OBS_QUEUE_LENGTH = 5;
+    static final String OBS_QUEUE_FULL_WARN = "Revision queue is full. Further revisions will be compacted.";
+
+    private static final String TEST_NODE = "test_node";
+    private static final String TEST_NODE_TYPE = "oak:Unstructured";
+    private static final String TEST_PATH = '/' + TEST_NODE;
+
+    private static final long CONDITION_TIMEOUT = 10*1000;
+
+    private Session observingSession;
+    private ObservationManager observationManager;
+
+    public ObservationQueueFullWarnTest(NodeStoreFixture fixture) {
+        super(fixture);
+    }
+
+    @Override
+    protected Jcr initJcr(Jcr jcr) {
+        return jcr.withObservationQueueLength(OBS_QUEUE_LENGTH);
+    }
+
+    @Before
+    public void setup() throws RepositoryException {
+        Session session = getAdminSession();
+
+        session.getRootNode().addNode(TEST_NODE, TEST_NODE_TYPE);
+        session.save();
+
+        Map<String,Object> attrs = new HashMap<>();
+        attrs.put(RepositoryImpl.REFRESH_INTERVAL, 0);
+        observingSession = ((JackrabbitRepository) getRepository()).login(new SimpleCredentials("admin", "admin".toCharArray()), null, attrs);
+        observationManager = observingSession.getWorkspace().getObservationManager();
+    }
+
+    @After
+    public void tearDown() {
+        observingSession.logout();
+    }
+
+    @Test
+    public void warnOnQueueFull() throws RepositoryException, InterruptedException, ExecutionException {
+        LogCustomizer customLogs = LogCustomizer.forLogger(ChangeProcessor.class.getName())
+                .filter(Level.WARN)
+                .contains(OBS_QUEUE_FULL_WARN)
+                .create();
+
+        final LoggingListener listener = new LoggingListener();
+        observationManager.addEventListener(listener, NODE_ADDED, TEST_PATH, true, null, null, false);
+        try {
+            Node n = getAdminSession().getNode(TEST_PATH);
+
+            customLogs.starting();
+            addNodeToFillObsQueue(n, 0, listener);
+            assertTrue("Observation queue full warning must gets logged", customLogs.getLogs().size() > 0);
+            customLogs.finished();
+        }
+        finally {
+            observationManager.removeEventListener(listener);
+        }
+    }
+
+    @Test
+    public void warnOnRepeatedQueueFull() throws RepositoryException, InterruptedException, ExecutionException {
+        LogCustomizer warnLogs = LogCustomizer.forLogger(ChangeProcessor.class.getName())
+                .filter(Level.WARN)
+                .contains(OBS_QUEUE_FULL_WARN)
+                .create();
+        LogCustomizer debugLogs = LogCustomizer.forLogger(ChangeProcessor.class.getName())
+                .filter(Level.DEBUG)
+                .contains(OBS_QUEUE_FULL_WARN)
+                .create();
+        LogCustomizer logLevelSetting = LogCustomizer.forLogger(ChangeProcessor.class.getName())
+                .enable(Level.DEBUG)
+                .create();
+        logLevelSetting.starting();
+
+        long oldWarnLogInterval = ChangeProcessor.QUEUE_FULL_WARN_INTERVAL;
+        //Assumption is that 10 (virtual) minutes won't pass by the time we move from one stage of queue fill to next.
+        ChangeProcessor.QUEUE_FULL_WARN_INTERVAL = TimeUnit.MINUTES.toMillis(10);
+
+        Clock oldClockInstance = ChangeProcessor.clock;
+        Clock virtualClock = new Clock.Virtual();
+        ChangeProcessor.clock = virtualClock;
+        virtualClock.waitUntil(System.currentTimeMillis());
+
+        final LoggingListener listener = new LoggingListener();
+        observationManager.addEventListener(listener, NODE_ADDED, TEST_PATH, true, null, null, false);
+        try {
+            Node n = getAdminSession().getNode(TEST_PATH);
+            int nodeNameCounter = 0;
+
+            //Create first level WARN message
+            nodeNameCounter = addNodeToFillObsQueue(n, nodeNameCounter, listener);
+            emptyObsQueueABit(listener);
+
+            //Don't wait, fill up the queue again
+            warnLogs.starting();
+            debugLogs.starting();
+            nodeNameCounter = addNodeToFillObsQueue(n, nodeNameCounter, listener);
+            assertTrue("Observation queue full warning must not logged until some time has past since last log",
+                    warnLogs.getLogs().size() == 0);
+            assertTrue("Observation queue full warning should get logged on debug though in the mean time",
+                    debugLogs.getLogs().size() > 0);
+            warnLogs.finished();
+            debugLogs.finished();
+            emptyObsQueueABit(listener);
+
+            //Wait some time so reach WARN level again
+            virtualClock.waitUntil(virtualClock.getTime() + ChangeProcessor.QUEUE_FULL_WARN_INTERVAL);
+
+            warnLogs.starting();
+            debugLogs.starting();
+            addNodeToFillObsQueue(n, nodeNameCounter, listener);
+            assertTrue("Observation queue full warning must get logged after some time has past since last log",
+                    warnLogs.getLogs().size() > 0);
+            warnLogs.finished();
+            debugLogs.finished();
+        }
+        finally {
+            observationManager.removeEventListener(listener);
+            ChangeProcessor.clock = oldClockInstance;
+            ChangeProcessor.QUEUE_FULL_WARN_INTERVAL = oldWarnLogInterval;
+
+            logLevelSetting.finished();
+        }
+    }
+
+    private void emptyObsQueueABit(final LoggingListener listener) throws InterruptedException {
+        //Let queue empty up a bit.
+        boolean notTimedOut = listener.waitFor(CONDITION_TIMEOUT, new Condition() {
+            @Override
+            public boolean evaluate() {
+                return listener.numAdded >= 2;
+            }
+        });
+        listener.numAdded = 0;
+        assertTrue("Listener didn't process events within time-out", notTimedOut);
+    }
+
+    private interface Condition {
+        boolean evaluate();
+    }
+
+    private static int addNodeToFillObsQueue(Node parent, int nodeNameCounter, LoggingListener listener)
+            throws RepositoryException {
+        listener.blockObservation.acquireUninterruptibly();
+        try {
+            for (int i = 0; i <= OBS_QUEUE_LENGTH; i++, nodeNameCounter++) {
+                parent.addNode("n" + nodeNameCounter);
+                parent.getSession().save();
+            }
+            return nodeNameCounter;
+        } finally {
+            listener.blockObservation.release();
+        }
+    }
+
+    private class LoggingListener implements EventListener {
+
+        private volatile int numAdded = 0;
+
+        Semaphore blockObservation = new Semaphore(1);
+
+        @Override
+        public synchronized void onEvent(EventIterator events) {
+            blockObservation.acquireUninterruptibly();
+            while (events.hasNext()) {
+                events.nextEvent();
+                numAdded++;
+            }
+            blockObservation.release();
+
+            notifyAll();
+        }
+
+        synchronized boolean waitFor(long timeout, Condition c)
+                throws InterruptedException {
+            long end = System.currentTimeMillis() + timeout;
+            long remaining = end - System.currentTimeMillis();
+            while (remaining > 0) {
+                if (c.evaluate()) {
+                    return true;
+                }
+                wait(remaining);
+                remaining = end - System.currentTimeMillis();
+            }
+            return false;
+        }
+    }
+}
+