You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ca...@apache.org on 2017/02/19 11:09:46 UTC

svn commit: r1783625 - in /jackrabbit/oak/branches/1.2: ./ oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationQueueFullWarnTest.java

Author: catholicon
Date: Sun Feb 19 11:09:46 2017
New Revision: 1783625

URL: http://svn.apache.org/viewvc?rev=1783625&view=rev
Log:
OAK-5668: Test failure: observation.ObservationQueueFullWarnTest.warnOnRepeatedQueueFull (backport r1783619 from trunk)

Test added in OAK-5626 failed intermittently. The reason was that the test's assumption of emptying up observation queue when it wished wasn't accurate.
Refactored the test.


Modified:
    jackrabbit/oak/branches/1.2/   (props changed)
    jackrabbit/oak/branches/1.2/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationQueueFullWarnTest.java

Propchange: jackrabbit/oak/branches/1.2/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun Feb 19 11:09:46 2017
@@ -1,4 +1,4 @@
 /jackrabbit/oak/branches/1.0:1665962
 /jackrabbit/oak/branches/1.4:1745750,1747354,1750078,1750512
-/jackrabbit/oak/trunk:1672350,1672468-1672469,1672537,1672603,1672611,1672642,1672644,1672834-1672835,1673351,1673410,1673414-1673415,1673436,1673644,1673662-1673664,1673669,1673695,1673713,1673738,1673787,1673791,1674046,1674065,1674075,1674107,1674228,1674780,1674880,1675054-1675055,1675089,1675319,1675332,1675354,1675357,1675382,1675555,1675566,1675593,1676198,1676237,1676407,1676458,1676539,1676670,1676693,1676703,1676725,1677579,1677581,1677609,1677611,1677774,1677788,1677797,1677804,1677806,1677939,1677991,1678023,1678095-1678096,1678124,1678171,1678173,1678202,1678211,1678323,1678758,1678938,1678954,1679144,1679165,1679191,1679232,1679235,1679503,1679958,1679961,1680170,1680172,1680182,1680222,1680232,1680236,1680461,1680633,1680643,1680747,1680805-1680806,1680903,1681282,1681767,1681918,1681921,1681955,1682042,1682218,1682235,1682437,1682488,1682494,1682555,1682855,1682904,1683059,1683089,1683213,1683249,1683259,1683278,1683323,1683687,1683700,1684174-1684175,1684186,1684376
 ,1684442,1684561,1684570,1684601,1684618,1684669,1684820,1684868,1684894,1685023,1685075,1685370,1685541,1685552,1685589-1685590,1685834,1685839-1685840,1685964,1685977,1685989,1685999,1686003,1686023,1686032,1686097,1686162,1686229,1686234,1686253,1686414,1686772,1686780,1686790,1686854,1686857,1686971,1687053-1687055,1687175,1687196,1687198,1687220,1687239-1687240,1687301,1687441,1687553,1688089-1688090,1688172,1688179,1688349,1688421,1688436,1688453,1688616,1688622,1688634,1688636,1688817,1689003-1689004,1689008,1689577,1689581,1689623,1689810,1689828,1689831,1689833,1689903,1690017,1690043,1690047,1690057,1690247,1690249,1690634-1690637,1690650,1690657,1690669,1690672,1690674,1690885,1690941,1691139,1691151,1691159,1691167,1691183,1691188,1691201,1691210,1691217-1691218,1691280,1691307,1691331-1691333,1691345,1691384-1691385,1691394,1691401,1691498,1691509,1692133-1692134,1692156,1692250,1692272,1692274,1692363,1692382,1692393,1692478,1692955,1693002,1693030,1693050,1693209,1693
 401,1693421,1693525-1693526,1694007,1694049,1694393-1694394,1694651,1694653-1694654,1695032,1695050,1695122,1695223,1695280,1695299,1695420,1695457,1695482,1695492,1695507,1695521,1695540,1695571,1695829-1695830,1695905,1696190,1696194,1696242,1696285,1696375,1696522,1696578,1696759,1696916,1697363,1697373,1697383,1697410,1697579,1697582,1697589,1697616,1697672,1697896,1698096,1698144,1700191,1700231,1700397,1700403,1700506,1700571,1700709,1700718,1700720,1700727,1700749,1700769,1700775,1701065,1701613,1701619,1701733,1701743,1701750,1701768,1701806,1701810,1701814,1701907,1701948,1701955,1701959,1701965,1701986,1702014,1702022,1702045,1702051,1702241,1702272,1702371,1702387,1702405,1702423,1702426,1702428,1702860,1702866,1702942,1702960,1703212,1703382,1703395,1703411,1703428,1703430,1703568,1703592,1703758,1703858,1703878,1704256,1704282,1704285,1704457,1704479,1704490,1704614,1704629,1704636,1704655,1704670,1704886,1705005,1705027,1705043,1705055,1705250,1705268,1705273,1705323,1
 705677,1705701,1705871,1705992,1705998,1706009,1706037,1706059,1706212,1706218,1706270,1706764,1706772,1707049,1707189,1707191,1707223,1707331,1707435,1707509,1707753,1708049,1708105,1708307,1708315,1708401,1708546,1708592,1708738,1708766,1709012,1709852,1709978,1710013,1710031,1710049,1710205,1710242,1710559,1710575,1710590,1710614,1710637,1710789,1710800,1710811,1710816,1710972,1711209,1711248,1711282,1711296,1711405,1711498,1711654,1712018,1712042,1712319,1712490,1712531,1712730,1712785,1712963,1713008,1713439,1713461,1713580,1713586,1713599-1713600,1713626,1713698,1713803,1713809,1714034,1714061,1714084,1714170,1714174,1714213,1714229,1714238,1714519-1714520,1714543-1714544,1714730,1714739,1714779,1714827,1714956,1714961,1715010,1715092,1715191,1715346,1715716,1715767,1715771,1715888,1715898,1716100,1716178,1716426,1716576,1716588-1716589,1716596,1716616,1716703,1716712,1716815,1716823,1716830,1716840,1716845,1716883,1717177,1717203,1717277,1717393-1717394,1717410,1717462,171763
 2,1717768-1717769,1717784,1717789,1717988,1718528,1718533,1718546-1718548,1718553,1718626,1718646,1718772,1718801-1718802,1718895,1719111,1719288,1719869,1720306,1720335,1720350,1720354,1720500,1721160,1721172,1721337,1722141,1722832,1723227,1723239,1723241,1723251,1723254,1723333,1723347,1723350,1723565,1723584,1723713,1723731,1724026,1724057,1724186,1724210,1724401,1724628,1724631,1725216,1725477,1725515,1725555,1725895,1725899,1725935,1725941,1725960,1726232,1726237,1726570,1726579,1726585-1726586,1726621,1726795,1726797,1726809,1726812,1726981,1726993,1727026,1727149,1727254,1727331,1727350,1727358,1727429,1727476,1727483,1727508,1727515-1727518,1727813,1727816,1727831-1727832,1727841,1727893,1727895,1727912-1727913,1727923,1727991,1728037,1728041,1728070,1728114,1728281,1728294-1728297,1728300,1728443,1728525,1728642,1729200,1729505,1729578,1729599,1729957,1729962,1729979,1730050,1730216,1730527,1730581,1730629,1730801,1731627,1731647-1731648,1731789,1731797,1732131,1732268,173
 2278,1732330,1732647-1732648,1732864,1733615,1733929,1734230,1734254,1735052,1735081,1735109,1735141,1735267,1735405,1735484,1735549,1735564,1735588,1735919,1736176,1737309-1737310,1737334,1737349,1738138,1738252,1738833,1738950,1738957,1739712,1739760,1739867,1739894,1739959-1739960,1740114,1740116,1740250,1740333,1740360,1740626,1740774,1740837,1740971,1741016,1741032,1741339,1741343,1742117,1742520,1742888,1742916,1743097,1743172,1743343,1743674,1744265,1744292,1744589,1744670,1744672,1744959,1745038,1745127,1745197,1745336,1746117,1746408,1746696,1746981,1747341-1747342,1747380,1747387,1747406,1747492,1747654,1748505,1748553,1748722,1748870,1749350,1749464,1749475,1749645,1749662,1749815,1749872,1749875,1749899,1750052,1750076-1750077,1750287,1750457,1750462,1750465,1750495,1750626,1750809,1750886,1751376,1751410,1751445-1751446,1751478,1751755,1752198,1752202,1752273-1752274,1752438,1752508,1752659,1752672,1753262,1753331-1753332,1753355,1753444,1754117,1754239,1755366,1756520,
 1756580,1757119,1757166,1759433,1760340,1760373,1760387,1761444,1761571,1761762,1761787,1763347,1763378,1763735,1764705,1765817,1765983,1766071,1766496,1766554,1766644,1767025,1767265,1768446,1768637,1770694,1771739,1775474,1775622,1775757,1780543,1781068,1781075,1781386,1781846,1783066,1783089,1783104-1783105
