You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ca...@apache.org on 2017/02/19 10:41:22 UTC

svn commit: r1783621 - in /jackrabbit/oak/branches/1.6: ./ oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationQueueFullWarnTest.java

Author: catholicon
Date: Sun Feb 19 10:41:21 2017
New Revision: 1783621

URL: http://svn.apache.org/viewvc?rev=1783621&view=rev
Log:
OAK-5668: Test failure: observation.ObservationQueueFullWarnTest.warnOnRepeatedQueueFull (backport r1783619 from trunk)

Test added in OAK-5626 failed intermittently. The reason was that the test's assumption of emptying up observation queue when it wished wasn't accurate.
Refactored the test.


Modified:
    jackrabbit/oak/branches/1.6/   (props changed)
    jackrabbit/oak/branches/1.6/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationQueueFullWarnTest.java

Propchange: jackrabbit/oak/branches/1.6/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun Feb 19 10:41:21 2017
@@ -1,3 +1,3 @@
 /jackrabbit/oak/branches/1.0:1665962
-/jackrabbit/oak/trunk:1781068,1781075,1781248,1781386,1781846,1781907,1782000,1782029,1782196,1782447,1782770,1782945,1782973,1782990,1783061,1783066,1783089,1783104-1783105
+/jackrabbit/oak/trunk:1781068,1781075,1781248,1781386,1781846,1781907,1782000,1782029,1782196,1782447,1782770,1782945,1782973,1782990,1783061,1783066,1783089,1783104-1783105,1783619
 /jackrabbit/trunk:1345480

Modified: jackrabbit/oak/branches/1.6/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationQueueFullWarnTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.6/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationQueueFullWarnTest.java?rev=1783621&r1=1783620&r2=1783621&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.6/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationQueueFullWarnTest.java (original)
+++ jackrabbit/oak/branches/1.6/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationQueueFullWarnTest.java Sun Feb 19 10:41:21 2017
@@ -31,11 +31,14 @@ import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.jcr.Node;
 import javax.jcr.RepositoryException;
 import javax.jcr.Session;
 import javax.jcr.SimpleCredentials;
+import javax.jcr.observation.Event;
 import javax.jcr.observation.EventIterator;
 import javax.jcr.observation.EventListener;
 import javax.jcr.observation.ObservationManager;
@@ -44,26 +47,38 @@ import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static javax.jcr.observation.Event.NODE_ADDED;
 import static org.junit.Assert.assertTrue;
 
 @RunWith(Parameterized.class)
 public class ObservationQueueFullWarnTest extends AbstractRepositoryTest {
-    static final int OBS_QUEUE_LENGTH = 5;
-    static final String OBS_QUEUE_FULL_WARN = "Revision queue is full. Further revisions will be compacted.";
+    private static final int OBS_QUEUE_LENGTH = 5;
+    private static final String OBS_QUEUE_FULL_WARN = "Revision queue is full. Further revisions will be compacted.";
 
     private static final String TEST_NODE = "test_node";
     private static final String TEST_NODE_TYPE = "oak:Unstructured";
     private static final String TEST_PATH = '/' + TEST_NODE;
 
-    private static final long CONDITION_TIMEOUT = 10*1000;
+    private static final long OBS_TIMEOUT_PER_ITEM = 1000;
+    private static final long CONDITION_TIMEOUT = OBS_QUEUE_LENGTH * OBS_TIMEOUT_PER_ITEM;
 
     private Session observingSession;
     private ObservationManager observationManager;
 
+    private final BlockableListener listener = new BlockableListener();
+
+    private static final Logger LOG = LoggerFactory.getLogger(ObservationQueueFullWarnTest.class);
+
+    private final Semaphore blockObservation = new Semaphore(1);
+
+    private final AtomicInteger numAddedNodes = new AtomicInteger(0);
+    private final AtomicInteger numObservedNodes = new AtomicInteger(0);
+
     public ObservationQueueFullWarnTest(NodeStoreFixture fixture) {
         super(fixture);
+        LOG.info("fixture: {}", fixture);
     }
 
     @Override
@@ -78,7 +93,7 @@ public class ObservationQueueFullWarnTes
         session.getRootNode().addNode(TEST_NODE, TEST_NODE_TYPE);
         session.save();
 
-        Map<String,Object> attrs = new HashMap<>();
+        Map<String, Object> attrs = new HashMap<>();
         attrs.put(RepositoryImpl.REFRESH_INTERVAL, 0);
         observingSession = ((JackrabbitRepository) getRepository()).login(new SimpleCredentials("admin", "admin".toCharArray()), null, attrs);
         observationManager = observingSession.getWorkspace().getObservationManager();
@@ -96,17 +111,13 @@ public class ObservationQueueFullWarnTes
                 .contains(OBS_QUEUE_FULL_WARN)
                 .create();
 
-        final LoggingListener listener = new LoggingListener();
         observationManager.addEventListener(listener, NODE_ADDED, TEST_PATH, true, null, null, false);
         try {
-            Node n = getAdminSession().getNode(TEST_PATH);
-
             customLogs.starting();
-            addNodeToFillObsQueue(n, 0, listener);
-            assertTrue("Observation queue full warning must gets logged", customLogs.getLogs().size() > 0);
+            addNodeToFillObsQueue();
+            assertTrue("Observation queue full warning must get logged", customLogs.getLogs().size() > 0);
             customLogs.finished();
-        }
-        finally {
+        } finally {
             observationManager.removeEventListener(listener);
         }
     }
