You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ca...@apache.org on 2017/02/15 16:36:31 UTC
svn commit: r1783121 - in /jackrabbit/oak/branches/1.6: ./
oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java
oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationQueueFullWarnTest.java
Author: catholicon
Date: Wed Feb 15 16:36:31 2017
New Revision: 1783121
URL: http://svn.apache.org/viewvc?rev=1783121&view=rev
Log:
OAK-5626: ChangeProcessor doesn't reset 'blocking' flag when items get removed from the queue and commit-rate-limiter is null
(backport r1783066, r1783104 and r1783105 from trunk)
r1783066: Add a test to check that we log a warning when the observation queue gets full. A test for subsequent re-fills, and the fix itself, will come later.
r1783104: ChangeProcessor now logs a WARN each time the queue gets full. To avoid flooding the logs, consecutive WARNs are suppressed for a breathing period (default 10 minutes; configurable via the JVM command line parameter "oak.observation.full-queue.warn.interval"). During the breathing period, queue-full messages are still logged at DEBUG level.
Also, add a test to check the behavior.
r1783105: Changed default warn interval to 30 minutes. Also update warn log to state interval in minutes.
Added:
jackrabbit/oak/branches/1.6/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationQueueFullWarnTest.java
- copied, changed from r1783066, jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationQueueFullWarnTest.java
Modified:
jackrabbit/oak/branches/1.6/ (props changed)
jackrabbit/oak/branches/1.6/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java
Propchange: jackrabbit/oak/branches/1.6/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Feb 15 16:36:31 2017
@@ -1,3 +1,3 @@
/jackrabbit/oak/branches/1.0:1665962
-/jackrabbit/oak/trunk:1781068,1781075,1781248,1781386,1781846,1781907,1782000,1782029,1782196,1782447,1782770,1782945,1782990,1783061,1783089
+/jackrabbit/oak/trunk:1781068,1781075,1781248,1781386,1781846,1781907,1782000,1782029,1782196,1782447,1782770,1782945,1782990,1783061,1783066,1783089,1783104-1783105
/jackrabbit/trunk:1345480
Modified: jackrabbit/oak/branches/1.6/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.6/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java?rev=1783121&r1=1783120&r2=1783121&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.6/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java (original)
+++ jackrabbit/oak/branches/1.6/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java Wed Feb 15 16:36:31 2017
@@ -62,6 +62,7 @@ import org.apache.jackrabbit.oak.spi.whi
import org.apache.jackrabbit.oak.spi.whiteboard.Registration;
import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard;
import org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardExecutor;
+import org.apache.jackrabbit.oak.stats.Clock;
import org.apache.jackrabbit.oak.stats.MeterStats;
import org.apache.jackrabbit.oak.stats.StatisticManager;
import org.apache.jackrabbit.oak.stats.TimerStats;
@@ -113,7 +114,19 @@ class ChangeProcessor implements Filteri
* kicks in.
*/
public static final int MAX_DELAY;
-
+
+ //It would have been more useful to have the following 2 properties as instance variables
+ //that tests could set. But the tests can't get a handle on the actual instance, so
+ //static members it is.
+ /**
+ * Number of milliseconds to wait before issuing consecutive queue full warn messages
+ * Controlled by command line property "oak.observation.full-queue.warn.interval".
+ * Note, the command line parameter is wait interval in minutes.
+ */
+ static long QUEUE_FULL_WARN_INTERVAL = TimeUnit.MINUTES.toMillis(Integer
+ .getInteger("oak.observation.full-queue.warn.interval", 30));
+ static Clock clock = Clock.SIMPLE;
+
// OAK-4533: make DELAY_THRESHOLD and MAX_DELAY adjustable - using System.properties for now
static {
final String delayThresholdStr = System.getProperty("oak.commitRateLimiter.delayThreshold");
@@ -294,6 +307,8 @@ class ChangeProcessor implements Filteri
private volatile long delay;
private volatile boolean blocking;
+ private long lastQueueFullWarnTimestamp = -1;
+
@Override
protected void added(int newQueueSize) {
queueSizeChanged(newQueueSize);
@@ -310,11 +325,11 @@ class ChangeProcessor implements Filteri
if (newQueueSize >= queueLength) {
if (commitRateLimiter != null) {
if (!blocking) {
- LOG.warn("Revision queue is full. Further commits will be blocked.");
+ logQueueFullWarning("Revision queue is full. Further commits will be blocked.");
}
commitRateLimiter.blockCommits();
} else if (!blocking) {
- LOG.warn("Revision queue is full. Further revisions will be compacted.");
+ logQueueFullWarning("Revision queue is full. Further revisions will be compacted.");
}
blocking = true;
} else {
@@ -346,11 +361,24 @@ class ChangeProcessor implements Filteri
commitRateLimiter.unblockCommits();
blocking = false;
}
+ } else {
+ blocking = false;
}
}
}
}
+ private void logQueueFullWarning(String message) {
+ long currTime = clock.getTime();
+ if (lastQueueFullWarnTimestamp + QUEUE_FULL_WARN_INTERVAL < currTime) {
+ LOG.warn("{} Suppressing further such cases for {} minutes.",
+ message,
+ TimeUnit.MILLISECONDS.toMinutes(QUEUE_FULL_WARN_INTERVAL));
+ lastQueueFullWarnTimestamp = currTime;
+ } else {
+ LOG.debug(message);
+ }
+ }
@Override
public String toString() {
Copied: jackrabbit/oak/branches/1.6/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationQueueFullWarnTest.java (from r1783066, jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationQueueFullWarnTest.java)
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.6/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationQueueFullWarnTest.java?p2=jackrabbit/oak/branches/1.6/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationQueueFullWarnTest.java&p1=jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationQueueFullWarnTest.java&r1=1783066&r2=1783121&rev=1783121&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationQueueFullWarnTest.java (original)
+++ jackrabbit/oak/branches/1.6/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationQueueFullWarnTest.java Wed Feb 15 16:36:31 2017
@@ -1,135 +1,243 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.jackrabbit.oak.jcr.observation;
-
-import ch.qos.logback.classic.Level;
-import org.apache.jackrabbit.api.JackrabbitRepository;
-import org.apache.jackrabbit.oak.commons.junit.LogCustomizer;
-import org.apache.jackrabbit.oak.fixture.NodeStoreFixture;
-import org.apache.jackrabbit.oak.jcr.AbstractRepositoryTest;
-import org.apache.jackrabbit.oak.jcr.Jcr;
-import org.apache.jackrabbit.oak.jcr.repository.RepositoryImpl;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import javax.jcr.Node;
-import javax.jcr.RepositoryException;
-import javax.jcr.Session;
-import javax.jcr.SimpleCredentials;
-import javax.jcr.observation.EventIterator;
-import javax.jcr.observation.EventListener;
-import javax.jcr.observation.ObservationManager;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Semaphore;
-
-import static javax.jcr.observation.Event.NODE_ADDED;
-import static org.junit.Assert.assertTrue;
-
-@RunWith(Parameterized.class)
-public class ObservationQueueFullWarnTest extends AbstractRepositoryTest {
- static final int OBS_QUEUE_LENGTH = 5;
-
- private static final String TEST_NODE = "test_node";
- private static final String TEST_NODE_TYPE = "oak:Unstructured";
- private static final String TEST_PATH = '/' + TEST_NODE;
-
- private Session observingSession;
- private ObservationManager observationManager;
-
- public ObservationQueueFullWarnTest(NodeStoreFixture fixture) {
- super(fixture);
- }
-
- @Override
- protected Jcr initJcr(Jcr jcr) {
- return jcr.withObservationQueueLength(OBS_QUEUE_LENGTH);
- }
-
- @Before
- public void setup() throws RepositoryException {
- Session session = getAdminSession();
-
- session.getRootNode().addNode(TEST_NODE, TEST_NODE_TYPE);
- session.save();
-
- Map<String,Object> attrs = new HashMap<>();
- attrs.put(RepositoryImpl.REFRESH_INTERVAL, 0);
- observingSession = ((JackrabbitRepository) getRepository()).login(new SimpleCredentials("admin", "admin".toCharArray()), null, attrs);
- observationManager = observingSession.getWorkspace().getObservationManager();
- }
-
- @After
- public void tearDown() {
- observingSession.logout();
- }
-
- @Test
- public void warnOnQueueFull() throws RepositoryException, InterruptedException, ExecutionException {
- LogCustomizer customLogs = LogCustomizer.forLogger(ChangeProcessor.class.getName())
- .filter(Level.WARN)
- .create();
-
- final LoggingListener listener = new LoggingListener();
- observationManager.addEventListener(listener, NODE_ADDED, TEST_PATH, true, null, null, false);
- try {
- Node n = getAdminSession().getNode(TEST_PATH);
-
- customLogs.starting();
- addNodeToFillObsQueue(n, 0, listener);
- assertTrue("Observation queue full warning must gets logged", customLogs.getLogs().size() > 0);
- customLogs.finished();
- }
- finally {
- observationManager.removeEventListener(listener);
- }
- }
-
- private static int addNodeToFillObsQueue(Node parent, int nodeNameCounter, LoggingListener listener)
- throws RepositoryException {
- listener.blockObservation.acquireUninterruptibly();
- try {
- for (int i = 0; i <= OBS_QUEUE_LENGTH; i++, nodeNameCounter++) {
- parent.addNode("n" + nodeNameCounter);
- parent.getSession().save();
- }
- return nodeNameCounter;
- } finally {
- listener.blockObservation.release();
- }
- }
-
- private class LoggingListener implements EventListener {
-
- Semaphore blockObservation = new Semaphore(1);
-
- @Override
- public void onEvent(EventIterator events) {
- blockObservation.acquireUninterruptibly();
- while (events.hasNext()) {
- events.nextEvent();
- }
- blockObservation.release();
- }
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.jcr.observation;
+
+import ch.qos.logback.classic.Level;
+import org.apache.jackrabbit.api.JackrabbitRepository;
+import org.apache.jackrabbit.oak.commons.junit.LogCustomizer;
+import org.apache.jackrabbit.oak.fixture.NodeStoreFixture;
+import org.apache.jackrabbit.oak.jcr.AbstractRepositoryTest;
+import org.apache.jackrabbit.oak.jcr.Jcr;
+import org.apache.jackrabbit.oak.jcr.repository.RepositoryImpl;
+import org.apache.jackrabbit.oak.stats.Clock;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import javax.jcr.Node;
+import javax.jcr.RepositoryException;
+import javax.jcr.Session;
+import javax.jcr.SimpleCredentials;
+import javax.jcr.observation.EventIterator;
+import javax.jcr.observation.EventListener;
+import javax.jcr.observation.ObservationManager;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+import static javax.jcr.observation.Event.NODE_ADDED;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(Parameterized.class)
+public class ObservationQueueFullWarnTest extends AbstractRepositoryTest {
+ static final int OBS_QUEUE_LENGTH = 5;
+ static final String OBS_QUEUE_FULL_WARN = "Revision queue is full. Further revisions will be compacted.";
+
+ private static final String TEST_NODE = "test_node";
+ private static final String TEST_NODE_TYPE = "oak:Unstructured";
+ private static final String TEST_PATH = '/' + TEST_NODE;
+
+ private static final long CONDITION_TIMEOUT = 10*1000;
+
+ private Session observingSession;
+ private ObservationManager observationManager;
+
+ public ObservationQueueFullWarnTest(NodeStoreFixture fixture) {
+ super(fixture);
+ }
+
+ @Override
+ protected Jcr initJcr(Jcr jcr) {
+ return jcr.withObservationQueueLength(OBS_QUEUE_LENGTH);
+ }
+
+ @Before
+ public void setup() throws RepositoryException {
+ Session session = getAdminSession();
+
+ session.getRootNode().addNode(TEST_NODE, TEST_NODE_TYPE);
+ session.save();
+
+ Map<String,Object> attrs = new HashMap<>();
+ attrs.put(RepositoryImpl.REFRESH_INTERVAL, 0);
+ observingSession = ((JackrabbitRepository) getRepository()).login(new SimpleCredentials("admin", "admin".toCharArray()), null, attrs);
+ observationManager = observingSession.getWorkspace().getObservationManager();
+ }
+
+ @After
+ public void tearDown() {
+ observingSession.logout();
+ }
+
+ @Test
+ public void warnOnQueueFull() throws RepositoryException, InterruptedException, ExecutionException {
+ LogCustomizer customLogs = LogCustomizer.forLogger(ChangeProcessor.class.getName())
+ .filter(Level.WARN)
+ .contains(OBS_QUEUE_FULL_WARN)
+ .create();
+
+ final LoggingListener listener = new LoggingListener();
+ observationManager.addEventListener(listener, NODE_ADDED, TEST_PATH, true, null, null, false);
+ try {
+ Node n = getAdminSession().getNode(TEST_PATH);
+
+ customLogs.starting();
+ addNodeToFillObsQueue(n, 0, listener);
+ assertTrue("Observation queue full warning must gets logged", customLogs.getLogs().size() > 0);
+ customLogs.finished();
+ }
+ finally {
+ observationManager.removeEventListener(listener);
+ }
+ }
+
+ @Test
+ public void warnOnRepeatedQueueFull() throws RepositoryException, InterruptedException, ExecutionException {
+ LogCustomizer warnLogs = LogCustomizer.forLogger(ChangeProcessor.class.getName())
+ .filter(Level.WARN)
+ .contains(OBS_QUEUE_FULL_WARN)
+ .create();
+ LogCustomizer debugLogs = LogCustomizer.forLogger(ChangeProcessor.class.getName())
+ .filter(Level.DEBUG)
+ .contains(OBS_QUEUE_FULL_WARN)
+ .create();
+ LogCustomizer logLevelSetting = LogCustomizer.forLogger(ChangeProcessor.class.getName())
+ .enable(Level.DEBUG)
+ .create();
+ logLevelSetting.starting();
+
+ long oldWarnLogInterval = ChangeProcessor.QUEUE_FULL_WARN_INTERVAL;
+ //Assumption is that 10 (virtual) minutes won't pass by the time we move from one stage of queue fill to next.
+ ChangeProcessor.QUEUE_FULL_WARN_INTERVAL = TimeUnit.MINUTES.toMillis(10);
+
+ Clock oldClockInstance = ChangeProcessor.clock;
+ Clock virtualClock = new Clock.Virtual();
+ ChangeProcessor.clock = virtualClock;
+ virtualClock.waitUntil(System.currentTimeMillis());
+
+ final LoggingListener listener = new LoggingListener();
+ observationManager.addEventListener(listener, NODE_ADDED, TEST_PATH, true, null, null, false);
+ try {
+ Node n = getAdminSession().getNode(TEST_PATH);
+ int nodeNameCounter = 0;
+
+ //Create first level WARN message
+ nodeNameCounter = addNodeToFillObsQueue(n, nodeNameCounter, listener);
+ emptyObsQueueABit(listener);
+
+ //Don't wait, fill up the queue again
+ warnLogs.starting();
+ debugLogs.starting();
+ nodeNameCounter = addNodeToFillObsQueue(n, nodeNameCounter, listener);
+ assertTrue("Observation queue full warning must not logged until some time has past since last log",
+ warnLogs.getLogs().size() == 0);
+ assertTrue("Observation queue full warning should get logged on debug though in the mean time",
+ debugLogs.getLogs().size() > 0);
+ warnLogs.finished();
+ debugLogs.finished();
+ emptyObsQueueABit(listener);
+
+ //Wait some time so reach WARN level again
+ virtualClock.waitUntil(virtualClock.getTime() + ChangeProcessor.QUEUE_FULL_WARN_INTERVAL);
+
+ warnLogs.starting();
+ debugLogs.starting();
+ addNodeToFillObsQueue(n, nodeNameCounter, listener);
+ assertTrue("Observation queue full warning must get logged after some time has past since last log",
+ warnLogs.getLogs().size() > 0);
+ warnLogs.finished();
+ debugLogs.finished();
+ }
+ finally {
+ observationManager.removeEventListener(listener);
+ ChangeProcessor.clock = oldClockInstance;
+ ChangeProcessor.QUEUE_FULL_WARN_INTERVAL = oldWarnLogInterval;
+
+ logLevelSetting.finished();
+ }
+ }
+
+ private void emptyObsQueueABit(final LoggingListener listener) throws InterruptedException {
+ //Let queue empty up a bit.
+ boolean notTimedOut = listener.waitFor(CONDITION_TIMEOUT, new Condition() {
+ @Override
+ public boolean evaluate() {
+ return listener.numAdded >= 2;
+ }
+ });
+ listener.numAdded = 0;
+ assertTrue("Listener didn't process events within time-out", notTimedOut);
+ }
+
+ private interface Condition {
+ boolean evaluate();
+ }
+
+ private static int addNodeToFillObsQueue(Node parent, int nodeNameCounter, LoggingListener listener)
+ throws RepositoryException {
+ listener.blockObservation.acquireUninterruptibly();
+ try {
+ for (int i = 0; i <= OBS_QUEUE_LENGTH; i++, nodeNameCounter++) {
+ parent.addNode("n" + nodeNameCounter);
+ parent.getSession().save();
+ }
+ return nodeNameCounter;
+ } finally {
+ listener.blockObservation.release();
+ }
+ }
+
+ private class LoggingListener implements EventListener {
+
+ private volatile int numAdded = 0;
+
+ Semaphore blockObservation = new Semaphore(1);
+
+ @Override
+ public synchronized void onEvent(EventIterator events) {
+ blockObservation.acquireUninterruptibly();
+ while (events.hasNext()) {
+ events.nextEvent();
+ numAdded++;
+ }
+ blockObservation.release();
+
+ notifyAll();
+ }
+
+ synchronized boolean waitFor(long timeout, Condition c)
+ throws InterruptedException {
+ long end = System.currentTimeMillis() + timeout;
+ long remaining = end - System.currentTimeMillis();
+ while (remaining > 0) {
+ if (c.evaluate()) {
+ return true;
+ }
+ wait(remaining);
+ remaining = end - System.currentTimeMillis();
+ }
+ return false;
+ }
+ }
+}
+