+/jackrabbit/oak/trunk:1672350,1672468-1672469,1672537,1672603,1672611,1672642,1672644,1672834-1672835,1673351,1673410,1673414-1673415,1673436,1673644,1673662-1673664,1673669,1673695,1673713,1673738,1673787,1673791,1674046,1674065,1674075,1674107,1674228,1674780,1674880,1675054-1675055,1675089,1675319,1675332,1675354,1675357,1675382,1675555,1675566,1675593,1676198,1676237,1676407,1676458,1676539,1676670,1676693,1676703,1676725,1677579,1677581,1677609,1677611,1677774,1677788,1677797,1677804,1677806,1677939,1677991,1678023,1678095-1678096,1678124,1678171,1678173,1678202,1678211,1678323,1678758,1678938,1678954,1679144,1679165,1679191,1679232,1679235,1679503,1679958,1679961,1680170,1680172,1680182,1680222,1680232,1680236,1680461,1680633,1680643,1680747,1680805-1680806,1680903,1681282,1681767,1681918,1681921,1681955,1682042,1682218,1682235,1682437,1682488,1682494,1682555,1682855,1682904,1683059,1683089,1683213,1683249,1683259,1683278,1683323,1683687,1683700,1684174-1684175,1684186,1684376
 ,1684442,1684561,1684570,1684601,1684618,1684669,1684820,1684868,1684894,1685023,1685075,1685370,1685541,1685552,1685589-1685590,1685834,1685839-1685840,1685964,1685977,1685989,1685999,1686003,1686023,1686032,1686097,1686162,1686229,1686234,1686253,1686414,1686772,1686780,1686790,1686854,1686857,1686971,1687053-1687055,1687175,1687196,1687198,1687220,1687239-1687240,1687301,1687441,1687553,1688089-1688090,1688172,1688179,1688349,1688421,1688436,1688453,1688616,1688622,1688634,1688636,1688817,1689003-1689004,1689008,1689577,1689581,1689623,1689810,1689828,1689831,1689833,1689903,1690017,1690043,1690047,1690057,1690247,1690249,1690634-1690637,1690650,1690657,1690669,1690672,1690674,1690885,1690941,1691139,1691151,1691159,1691167,1691183,1691188,1691201,1691210,1691217-1691218,1691280,1691307,1691331-1691333,1691345,1691384-1691385,1691394,1691401,1691498,1691509,1692133-1692134,1692156,1692250,1692272,1692274,1692363,1692382,1692393,1692478,1692955,1693002,1693030,1693050,1693209,1693
 401,1693421,1693525-1693526,1694007,1694049,1694393-1694394,1694651,1694653-1694654,1695032,1695050,1695122,1695223,1695280,1695299,1695420,1695457,1695482,1695492,1695507,1695521,1695540,1695571,1695829-1695830,1695905,1696190,1696194,1696242,1696285,1696375,1696522,1696578,1696759,1696916,1697363,1697373,1697383,1697410,1697579,1697582,1697589,1697616,1697672,1697896,1698096,1698144,1700191,1700231,1700397,1700403,1700506,1700571,1700709,1700718,1700720,1700727,1700749,1700769,1700775,1701065,1701613,1701619,1701733,1701743,1701750,1701768,1701806,1701810,1701814,1701907,1701948,1701955,1701959,1701965,1701986,1702014,1702022,1702045,1702051,1702241,1702272,1702371,1702387,1702405,1702423,1702426,1702428,1702860,1702866,1702942,1702960,1703212,1703382,1703395,1703411,1703428,1703430,1703568,1703592,1703758,1703858,1703878,1704256,1704282,1704285,1704457,1704479,1704490,1704614,1704629,1704636,1704655,1704670,1704886,1705005,1705027,1705043,1705055,1705250,1705268,1705273,1705323,1
 705677,1705701,1705871,1705992,1705998,1706009,1706037,1706059,1706212,1706218,1706270,1706764,1706772,1707049,1707189,1707191,1707223,1707331,1707435,1707509,1707753,1708049,1708105,1708307,1708315,1708401,1708546,1708592,1708738,1708766,1709012,1709852,1709978,1710013,1710031,1710049,1710205,1710242,1710559,1710575,1710590,1710614,1710637,1710789,1710800,1710811,1710816,1710972,1711209,1711248,1711282,1711296,1711405,1711498,1711654,1712018,1712042,1712319,1712490,1712531,1712730,1712785,1712963,1713008,1713439,1713461,1713580,1713586,1713599-1713600,1713626,1713698,1713803,1713809,1714034,1714061,1714084,1714170,1714174,1714213,1714229,1714238,1714519-1714520,1714543-1714544,1714730,1714739,1714779,1714827,1714956,1714961,1715010,1715092,1715191,1715346,1715716,1715767,1715771,1715888,1715898,1716100,1716178,1716426,1716576,1716588-1716589,1716596,1716616,1716703,1716712,1716815,1716823,1716830,1716840,1716845,1716883,1717177,1717203,1717277,1717393-1717394,1717410,1717462,171763
 2,1717768-1717769,1717784,1717789,1717988,1718528,1718533,1718546-1718548,1718553,1718626,1718646,1718772,1718801-1718802,1718895,1719111,1719288,1719869,1720306,1720335,1720350,1720354,1720500,1721160,1721172,1721337,1722141,1722832,1723227,1723239,1723241,1723251,1723254,1723333,1723347,1723350,1723565,1723584,1723713,1723731,1724026,1724057,1724186,1724210,1724401,1724628,1724631,1725216,1725477,1725515,1725555,1725895,1725899,1725935,1725941,1725960,1726232,1726237,1726570,1726579,1726585-1726586,1726621,1726795,1726797,1726809,1726812,1726981,1726993,1727026,1727149,1727254,1727331,1727350,1727358,1727429,1727476,1727483,1727508,1727515-1727518,1727813,1727816,1727831-1727832,1727841,1727893,1727895,1727912-1727913,1727923,1727991,1728037,1728041,1728070,1728114,1728281,1728294-1728297,1728300,1728443,1728525,1728642,1729200,1729505,1729578,1729599,1729957,1729962,1729979,1730050,1730216,1730527,1730581,1730629,1730801,1731627,1731647-1731648,1731789,1731797,1732131,1732268,173
 2278,1732330,1732647-1732648,1732864,1733615,1733929,1734230,1734254,1735052,1735081,1735109,1735141,1735267,1735405,1735484,1735549,1735564,1735588,1735919,1736176,1737309-1737310,1737334,1737349,1738138,1738252,1738833,1738950,1738957,1739712,1739760,1739867,1739894,1739959-1739960,1740114,1740116,1740250,1740333,1740360,1740626,1740774,1740837,1740971,1741016,1741032,1741339,1741343,1742117,1742520,1742888,1742916,1743097,1743172,1743343,1743674,1744265,1744292,1744589,1744670,1744672,1744959,1745038,1745127,1745197,1745336,1746117,1746408,1746696,1746981,1747341-1747342,1747380,1747387,1747406,1747492,1747654,1748505,1748553,1748722,1748870,1749350,1749464,1749475,1749645,1749662,1749815,1749872,1749875,1749899,1750052,1750076-1750077,1750287,1750457,1750462,1750465,1750495,1750626,1750809,1750886,1751376,1751410,1751445-1751446,1751478,1751755,1752198,1752202,1752273-1752274,1752438,1752508,1752659,1752672,1753262,1753331-1753332,1753355,1753444,1754117,1754239,1755366,1756520,
 1756580,1757119,1757166,1759433,1760340,1760373,1760387,1761444,1761571,1761762,1761787,1763347,1763378,1763735,1764705,1765817,1765983,1766071,1766496,1766554,1766644,1767025,1767265,1768446,1768637,1770694,1771739,1775474,1775622,1775757,1780543,1781068,1781075,1781386,1781846,1783066,1783089,1783104-1783105,1783619
 /jackrabbit/trunk:1345480