@@ -135,40 +146,35 @@ public class ObservationQueueFullWarnTes
         ChangeProcessor.clock = virtualClock;
         virtualClock.waitUntil(System.currentTimeMillis());
 
-        final LoggingListener listener = new LoggingListener();
         observationManager.addEventListener(listener, NODE_ADDED, TEST_PATH, true, null, null, false);
         try {
-            Node n = getAdminSession().getNode(TEST_PATH);
-            int nodeNameCounter = 0;
-
             //Create first level WARN message
-            nodeNameCounter = addNodeToFillObsQueue(n, nodeNameCounter, listener);
-            emptyObsQueueABit(listener);
+            addNodeToFillObsQueue();
+            emptyObsQueue();
 
             //Don't wait, fill up the queue again
             warnLogs.starting();
             debugLogs.starting();
-            nodeNameCounter = addNodeToFillObsQueue(n, nodeNameCounter, listener);
+            addNodeToFillObsQueue();
             assertTrue("Observation queue full warning must not logged until some time has past since last log",
                     warnLogs.getLogs().size() == 0);
             assertTrue("Observation queue full warning should get logged on debug though in the mean time",
                     debugLogs.getLogs().size() > 0);
             warnLogs.finished();
             debugLogs.finished();
-            emptyObsQueueABit(listener);
+            emptyObsQueue();
 
             //Wait some time so reach WARN level again
             virtualClock.waitUntil(virtualClock.getTime() + ChangeProcessor.QUEUE_FULL_WARN_INTERVAL);
 
             warnLogs.starting();
             debugLogs.starting();
-            addNodeToFillObsQueue(n, nodeNameCounter, listener);
+            addNodeToFillObsQueue();
             assertTrue("Observation queue full warning must get logged after some time has past since last log",
                     warnLogs.getLogs().size() > 0);
             warnLogs.finished();
             debugLogs.finished();
-        }
-        finally {
+        } finally {
             observationManager.removeEventListener(listener);
             ChangeProcessor.clock = oldClockInstance;
             ChangeProcessor.QUEUE_FULL_WARN_INTERVAL = oldWarnLogInterval;
@@ -177,67 +183,79 @@ public class ObservationQueueFullWarnTes
         }
     }
 
