You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ca...@apache.org on 2017/02/15 13:52:50 UTC
svn commit: r1783104 - in /jackrabbit/oak/trunk/oak-jcr/src:
main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java
test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationQueueFullWarnTest.java
Author: catholicon
Date: Wed Feb 15 13:52:50 2017
New Revision: 1783104
URL: http://svn.apache.org/viewvc?rev=1783104&view=rev
Log:
OAK-5626: ChangeProcessor doesn't reset 'blocking' flag when items from queue gets removed and commit-rate-limiter is null
ChangeProcessor would now log WARN each time queue gets full. To avoid flooding of logs, consecutive WARNs would be avoided a breathing period (Default 10 minutes. Can be configured by JVM command line param: "oak.observation.full-queue.warn.interval"). During the breathing period, queue full logs would still be logged at DEBUG level.
Also, add test to check the behavior.
Modified:
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java
jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationQueueFullWarnTest.java
Modified: jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java?rev=1783104&r1=1783103&r2=1783104&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java (original)
+++ jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java Wed Feb 15 13:52:50 2017
@@ -62,6 +62,7 @@ import org.apache.jackrabbit.oak.spi.whi
import org.apache.jackrabbit.oak.spi.whiteboard.Registration;
import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard;
import org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardExecutor;
+import org.apache.jackrabbit.oak.stats.Clock;
import org.apache.jackrabbit.oak.stats.MeterStats;
import org.apache.jackrabbit.oak.stats.StatisticManager;
import org.apache.jackrabbit.oak.stats.TimerStats;
@@ -113,7 +114,19 @@ class ChangeProcessor implements Filteri
* kicks in.
*/
public static final int MAX_DELAY;
-
+
+ //It'd would have been more useful to have following 2 properties as instance variables
+ //which got set by tests. But, the tests won't get a handle to the actual instance, so
+ //static-members it is.
+ /**
+ * Number of milliseconds to wait before issuing consecutive queue full warn messages
+ * Controlled by command line property "oak.observation.full-queue.warn.interval".
+ * Note, the command line parameter is wait interval in minutes.
+ */
+ static long QUEUE_FULL_WARN_INTERVAL = TimeUnit.MINUTES.toMillis(Integer
+ .getInteger("oak.observation.full-queue.warn.interval", 10));
+ static Clock clock = Clock.SIMPLE;
+
// OAK-4533: make DELAY_THRESHOLD and MAX_DELAY adjustable - using System.properties for now
static {
final String delayThresholdStr = System.getProperty("oak.commitRateLimiter.delayThreshold");
@@ -294,6 +307,8 @@ class ChangeProcessor implements Filteri
private volatile long delay;
private volatile boolean blocking;
+ private long lastQueueFullWarnTimestamp = -1;
+
@Override
protected void added(int newQueueSize) {
queueSizeChanged(newQueueSize);
@@ -310,11 +325,11 @@ class ChangeProcessor implements Filteri
if (newQueueSize >= queueLength) {
if (commitRateLimiter != null) {
if (!blocking) {
- LOG.warn("Revision queue is full. Further commits will be blocked.");
+ logQueueFullWarning("Revision queue is full. Further commits will be blocked.");
}
commitRateLimiter.blockCommits();
} else if (!blocking) {
- LOG.warn("Revision queue is full. Further revisions will be compacted.");
+ logQueueFullWarning("Revision queue is full. Further revisions will be compacted.");
}
blocking = true;
} else {
@@ -346,11 +361,24 @@ class ChangeProcessor implements Filteri
commitRateLimiter.unblockCommits();
blocking = false;
}
+ } else {
+ blocking = false;
}
}
}
}
+ private void logQueueFullWarning(String message) {
+ long currTime = clock.getTime();
+ if (lastQueueFullWarnTimestamp + QUEUE_FULL_WARN_INTERVAL < currTime) {
+ LOG.warn("{} Suppressing further such cases for {} seconds.",
+ message,
+ TimeUnit.MILLISECONDS.toSeconds(QUEUE_FULL_WARN_INTERVAL));
+ lastQueueFullWarnTimestamp = currTime;
+ } else {
+ LOG.debug(message);
+ }
+ }
@Override
public String toString() {
Modified: jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationQueueFullWarnTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationQueueFullWarnTest.java?rev=1783104&r1=1783103&r2=1783104&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationQueueFullWarnTest.java (original)
+++ jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationQueueFullWarnTest.java Wed Feb 15 13:52:50 2017
@@ -25,6 +25,7 @@ import org.apache.jackrabbit.oak.fixture
import org.apache.jackrabbit.oak.jcr.AbstractRepositoryTest;
import org.apache.jackrabbit.oak.jcr.Jcr;
import org.apache.jackrabbit.oak.jcr.repository.RepositoryImpl;
+import org.apache.jackrabbit.oak.stats.Clock;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -42,6 +43,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
import static javax.jcr.observation.Event.NODE_ADDED;
import static org.junit.Assert.assertTrue;
@@ -49,11 +51,14 @@ import static org.junit.Assert.assertTru
@RunWith(Parameterized.class)
public class ObservationQueueFullWarnTest extends AbstractRepositoryTest {
static final int OBS_QUEUE_LENGTH = 5;
+ static final String OBS_QUEUE_FULL_WARN = "Revision queue is full. Further revisions will be compacted.";
private static final String TEST_NODE = "test_node";
private static final String TEST_NODE_TYPE = "oak:Unstructured";
private static final String TEST_PATH = '/' + TEST_NODE;
+ private static final long CONDITION_TIMEOUT = 10*1000;
+
private Session observingSession;
private ObservationManager observationManager;
@@ -88,6 +93,7 @@ public class ObservationQueueFullWarnTes
public void warnOnQueueFull() throws RepositoryException, InterruptedException, ExecutionException {
LogCustomizer customLogs = LogCustomizer.forLogger(ChangeProcessor.class.getName())
.filter(Level.WARN)
+ .contains(OBS_QUEUE_FULL_WARN)
.create();
final LoggingListener listener = new LoggingListener();
@@ -105,6 +111,88 @@ public class ObservationQueueFullWarnTes
}
}
+ @Test
+ public void warnOnRepeatedQueueFull() throws RepositoryException, InterruptedException, ExecutionException {
+ LogCustomizer warnLogs = LogCustomizer.forLogger(ChangeProcessor.class.getName())
+ .filter(Level.WARN)
+ .contains(OBS_QUEUE_FULL_WARN)
+ .create();
+ LogCustomizer debugLogs = LogCustomizer.forLogger(ChangeProcessor.class.getName())
+ .filter(Level.DEBUG)
+ .contains(OBS_QUEUE_FULL_WARN)
+ .create();
+ LogCustomizer logLevelSetting = LogCustomizer.forLogger(ChangeProcessor.class.getName())
+ .enable(Level.DEBUG)
+ .create();
+ logLevelSetting.starting();
+
+ long oldWarnLogInterval = ChangeProcessor.QUEUE_FULL_WARN_INTERVAL;
+ //Assumption is that 10 (virtual) minutes won't pass by the time we move from one stage of queue fill to next.
+ ChangeProcessor.QUEUE_FULL_WARN_INTERVAL = TimeUnit.MINUTES.toMillis(10);
+
+ Clock oldClockInstance = ChangeProcessor.clock;
+ Clock virtualClock = new Clock.Virtual();
+ ChangeProcessor.clock = virtualClock;
+ virtualClock.waitUntil(System.currentTimeMillis());
+
+ final LoggingListener listener = new LoggingListener();
+ observationManager.addEventListener(listener, NODE_ADDED, TEST_PATH, true, null, null, false);
+ try {
+ Node n = getAdminSession().getNode(TEST_PATH);
+ int nodeNameCounter = 0;
+
+ //Create first level WARN message
+ nodeNameCounter = addNodeToFillObsQueue(n, nodeNameCounter, listener);
+ emptyObsQueueABit(listener);
+
+ //Don't wait, fill up the queue again
+ warnLogs.starting();
+ debugLogs.starting();
+ nodeNameCounter = addNodeToFillObsQueue(n, nodeNameCounter, listener);
+ assertTrue("Observation queue full warning must not logged until some time has past since last log",
+ warnLogs.getLogs().size() == 0);
+ assertTrue("Observation queue full warning should get logged on debug though in the mean time",
+ debugLogs.getLogs().size() > 0);
+ warnLogs.finished();
+ debugLogs.finished();
+ emptyObsQueueABit(listener);
+
+ //Wait some time so reach WARN level again
+ virtualClock.waitUntil(virtualClock.getTime() + ChangeProcessor.QUEUE_FULL_WARN_INTERVAL);
+
+ warnLogs.starting();
+ debugLogs.starting();
+ addNodeToFillObsQueue(n, nodeNameCounter, listener);
+ assertTrue("Observation queue full warning must get logged after some time has past since last log",
+ warnLogs.getLogs().size() > 0);
+ warnLogs.finished();
+ debugLogs.finished();
+ }
+ finally {
+ observationManager.removeEventListener(listener);
+ ChangeProcessor.clock = oldClockInstance;
+ ChangeProcessor.QUEUE_FULL_WARN_INTERVAL = oldWarnLogInterval;
+
+ logLevelSetting.finished();
+ }
+ }
+
+ private void emptyObsQueueABit(final LoggingListener listener) throws InterruptedException {
+ //Let queue empty up a bit.
+ boolean notTimedOut = listener.waitFor(CONDITION_TIMEOUT, new Condition() {
+ @Override
+ public boolean evaluate() {
+ return listener.numAdded >= 2;
+ }
+ });
+ listener.numAdded = 0;
+ assertTrue("Listener didn't process events within time-out", notTimedOut);
+ }
+
+ private interface Condition {
+ boolean evaluate();
+ }
+
private static int addNodeToFillObsQueue(Node parent, int nodeNameCounter, LoggingListener listener)
throws RepositoryException {
listener.blockObservation.acquireUninterruptibly();
@@ -121,15 +209,34 @@ public class ObservationQueueFullWarnTes
private class LoggingListener implements EventListener {
+ private volatile int numAdded = 0;
+
Semaphore blockObservation = new Semaphore(1);
@Override
- public void onEvent(EventIterator events) {
+ public synchronized void onEvent(EventIterator events) {
blockObservation.acquireUninterruptibly();
while (events.hasNext()) {
events.nextEvent();
+ numAdded++;
}
blockObservation.release();
+
+ notifyAll();
+ }
+
+ synchronized boolean waitFor(long timeout, Condition c)
+ throws InterruptedException {
+ long end = System.currentTimeMillis() + timeout;
+ long remaining = end - System.currentTimeMillis();
+ while (remaining > 0) {
+ if (c.evaluate()) {
+ return true;
+ }
+ wait(remaining);
+ remaining = end - System.currentTimeMillis();
+ }
+ return false;
}
}
}