Modified: jackrabbit/oak/branches/1.2/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationQueueFullWarnTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationQueueFullWarnTest.java?rev=1783625&r1=1783624&r2=1783625&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.2/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationQueueFullWarnTest.java (original)
+++ jackrabbit/oak/branches/1.2/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationQueueFullWarnTest.java Sun Feb 19 11:09:46 2017
@@ -21,9 +21,9 @@ package org.apache.jackrabbit.oak.jcr.ob
 import ch.qos.logback.classic.Level;
 import org.apache.jackrabbit.api.JackrabbitRepository;
 import org.apache.jackrabbit.oak.commons.junit.LogCustomizer;
-import org.apache.jackrabbit.oak.jcr.NodeStoreFixture ;
 import org.apache.jackrabbit.oak.jcr.AbstractRepositoryTest;
 import org.apache.jackrabbit.oak.jcr.Jcr;
+import org.apache.jackrabbit.oak.jcr.NodeStoreFixture;
 import org.apache.jackrabbit.oak.jcr.repository.RepositoryImpl;
 import org.apache.jackrabbit.oak.stats.Clock;
 import org.junit.After;
@@ -31,11 +31,14 @@ import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.jcr.Node;
 import javax.jcr.RepositoryException;
 import javax.jcr.Session;
 import javax.jcr.SimpleCredentials;