-    private void emptyObsQueueABit(final LoggingListener listener) throws InterruptedException {
-        //Let queue empty up a bit.
-        boolean notTimedOut = listener.waitFor(CONDITION_TIMEOUT, new Condition() {
-            @Override
-            public boolean evaluate() {
-                return listener.numAdded >= 2;
-            }
-        });
-        listener.numAdded = 0;
-        assertTrue("Listener didn't process events within time-out", notTimedOut);
-    }
-
-    private interface Condition {
-        boolean evaluate();
+    private void addANode(String prefix) throws RepositoryException {
+        Session session = getAdminSession();
+        Node parent = session.getNode(TEST_PATH);
+        String nodeName = prefix + numAddedNodes.get();
+        parent.addNode(nodeName);
+        session.save();
+        numAddedNodes.incrementAndGet();
     }
 
-    private static int addNodeToFillObsQueue(Node parent, int nodeNameCounter, LoggingListener listener)
+    private void addNodeToFillObsQueue()
             throws RepositoryException {
-        listener.blockObservation.acquireUninterruptibly();
+        blockObservation.acquireUninterruptibly();
         try {
-            for (int i = 0; i <= OBS_QUEUE_LENGTH; i++, nodeNameCounter++) {
-                parent.addNode("n" + nodeNameCounter);
-                parent.getSession().save();
+            for (int i = 0; i <= OBS_QUEUE_LENGTH; i++) {
+                addANode("n");
             }
-            return nodeNameCounter;
         } finally {
-            listener.blockObservation.release();
+            blockObservation.release();
         }
     }
 
-    private class LoggingListener implements EventListener {
+    private interface Condition {
+        boolean evaluate();
+    }
 
-        private volatile int numAdded = 0;
+    private boolean waitFor(long timeout, Condition c)
+            throws InterruptedException {
+        long end = System.currentTimeMillis() + timeout;
+        long remaining = end - System.currentTimeMillis();
+        while (remaining > 0) {
+            if (c.evaluate()) {
+                return true;
+            }
 
-        Semaphore blockObservation = new Semaphore(1);
+            //Add another node only when num_pending_to_be_observed nodes is
+            //less that observation queue. This is done to let all observation finish
+            //up in case last few event were dropped due to full observation queue
+            //(which is ok as the next event that comes in gets diff-ed with last
+            //processed revision)
+            if (numAddedNodes.get() < numObservedNodes.get() + OBS_QUEUE_LENGTH) {
+                try {
+                    addANode("addedWhileWaiting");
+                } catch (RepositoryException e) {
+                    LOG.warn("exception while adding during wait: {}", e);
+                }
+            }
+            Thread.sleep(OBS_TIMEOUT_PER_ITEM/10);//The constant is exaggerated
+            remaining = end - System.currentTimeMillis();
+        }
+        return c.evaluate();
+    }
+
+    private void emptyObsQueue() throws InterruptedException {
+        boolean notTimedOut = waitFor(CONDITION_TIMEOUT, new Condition() {
+            @Override
+            public boolean evaluate() {
+                return numObservedNodes.get()==numAddedNodes.get();
+            }
+        });
+        assertTrue("Listener didn't process events within time-out", notTimedOut);
+    }
 
+    private class BlockableListener implements EventListener {
         @Override
-        public synchronized void onEvent(EventIterator events) {
+        public void onEvent(EventIterator events) {
             blockObservation.acquireUninterruptibly();
             while (events.hasNext()) {
-                events.nextEvent();
-                numAdded++;
-            }
-            blockObservation.release();
-
-            notifyAll();
-        }
-
-        synchronized boolean waitFor(long timeout, Condition c)
-                throws InterruptedException {
-            long end = System.currentTimeMillis() + timeout;
-            long remaining = end - System.currentTimeMillis();
-            while (remaining > 0) {
-                if (c.evaluate()) {
-                    return true;
+                Event event = events.nextEvent();
+                if (event.getType() == Event.NODE_ADDED) {
+                    numObservedNodes.incrementAndGet();
                 }
-                wait(remaining);
-                remaining = end - System.currentTimeMillis();
             }
-            return false;
+            blockObservation.release();
         }
     }
 }
-