You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ca...@apache.org on 2017/02/19 10:41:22 UTC
svn commit: r1783621 - in /jackrabbit/oak/branches/1.6: ./
oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationQueueFullWarnTest.java
Author: catholicon
Date: Sun Feb 19 10:41:21 2017
New Revision: 1783621
URL: http://svn.apache.org/viewvc?rev=1783621&view=rev
Log:
OAK-5668: Test failure: observation.ObservationQueueFullWarnTest.warnOnRepeatedQueueFull (backport r1783619 from trunk)
Test added in OAK-5626 failed intermittently. The reason was that the test's assumption of emptying up observation queue when it wished wasn't accurate.
Refactored the test.
Modified:
jackrabbit/oak/branches/1.6/ (props changed)
jackrabbit/oak/branches/1.6/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationQueueFullWarnTest.java
Propchange: jackrabbit/oak/branches/1.6/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun Feb 19 10:41:21 2017
@@ -1,3 +1,3 @@
/jackrabbit/oak/branches/1.0:1665962
-/jackrabbit/oak/trunk:1781068,1781075,1781248,1781386,1781846,1781907,1782000,1782029,1782196,1782447,1782770,1782945,1782973,1782990,1783061,1783066,1783089,1783104-1783105
+/jackrabbit/oak/trunk:1781068,1781075,1781248,1781386,1781846,1781907,1782000,1782029,1782196,1782447,1782770,1782945,1782973,1782990,1783061,1783066,1783089,1783104-1783105,1783619
/jackrabbit/trunk:1345480
Modified: jackrabbit/oak/branches/1.6/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationQueueFullWarnTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.6/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationQueueFullWarnTest.java?rev=1783621&r1=1783620&r2=1783621&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.6/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationQueueFullWarnTest.java (original)
+++ jackrabbit/oak/branches/1.6/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationQueueFullWarnTest.java Sun Feb 19 10:41:21 2017
@@ -31,11 +31,14 @@ import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.jcr.Node;
import javax.jcr.RepositoryException;
import javax.jcr.Session;
import javax.jcr.SimpleCredentials;
+import javax.jcr.observation.Event;
import javax.jcr.observation.EventIterator;
import javax.jcr.observation.EventListener;
import javax.jcr.observation.ObservationManager;
@@ -44,26 +47,38 @@ import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import static javax.jcr.observation.Event.NODE_ADDED;
import static org.junit.Assert.assertTrue;
@RunWith(Parameterized.class)
public class ObservationQueueFullWarnTest extends AbstractRepositoryTest {
- static final int OBS_QUEUE_LENGTH = 5;
- static final String OBS_QUEUE_FULL_WARN = "Revision queue is full. Further revisions will be compacted.";
+ private static final int OBS_QUEUE_LENGTH = 5;
+ private static final String OBS_QUEUE_FULL_WARN = "Revision queue is full. Further revisions will be compacted.";
private static final String TEST_NODE = "test_node";
private static final String TEST_NODE_TYPE = "oak:Unstructured";
private static final String TEST_PATH = '/' + TEST_NODE;
- private static final long CONDITION_TIMEOUT = 10*1000;
+ private static final long OBS_TIMEOUT_PER_ITEM = 1000;
+ private static final long CONDITION_TIMEOUT = OBS_QUEUE_LENGTH * OBS_TIMEOUT_PER_ITEM;
private Session observingSession;
private ObservationManager observationManager;
+ private final BlockableListener listener = new BlockableListener();
+
+ private static final Logger LOG = LoggerFactory.getLogger(ObservationQueueFullWarnTest.class);
+
+ private final Semaphore blockObservation = new Semaphore(1);
+
+ private final AtomicInteger numAddedNodes = new AtomicInteger(0);
+ private final AtomicInteger numObservedNodes = new AtomicInteger(0);
+
public ObservationQueueFullWarnTest(NodeStoreFixture fixture) {
super(fixture);
+ LOG.info("fixture: {}", fixture);
}
@Override
@@ -78,7 +93,7 @@ public class ObservationQueueFullWarnTes
session.getRootNode().addNode(TEST_NODE, TEST_NODE_TYPE);
session.save();
- Map<String,Object> attrs = new HashMap<>();
+ Map<String, Object> attrs = new HashMap<>();
attrs.put(RepositoryImpl.REFRESH_INTERVAL, 0);
observingSession = ((JackrabbitRepository) getRepository()).login(new SimpleCredentials("admin", "admin".toCharArray()), null, attrs);
observationManager = observingSession.getWorkspace().getObservationManager();
@@ -96,17 +111,13 @@ public class ObservationQueueFullWarnTes
.contains(OBS_QUEUE_FULL_WARN)
.create();
- final LoggingListener listener = new LoggingListener();
observationManager.addEventListener(listener, NODE_ADDED, TEST_PATH, true, null, null, false);
try {
- Node n = getAdminSession().getNode(TEST_PATH);
-
customLogs.starting();
- addNodeToFillObsQueue(n, 0, listener);
- assertTrue("Observation queue full warning must gets logged", customLogs.getLogs().size() > 0);
+ addNodeToFillObsQueue();
+ assertTrue("Observation queue full warning must get logged", customLogs.getLogs().size() > 0);
customLogs.finished();
- }
- finally {
+ } finally {
observationManager.removeEventListener(listener);
}
}
@@ -135,40 +146,35 @@ public class ObservationQueueFullWarnTes
ChangeProcessor.clock = virtualClock;
virtualClock.waitUntil(System.currentTimeMillis());
- final LoggingListener listener = new LoggingListener();
observationManager.addEventListener(listener, NODE_ADDED, TEST_PATH, true, null, null, false);
try {
- Node n = getAdminSession().getNode(TEST_PATH);
- int nodeNameCounter = 0;
-
//Create first level WARN message
- nodeNameCounter = addNodeToFillObsQueue(n, nodeNameCounter, listener);
- emptyObsQueueABit(listener);
+ addNodeToFillObsQueue();
+ emptyObsQueue();
//Don't wait, fill up the queue again
warnLogs.starting();
debugLogs.starting();
- nodeNameCounter = addNodeToFillObsQueue(n, nodeNameCounter, listener);
+ addNodeToFillObsQueue();
assertTrue("Observation queue full warning must not logged until some time has past since last log",
warnLogs.getLogs().size() == 0);
assertTrue("Observation queue full warning should get logged on debug though in the mean time",
debugLogs.getLogs().size() > 0);
warnLogs.finished();
debugLogs.finished();
- emptyObsQueueABit(listener);
+ emptyObsQueue();
//Wait some time so reach WARN level again
virtualClock.waitUntil(virtualClock.getTime() + ChangeProcessor.QUEUE_FULL_WARN_INTERVAL);
warnLogs.starting();
debugLogs.starting();
- addNodeToFillObsQueue(n, nodeNameCounter, listener);
+ addNodeToFillObsQueue();
assertTrue("Observation queue full warning must get logged after some time has past since last log",
warnLogs.getLogs().size() > 0);
warnLogs.finished();
debugLogs.finished();
- }
- finally {
+ } finally {
observationManager.removeEventListener(listener);
ChangeProcessor.clock = oldClockInstance;
ChangeProcessor.QUEUE_FULL_WARN_INTERVAL = oldWarnLogInterval;
@@ -177,67 +183,79 @@ public class ObservationQueueFullWarnTes
}
}
- private void emptyObsQueueABit(final LoggingListener listener) throws InterruptedException {
- //Let queue empty up a bit.
- boolean notTimedOut = listener.waitFor(CONDITION_TIMEOUT, new Condition() {
- @Override
- public boolean evaluate() {
- return listener.numAdded >= 2;
- }
- });
- listener.numAdded = 0;
- assertTrue("Listener didn't process events within time-out", notTimedOut);
- }
-
- private interface Condition {
- boolean evaluate();
+ private void addANode(String prefix) throws RepositoryException {
+ Session session = getAdminSession();
+ Node parent = session.getNode(TEST_PATH);
+ String nodeName = prefix + numAddedNodes.get();
+ parent.addNode(nodeName);
+ session.save();
+ numAddedNodes.incrementAndGet();
}
- private static int addNodeToFillObsQueue(Node parent, int nodeNameCounter, LoggingListener listener)
+ private void addNodeToFillObsQueue()
throws RepositoryException {
- listener.blockObservation.acquireUninterruptibly();
+ blockObservation.acquireUninterruptibly();
try {
- for (int i = 0; i <= OBS_QUEUE_LENGTH; i++, nodeNameCounter++) {
- parent.addNode("n" + nodeNameCounter);
- parent.getSession().save();
+ for (int i = 0; i <= OBS_QUEUE_LENGTH; i++) {
+ addANode("n");
}
- return nodeNameCounter;
} finally {
- listener.blockObservation.release();
+ blockObservation.release();
}
}
- private class LoggingListener implements EventListener {
+ private interface Condition {
+ boolean evaluate();
+ }
- private volatile int numAdded = 0;
+ private boolean waitFor(long timeout, Condition c)
+ throws InterruptedException {
+ long end = System.currentTimeMillis() + timeout;
+ long remaining = end - System.currentTimeMillis();
+ while (remaining > 0) {
+ if (c.evaluate()) {
+ return true;
+ }
- Semaphore blockObservation = new Semaphore(1);
+ //Add another node only when num_pending_to_be_observed nodes is
+ //less that observation queue. This is done to let all observation finish
+ //up in case last few event were dropped due to full observation queue
+ //(which is ok as the next event that comes in gets diff-ed with last
+ //processed revision)
+ if (numAddedNodes.get() < numObservedNodes.get() + OBS_QUEUE_LENGTH) {
+ try {
+ addANode("addedWhileWaiting");
+ } catch (RepositoryException e) {
+ LOG.warn("exception while adding during wait: {}", e);
+ }
+ }
+ Thread.sleep(OBS_TIMEOUT_PER_ITEM/10);//The constant is exaggerated
+ remaining = end - System.currentTimeMillis();
+ }
+ return c.evaluate();
+ }
+
+ private void emptyObsQueue() throws InterruptedException {
+ boolean notTimedOut = waitFor(CONDITION_TIMEOUT, new Condition() {
+ @Override
+ public boolean evaluate() {
+ return numObservedNodes.get()==numAddedNodes.get();
+ }
+ });
+ assertTrue("Listener didn't process events within time-out", notTimedOut);
+ }
+ private class BlockableListener implements EventListener {
@Override
- public synchronized void onEvent(EventIterator events) {
+ public void onEvent(EventIterator events) {
blockObservation.acquireUninterruptibly();
while (events.hasNext()) {
- events.nextEvent();
- numAdded++;
- }
- blockObservation.release();
-
- notifyAll();
- }
-
- synchronized boolean waitFor(long timeout, Condition c)
- throws InterruptedException {
- long end = System.currentTimeMillis() + timeout;
- long remaining = end - System.currentTimeMillis();
- while (remaining > 0) {
- if (c.evaluate()) {
- return true;
+ Event event = events.nextEvent();
+ if (event.getType() == Event.NODE_ADDED) {
+ numObservedNodes.incrementAndGet();
}
- wait(remaining);
- remaining = end - System.currentTimeMillis();
}
- return false;
+ blockObservation.release();
}
}
}
-