+import javax.jcr.observation.Event;
 import javax.jcr.observation.EventIterator;
 import javax.jcr.observation.EventListener;
 import javax.jcr.observation.ObservationManager;
@@ -44,26 +47,38 @@ import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static javax.jcr.observation.Event.NODE_ADDED;
 import static org.junit.Assert.assertTrue;
 
 @RunWith(Parameterized.class)
 public class ObservationQueueFullWarnTest extends AbstractRepositoryTest {
-    static final int OBS_QUEUE_LENGTH = 5;
-    static final String OBS_QUEUE_FULL_WARN = "Revision queue is full. Further revisions will be compacted.";
+    private static final int OBS_QUEUE_LENGTH = 5;
+    private static final String OBS_QUEUE_FULL_WARN = "Revision queue is full. Further revisions will be compacted.";
 
     private static final String TEST_NODE = "test_node";
     private static final String TEST_NODE_TYPE = "oak:Unstructured";
     private static final String TEST_PATH = '/' + TEST_NODE;
 
-    private static final long CONDITION_TIMEOUT = 10*1000;
+    private static final long OBS_TIMEOUT_PER_ITEM = 1000;
+    private static final long CONDITION_TIMEOUT = OBS_QUEUE_LENGTH * OBS_TIMEOUT_PER_ITEM;
 
     private Session observingSession;
     private ObservationManager observationManager;
 
+    private final BlockableListener listener = new BlockableListener();
+
+    private static final Logger LOG = LoggerFactory.getLogger(ObservationQueueFullWarnTest.class);
+
+    private final Semaphore blockObservation = new Semaphore(1);
+
+    private final AtomicInteger numAddedNodes = new AtomicInteger(0);
+    private final AtomicInteger numObservedNodes = new AtomicInteger(0);
+
     public ObservationQueueFullWarnTest(NodeStoreFixture fixture) {
         super(fixture);
+        LOG.info("fixture: {}", fixture);
     }
 
     @Override
@@ -78,7 +93,7 @@ public class ObservationQueueFullWarnTes
         session.getRootNode().addNode(TEST_NODE, TEST_NODE_TYPE);
         session.save();
 
-        Map<String,Object> attrs = new HashMap<String,Object>();
+        Map<String, Object> attrs = new HashMap<String, Object>();
         attrs.put(RepositoryImpl.REFRESH_INTERVAL, 0);
         observingSession = ((JackrabbitRepository) getRepository()).login(new SimpleCredentials("admin", "admin".toCharArray()), null, attrs);
         observationManager = observingSession.getWorkspace().getObservationManager();
@@ -96,17 +111,13 @@ public class ObservationQueueFullWarnTes
                 .contains(OBS_QUEUE_FULL_WARN)
                 .create();
 
-        final LoggingListener listener = new LoggingListener();
         observationManager.addEventListener(listener, NODE_ADDED, TEST_PATH, true, null, null, false);
         try {
-            Node n = getAdminSession().getNode(TEST_PATH);
-
             customLogs.starting();
-            addNodeToFillObsQueue(n, 0, listener);
-            assertTrue("Observation queue full warning must gets logged", customLogs.getLogs().size() > 0);
+            addNodeToFillObsQueue();
+            assertTrue("Observation queue full warning must get logged", customLogs.getLogs().size() > 0);
             customLogs.finished();
-        }
-        finally {
+        } finally {
             observationManager.removeEventListener(listener);
         }
     }
@@ -135,40 +146,35 @@ public class ObservationQueueFullWarnTes
         ChangeProcessor.clock = virtualClock;
         virtualClock.waitUntil(System.currentTimeMillis());
 
-        final LoggingListener listener = new LoggingListener();
         observationManager.addEventListener(listener, NODE_ADDED, TEST_PATH, true, null, null, false);
         try {
-            Node n = getAdminSession().getNode(TEST_PATH);
-            int nodeNameCounter = 0;
-
             //Create first level WARN message
-            nodeNameCounter = addNodeToFillObsQueue(n, nodeNameCounter, listener);
-            emptyObsQueueABit(listener);
+            addNodeToFillObsQueue();
+            emptyObsQueue();
 
             //Don't wait, fill up the queue again
             warnLogs.starting();
             debugLogs.starting();
-            nodeNameCounter = addNodeToFillObsQueue(n, nodeNameCounter, listener);
+            addNodeToFillObsQueue();
             assertTrue("Observation queue full warning must not logged until some time has past since last log",
                     warnLogs.getLogs().size() == 0);
             assertTrue("Observation queue full warning should get logged on debug though in the mean time",
                     debugLogs.getLogs().size() > 0);
             warnLogs.finished();
             debugLogs.finished();
-            emptyObsQueueABit(listener);
+            emptyObsQueue();
 
             //Wait some time so reach WARN level again
             virtualClock.waitUntil(virtualClock.getTime() + ChangeProcessor.QUEUE_FULL_WARN_INTERVAL);
 
             warnLogs.starting();
             debugLogs.starting();
-            addNodeToFillObsQueue(n, nodeNameCounter, listener);
+            addNodeToFillObsQueue();
             assertTrue("Observation queue full warning must get logged after some time has past since last log",
                     warnLogs.getLogs().size() > 0);
             warnLogs.finished();
             debugLogs.finished();
-        }
-        finally {
+        } finally {
             observationManager.removeEventListener(listener);
             ChangeProcessor.clock = oldClockInstance;
             ChangeProcessor.QUEUE_FULL_WARN_INTERVAL = oldWarnLogInterval;
@@ -177,66 +183,79 @@ public class ObservationQueueFullWarnTes
         }
     }
 
-    private void emptyObsQueueABit(final LoggingListener listener) throws InterruptedException {
-        //Let queue empty up a bit.
-        boolean notTimedOut = listener.waitFor(CONDITION_TIMEOUT, new Condition() {
-            @Override
-            public boolean evaluate() {
-                return listener.numAdded >= 2;
-            }
-        });
-        listener.numAdded = 0;
-        assertTrue("Listener didn't process events within time-out", notTimedOut);
-    }
-
-    private interface Condition {
-        boolean evaluate();
+    private void addANode(String prefix) throws RepositoryException {
+        Session session = getAdminSession();
+        Node parent = session.getNode(TEST_PATH);
+        String nodeName = prefix + numAddedNodes.get();
+        parent.addNode(nodeName);
+        session.save();
+        numAddedNodes.incrementAndGet();
     }
 
-    private static int addNodeToFillObsQueue(Node parent, int nodeNameCounter, LoggingListener listener)
+    private void addNodeToFillObsQueue()
             throws RepositoryException {
-        listener.blockObservation.acquireUninterruptibly();
+        blockObservation.acquireUninterruptibly();
         try {
-            for (int i = 0; i <= OBS_QUEUE_LENGTH; i++, nodeNameCounter++) {
-                parent.addNode("n" + nodeNameCounter);
-                parent.getSession().save();
+            for (int i = 0; i <= OBS_QUEUE_LENGTH; i++) {
+                addANode("n");
             }
-            return nodeNameCounter;
         } finally {
-            listener.blockObservation.release();
+            blockObservation.release();
         }
     }
 
-    private class LoggingListener implements EventListener {
+    private interface Condition {
+        boolean evaluate();
+    }
 
-        private volatile int numAdded = 0;
+    private boolean waitFor(long timeout, Condition c)
+            throws InterruptedException {
+        long end = System.currentTimeMillis() + timeout;
+        long remaining = end - System.currentTimeMillis();
+        while (remaining > 0) {
+            if (c.evaluate()) {
+                return true;
+            }
 
-        Semaphore blockObservation = new Semaphore(1);
+            //Add another node only when num_pending_to_be_observed nodes is
+            //less that observation queue. This is done to let all observation finish
+            //up in case last few event were dropped due to full observation queue
+            //(which is ok as the next event that comes in gets diff-ed with last
+            //processed revision)
+            if (numAddedNodes.get() < numObservedNodes.get() + OBS_QUEUE_LENGTH) {
+                try {
+                    addANode("addedWhileWaiting");
+                } catch (RepositoryException e) {
+                    LOG.warn("exception while adding during wait: {}", e);
+                }
+            }
+            Thread.sleep(OBS_TIMEOUT_PER_ITEM/10);//The constant is exaggerated
+            remaining = end - System.currentTimeMillis();
+        }
+        return c.evaluate();
+    }
 
+    private void emptyObsQueue() throws InterruptedException {
+        boolean notTimedOut = waitFor(CONDITION_TIMEOUT, new Condition() {
+            @Override
+            public boolean evaluate() {
+                return numObservedNodes.get()==numAddedNodes.get();
+            }
+        });
+        assertTrue("Listener didn't process events within time-out", notTimedOut);
+    }
+
+    private class BlockableListener implements EventListener {
         @Override
-        public synchronized void onEvent(EventIterator events) {
+        public void onEvent(EventIterator events) {
             blockObservation.acquireUninterruptibly();
             while (events.hasNext()) {
-                events.nextEvent();
-                numAdded++;
-            }
-            blockObservation.release();
-
-            notifyAll();
-        }
-
-        synchronized boolean waitFor(long timeout, Condition c)
-                throws InterruptedException {
-            long end = System.currentTimeMillis() + timeout;
-            long remaining = end - System.currentTimeMillis();
-            while (remaining > 0) {
-                if (c.evaluate()) {
-                    return true;
+                Event event = events.nextEvent();
+                if (event.getType() == Event.NODE_ADDED) {
+                    numObservedNodes.incrementAndGet();
                 }
-                wait(remaining);
-                remaining = end - System.currentTimeMillis();
             }
-            return false;
+            blockObservation.release();
         }
     }
 }