You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by st...@apache.org on 2017/07/19 07:22:21 UTC
svn commit: r1802360 - in /jackrabbit/oak/trunk:
oak-core/src/test/java/org/apache/jackrabbit/oak/spi/commit/
oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/
oak-store-spi/src/main/java/org/apache/jackrabbit/oak/spi/commit/
Author: stefanegli
Date: Wed Jul 19 07:22:20 2017
New Revision: 1802360
URL: http://svn.apache.org/viewvc?rev=1802360&view=rev
Log:
OAK-5740 : deliver overflow change even without new commit - removes the BackgroundObserver.full flag and instead replaces the last queue entry with a compaction entry
Modified:
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserverTest.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/spi/commit/PrefilteringBackgroundObserverTest.java
jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationQueueFullWarnTest.java
jackrabbit/oak/trunk/oak-store-spi/src/main/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserver.java
Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserverTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserverTest.java?rev=1802360&r1=1802359&r2=1802360&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserverTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserverTest.java Wed Jul 19 07:22:20 2017
@@ -338,7 +338,7 @@ public class BackgroundObserverTest {
// this one will be queued as #2
NodeState thirdIncluded = generator.next();
- expected.add(new Pair(secondIncluded, thirdIncluded));
+// expected.add(new Pair(secondIncluded, thirdIncluded));
fo.contentChanged(thirdIncluded, CommitInfo.EMPTY);
// this one will cause the queue to 'overflow' (full==true)
@@ -358,6 +358,10 @@ public class BackgroundObserverTest {
next = generator.next();
// excluded==false BUT queue full, hence not adding to expected
fo.contentChanged(next, CommitInfo.EMPTY);
+
+ // with OAK-5740 the overflow entry now looks as follows:
+ expected.add(new Pair(secondIncluded, next));
+
// let recorder continue
recorder.unpause();
@@ -376,10 +380,10 @@ public class BackgroundObserverTest {
// only happens with non-filtered items, so adding yet another one now
filter.excludeNext(false);
NodeState last = generator.next();
- // while above the "seventhAfterQueueFull" DOES get filtered, the next contentChange
- // triggers the release of the 'queue full overflow element' (with commitInfo==null)
- // and that we must add as expected()
- expected.add(new Pair(thirdIncluded, seventhAfterQueueFull)); // commitInfo == null
+ // the 'seventhAfterQueueFull' DOES get filtered - and as per behavior
+ // pre-OAK-5740 it used to get flushed with the next contentChanged,
+ // however, with OAK-5740 this is no longer the case as we now
+ // use the last queue entry as the overflow entry
expected.add(new Pair(seventhAfterQueueFull, last));
fo.contentChanged(last, CommitInfo.EMPTY);
Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/spi/commit/PrefilteringBackgroundObserverTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/spi/commit/PrefilteringBackgroundObserverTest.java?rev=1802360&r1=1802359&r2=1802360&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/spi/commit/PrefilteringBackgroundObserverTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/spi/commit/PrefilteringBackgroundObserverTest.java Wed Jul 19 07:22:20 2017
@@ -246,10 +246,11 @@ public class PrefilteringBackgroundObser
new TestPattern(INCLUDED, 5, false, 0, 0, 0, 6),
// here: 1 init and 5 changes are in the queue, the queue fits 7, so queue is almost full
new TestPattern(EXCLUDED, 500, false, 0, 0, 6, 6),
- // still 6 in the queue, of 7
+ // still 6 in the queue, of 7
+ // due to OAK-5740 the last entry is now an include
new TestPattern(INCLUDED, 5, false, 0, 0, 6, 7),
- // now we added 2 (one NOOP and one of those 5), so the queue got full (==7)
- new TestPattern(EXCLUDED, 0 /* only flush*/, true, 5, 0, 7, 0)
+ // so with OAK-5740 we now will get 6 includes, not 5
+ new TestPattern(EXCLUDED, 0 /* only flush*/, true, 6, 0, 7, 0)
);
}
Modified: jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationQueueFullWarnTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationQueueFullWarnTest.java?rev=1802360&r1=1802359&r2=1802360&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationQueueFullWarnTest.java (original)
+++ jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationQueueFullWarnTest.java Wed Jul 19 07:22:20 2017
@@ -1,261 +1,351 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.jackrabbit.oak.jcr.observation;
-
-import ch.qos.logback.classic.Level;
-import org.apache.jackrabbit.api.JackrabbitRepository;
-import org.apache.jackrabbit.oak.commons.junit.LogCustomizer;
-import org.apache.jackrabbit.oak.fixture.NodeStoreFixture;
-import org.apache.jackrabbit.oak.jcr.AbstractRepositoryTest;
-import org.apache.jackrabbit.oak.jcr.Jcr;
-import org.apache.jackrabbit.oak.jcr.repository.RepositoryImpl;
-import org.apache.jackrabbit.oak.stats.Clock;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.jcr.Node;
-import javax.jcr.RepositoryException;
-import javax.jcr.Session;
-import javax.jcr.SimpleCredentials;
-import javax.jcr.observation.Event;
-import javax.jcr.observation.EventIterator;
-import javax.jcr.observation.EventListener;
-import javax.jcr.observation.ObservationManager;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static javax.jcr.observation.Event.NODE_ADDED;
-import static org.junit.Assert.assertTrue;
-
-@RunWith(Parameterized.class)
-public class ObservationQueueFullWarnTest extends AbstractRepositoryTest {
- private static final int OBS_QUEUE_LENGTH = 5;
- private static final String OBS_QUEUE_FULL_WARN = "Revision queue is full. Further revisions will be compacted.";
-
- private static final String TEST_NODE = "test_node";
- private static final String TEST_NODE_TYPE = "oak:Unstructured";
- private static final String TEST_PATH = '/' + TEST_NODE;
-
- private static final long OBS_TIMEOUT_PER_ITEM = 1000;
- private static final long CONDITION_TIMEOUT = OBS_QUEUE_LENGTH * OBS_TIMEOUT_PER_ITEM;
-
- private Session observingSession;
- private ObservationManager observationManager;
-
- private final BlockableListener listener = new BlockableListener();
-
- private static final Logger LOG = LoggerFactory.getLogger(ObservationQueueFullWarnTest.class);
-
- private final Semaphore blockObservation = new Semaphore(1);
-
- private final AtomicInteger numAddedNodes = new AtomicInteger(0);
- private final AtomicInteger numObservedNodes = new AtomicInteger(0);
-
- public ObservationQueueFullWarnTest(NodeStoreFixture fixture) {
- super(fixture);
- LOG.info("fixture: {}", fixture);
- }
-
- @Override
- protected Jcr initJcr(Jcr jcr) {
- return jcr.withObservationQueueLength(OBS_QUEUE_LENGTH);
- }
-
- @Before
- public void setup() throws RepositoryException {
- Session session = getAdminSession();
-
- session.getRootNode().addNode(TEST_NODE, TEST_NODE_TYPE);
- session.save();
-
- Map<String, Object> attrs = new HashMap<>();
- attrs.put(RepositoryImpl.REFRESH_INTERVAL, 0);
- observingSession = ((JackrabbitRepository) getRepository()).login(new SimpleCredentials("admin", "admin".toCharArray()), null, attrs);
- observationManager = observingSession.getWorkspace().getObservationManager();
- }
-
- @After
- public void tearDown() {
- observingSession.logout();
- }
-
- @Test
- public void warnOnQueueFull() throws RepositoryException, InterruptedException, ExecutionException {
- LogCustomizer customLogs = LogCustomizer.forLogger(ChangeProcessor.class.getName())
- .filter(Level.WARN)
- .contains(OBS_QUEUE_FULL_WARN)
- .create();
-
- observationManager.addEventListener(listener, NODE_ADDED, TEST_PATH, true, null, null, false);
- try {
- customLogs.starting();
- addNodeToFillObsQueue();
- assertTrue("Observation queue full warning must get logged", customLogs.getLogs().size() > 0);
- customLogs.finished();
- } finally {
- observationManager.removeEventListener(listener);
- }
- }
-
- @Test
- public void warnOnRepeatedQueueFull() throws RepositoryException, InterruptedException, ExecutionException {
- LogCustomizer warnLogs = LogCustomizer.forLogger(ChangeProcessor.class.getName())
- .filter(Level.WARN)
- .contains(OBS_QUEUE_FULL_WARN)
- .create();
- LogCustomizer debugLogs = LogCustomizer.forLogger(ChangeProcessor.class.getName())
- .filter(Level.DEBUG)
- .contains(OBS_QUEUE_FULL_WARN)
- .create();
- LogCustomizer logLevelSetting = LogCustomizer.forLogger(ChangeProcessor.class.getName())
- .enable(Level.DEBUG)
- .create();
- logLevelSetting.starting();
-
- long oldWarnLogInterval = ChangeProcessor.QUEUE_FULL_WARN_INTERVAL;
- //Assumption is that 10 (virtual) minutes won't pass by the time we move from one stage of queue fill to next.
- ChangeProcessor.QUEUE_FULL_WARN_INTERVAL = TimeUnit.MINUTES.toMillis(10);
-
- Clock oldClockInstance = ChangeProcessor.clock;
- Clock virtualClock = new Clock.Virtual();
- ChangeProcessor.clock = virtualClock;
- virtualClock.waitUntil(System.currentTimeMillis());
-
- observationManager.addEventListener(listener, NODE_ADDED, TEST_PATH, true, null, null, false);
- try {
- //Create first level WARN message
- addNodeToFillObsQueue();
- emptyObsQueue();
-
- //Don't wait, fill up the queue again
- warnLogs.starting();
- debugLogs.starting();
- addNodeToFillObsQueue();
- assertTrue("Observation queue full warning must not logged until some time has past since last log",
- warnLogs.getLogs().size() == 0);
- assertTrue("Observation queue full warning should get logged on debug though in the mean time",
- debugLogs.getLogs().size() > 0);
- warnLogs.finished();
- debugLogs.finished();
- emptyObsQueue();
-
- //Wait some time so reach WARN level again
- virtualClock.waitUntil(virtualClock.getTime() + ChangeProcessor.QUEUE_FULL_WARN_INTERVAL);
-
- warnLogs.starting();
- debugLogs.starting();
- addNodeToFillObsQueue();
- assertTrue("Observation queue full warning must get logged after some time has past since last log",
- warnLogs.getLogs().size() > 0);
- warnLogs.finished();
- debugLogs.finished();
- } finally {
- observationManager.removeEventListener(listener);
- ChangeProcessor.clock = oldClockInstance;
- ChangeProcessor.QUEUE_FULL_WARN_INTERVAL = oldWarnLogInterval;
-
- logLevelSetting.finished();
- }
- }
-
- private void addANode(String prefix) throws RepositoryException {
- Session session = getAdminSession();
- Node parent = session.getNode(TEST_PATH);
- String nodeName = prefix + numAddedNodes.get();
- parent.addNode(nodeName);
- session.save();
- numAddedNodes.incrementAndGet();
- }
-
- private void addNodeToFillObsQueue()
- throws RepositoryException {
- blockObservation.acquireUninterruptibly();
- try {
- for (int i = 0; i <= OBS_QUEUE_LENGTH; i++) {
- addANode("n");
- }
- } finally {
- blockObservation.release();
- }
- }
-
- private interface Condition {
- boolean evaluate();
- }
-
- private boolean waitFor(long timeout, Condition c)
- throws InterruptedException {
- long end = System.currentTimeMillis() + timeout;
- long remaining = end - System.currentTimeMillis();
- while (remaining > 0) {
- if (c.evaluate()) {
- return true;
- }
-
- //Add another node only when num_pending_to_be_observed nodes is
- //less that observation queue. This is done to let all observation finish
- //up in case last few event were dropped due to full observation queue
- //(which is ok as the next event that comes in gets diff-ed with last
- //processed revision)
- if (numAddedNodes.get() < numObservedNodes.get() + OBS_QUEUE_LENGTH) {
- try {
- addANode("addedWhileWaiting");
- } catch (RepositoryException e) {
- LOG.warn("exception while adding during wait: {}", e);
- }
- }
- Thread.sleep(OBS_TIMEOUT_PER_ITEM/10);//The constant is exaggerated
- remaining = end - System.currentTimeMillis();
- }
- return c.evaluate();
- }
-
- private void emptyObsQueue() throws InterruptedException {
- boolean notTimedOut = waitFor(CONDITION_TIMEOUT, new Condition() {
- @Override
- public boolean evaluate() {
- return numObservedNodes.get()==numAddedNodes.get();
- }
- });
- assertTrue("Listener didn't process events within time-out", notTimedOut);
- }
-
- private class BlockableListener implements EventListener {
- @Override
- public void onEvent(EventIterator events) {
- blockObservation.acquireUninterruptibly();
- while (events.hasNext()) {
- Event event = events.nextEvent();
- if (event.getType() == Event.NODE_ADDED) {
- numObservedNodes.incrementAndGet();
- }
- }
- blockObservation.release();
- }
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.jcr.observation;
+
+import ch.qos.logback.classic.Level;
+import org.apache.jackrabbit.api.JackrabbitRepository;
+import org.apache.jackrabbit.api.observation.JackrabbitEvent;
+import org.apache.jackrabbit.oak.commons.PathUtils;
+import org.apache.jackrabbit.oak.commons.junit.LogCustomizer;
+import org.apache.jackrabbit.oak.fixture.NodeStoreFixture;
+import org.apache.jackrabbit.oak.jcr.AbstractRepositoryTest;
+import org.apache.jackrabbit.oak.jcr.Jcr;
+import org.apache.jackrabbit.oak.jcr.repository.RepositoryImpl;
+import org.apache.jackrabbit.oak.stats.Clock;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jcr.Node;
+import javax.jcr.RepositoryException;
+import javax.jcr.Session;
+import javax.jcr.SimpleCredentials;
+import javax.jcr.observation.Event;
+import javax.jcr.observation.EventIterator;
+import javax.jcr.observation.EventListener;
+import javax.jcr.observation.ObservationManager;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static javax.jcr.observation.Event.NODE_ADDED;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(Parameterized.class)
+public class ObservationQueueFullWarnTest extends AbstractRepositoryTest {
+ private static final int OBS_QUEUE_LENGTH = 5;
+ private static final String OBS_QUEUE_FULL_WARN = "Revision queue is full. Further revisions will be compacted.";
+
+ private static final String TEST_NODE = "test_node";
+ private static final String TEST_NODE_TYPE = "oak:Unstructured";
+ private static final String TEST_PATH = '/' + TEST_NODE;
+
+ private static final long OBS_TIMEOUT_PER_ITEM = 1000;
+ private static final long CONDITION_TIMEOUT = OBS_QUEUE_LENGTH * OBS_TIMEOUT_PER_ITEM;
+
+ private Session observingSession;
+ private ObservationManager observationManager;
+
+ private final BlockableListener listener = new BlockableListener();
+
+ private static final Logger LOG = LoggerFactory.getLogger(ObservationQueueFullWarnTest.class);
+
+ private final Semaphore blockObservation = new Semaphore(1);
+
+ private final AtomicInteger numAddedNodes = new AtomicInteger(0);
+ private final AtomicInteger numObservedNodes = new AtomicInteger(0);
+
+ public ObservationQueueFullWarnTest(NodeStoreFixture fixture) {
+ super(fixture);
+ LOG.info("fixture: {}", fixture);
+ }
+
+ @Override
+ protected Jcr initJcr(Jcr jcr) {
+ return jcr.withObservationQueueLength(OBS_QUEUE_LENGTH);
+ }
+
+ @Before
+ public void setup() throws RepositoryException {
+ Session session = getAdminSession();
+
+ session.getRootNode().addNode(TEST_NODE, TEST_NODE_TYPE);
+ session.save();
+
+ Map<String, Object> attrs = new HashMap<>();
+ attrs.put(RepositoryImpl.REFRESH_INTERVAL, 0);
+ observingSession = ((JackrabbitRepository) getRepository()).login(new SimpleCredentials("admin", "admin".toCharArray()), null, attrs);
+ observationManager = observingSession.getWorkspace().getObservationManager();
+ }
+
+ @After
+ public void tearDown() {
+ observingSession.logout();
+ }
+
+ @Test
+ public void warnOnQueueFull() throws RepositoryException, InterruptedException, ExecutionException {
+ LogCustomizer customLogs = LogCustomizer.forLogger(ChangeProcessor.class.getName())
+ .filter(Level.WARN)
+ .contains(OBS_QUEUE_FULL_WARN)
+ .create();
+
+ observationManager.addEventListener(listener, NODE_ADDED, TEST_PATH, true, null, null, false);
+ try {
+ customLogs.starting();
+ addNodeToFillObsQueue();
+ assertTrue("Observation queue full warning must get logged", customLogs.getLogs().size() > 0);
+ customLogs.finished();
+ } finally {
+ observationManager.removeEventListener(listener);
+ }
+ }
+
+ @Test
+ public void warnOnRepeatedQueueFull() throws RepositoryException, InterruptedException, ExecutionException {
+ LogCustomizer warnLogs = LogCustomizer.forLogger(ChangeProcessor.class.getName())
+ .filter(Level.WARN)
+ .contains(OBS_QUEUE_FULL_WARN)
+ .create();
+ LogCustomizer debugLogs = LogCustomizer.forLogger(ChangeProcessor.class.getName())
+ .filter(Level.DEBUG)
+ .contains(OBS_QUEUE_FULL_WARN)
+ .create();
+ LogCustomizer logLevelSetting = LogCustomizer.forLogger(ChangeProcessor.class.getName())
+ .enable(Level.DEBUG)
+ .create();
+ logLevelSetting.starting();
+
+ long oldWarnLogInterval = ChangeProcessor.QUEUE_FULL_WARN_INTERVAL;
+ //Assumption is that 10 (virtual) minutes won't pass by the time we move from one stage of queue fill to next.
+ ChangeProcessor.QUEUE_FULL_WARN_INTERVAL = TimeUnit.MINUTES.toMillis(10);
+
+ Clock oldClockInstance = ChangeProcessor.clock;
+ Clock virtualClock = new Clock.Virtual();
+ ChangeProcessor.clock = virtualClock;
+ virtualClock.waitUntil(System.currentTimeMillis());
+
+ observationManager.addEventListener(listener, NODE_ADDED, TEST_PATH, true, null, null, false);
+ try {
+ //Create first level WARN message
+ addNodeToFillObsQueue();
+ emptyObsQueue();
+
+ //Don't wait, fill up the queue again
+ warnLogs.starting();
+ debugLogs.starting();
+ addNodeToFillObsQueue();
+ assertTrue("Observation queue full warning must not logged until some time has past since last log",
+ warnLogs.getLogs().size() == 0);
+ assertTrue("Observation queue full warning should get logged on debug though in the mean time",
+ debugLogs.getLogs().size() > 0);
+ warnLogs.finished();
+ debugLogs.finished();
+ emptyObsQueue();
+
+ //Wait some time so reach WARN level again
+ virtualClock.waitUntil(virtualClock.getTime() + ChangeProcessor.QUEUE_FULL_WARN_INTERVAL);
+
+ warnLogs.starting();
+ debugLogs.starting();
+ addNodeToFillObsQueue();
+ assertTrue("Observation queue full warning must get logged after some time has past since last log",
+ warnLogs.getLogs().size() > 0);
+ warnLogs.finished();
+ debugLogs.finished();
+ } finally {
+ observationManager.removeEventListener(listener);
+ ChangeProcessor.clock = oldClockInstance;
+ ChangeProcessor.QUEUE_FULL_WARN_INTERVAL = oldWarnLogInterval;
+
+ logLevelSetting.finished();
+ }
+ }
+
+ private void addANode(String prefix) throws RepositoryException {
+ Session session = getAdminSession();
+ Node parent = session.getNode(TEST_PATH);
+ String nodeName = prefix + numAddedNodes.get();
+ parent.addNode(nodeName);
+ session.save();
+ numAddedNodes.incrementAndGet();
+ }
+
+ private void addNodeToFillObsQueue()
+ throws RepositoryException {
+ blockObservation.acquireUninterruptibly();
+ try {
+ for (int i = 0; i <= OBS_QUEUE_LENGTH; i++) {
+ addANode("n");
+ }
+ } finally {
+ blockObservation.release();
+ }
+ }
+
+ private interface Condition {
+ boolean evaluate();
+ }
+
+ private boolean waitFor(long timeout, Condition c)
+ throws InterruptedException {
+ long end = System.currentTimeMillis() + timeout;
+ long remaining = end - System.currentTimeMillis();
+ while (remaining > 0) {
+ if (c.evaluate()) {
+ return true;
+ }
+
+ Thread.sleep(OBS_TIMEOUT_PER_ITEM/10);//The constant is exaggerated
+ remaining = end - System.currentTimeMillis();
+ }
+ return c.evaluate();
+ }
+
+ private void emptyObsQueue() throws InterruptedException {
+ boolean notTimedOut = waitFor(CONDITION_TIMEOUT, new Condition() {
+ @Override
+ public boolean evaluate() {
+ return numObservedNodes.get()==numAddedNodes.get();
+ }
+ });
+ assertTrue("Listener didn't process events within time-out", notTimedOut);
+ }
+
+ private class BlockableListener implements EventListener {
+ @Override
+ public void onEvent(EventIterator events) {
+ blockObservation.acquireUninterruptibly();
+ while (events.hasNext()) {
+ Event event = events.nextEvent();
+ if (event.getType() == Event.NODE_ADDED) {
+ numObservedNodes.incrementAndGet();
+ }
+ }
+ blockObservation.release();
+ }
+ }
+
+ @Test
+ public void testQueueFullThenFlushing() throws Exception {
+ final Semaphore semaphore = new Semaphore(0);
+ final AtomicLong counter = new AtomicLong(0);
+ final AtomicLong localCounter = new AtomicLong(0);
+ EventListener listeners = new EventListener() {
+
+ @Override
+ public void onEvent(EventIterator events) {
+ try {
+ semaphore.acquire();
+ long numEvents = events.getSize();
+ counter.addAndGet(numEvents);
+ System.out.println("GOT: "+numEvents + " - COUNTER: "+counter.get());
+ while(events.hasNext()) {
+ Event e = events.nextEvent();
+ System.out.println(" - " + e);
+ if (PathUtils.getName(e.getPath()).startsWith("local")) {
+ if (e instanceof JackrabbitEvent && !((JackrabbitEvent)e).isExternal()) {
+ localCounter.incrementAndGet();
+ }
+ }
+ }
+ } catch (InterruptedException e) {
+ throw new Error(e);
+ } catch (RepositoryException e) {
+ throw new Error(e);
+ }
+ }
+ };
+ Session session = getAdminSession();
+ Node root = session.getNode("/");
+ root.addNode("testNode");
+ session.save();
+
+ observationManager.addEventListener(listeners, Event.PROPERTY_ADDED, "/", true, null, null, false);
+
+ int propCounter = 0;
+ // send out 6 events (or in general: queue length + 1):
+ // event #0 will get delivered but stalls at the listener (queue empty though)
+ // event #1-#5 will fill the queue - all must remain "local"
+ for(int i=0; i<OBS_QUEUE_LENGTH + 1; i++, propCounter++) {
+ root = session.getNode("/");
+ root.getNode("testNode").setProperty("local" + propCounter, propCounter);
+ System.out.println("storing: /testNode/local" + propCounter);
+ session.save();
+ }
+
+ // release the listener to consume 6 events
+ semaphore.release(OBS_QUEUE_LENGTH+1);
+
+ boolean notTimedOut = waitFor(2000, new Condition() {
+ @Override
+ public boolean evaluate() {
+ return (OBS_QUEUE_LENGTH+1)==counter.get();
+ }
+ });
+ assertTrue("Listener didn't process " + (OBS_QUEUE_LENGTH+1) + " events within time-out", notTimedOut);
+ assertEquals("Just filled queue must not convert local->external", OBS_QUEUE_LENGTH+1, localCounter.get());
+
+ counter.set(0);
+
+ // send out 7 events (or in general: queue length + 2):
+ // event #0 will get delivered but stalls at the listener (queue empty though)
+ // event #1-#5 will fill the queue
+ // event #6 will not fit in the queue anymore (queue full)
+ for(int i=0; i<OBS_QUEUE_LENGTH + 2; i++, propCounter++) {
+ root = session.getNode("/");
+ root.getNode("testNode").setProperty("p" + propCounter, propCounter);
+ System.out.println("storing: /testNode/p" + propCounter);
+ session.save();
+ }
+
+ // release the listener
+ semaphore.release(100); // ensure acquire will no longer block during this test -> pass 100
+
+ notTimedOut = waitFor(2000, new Condition() {
+ @Override
+ public boolean evaluate() {
+ return (OBS_QUEUE_LENGTH+2)==counter.get();
+ }
+ });
+ assertTrue("Listener didn't process " + (OBS_QUEUE_LENGTH+2) + " events within time-out", notTimedOut);
+
+ root = session.getNode("/");
+ root.getNode("testNode").setProperty("p" + propCounter, propCounter);
+ System.out.println("storing: /testNode/p" + propCounter);
+ session.save();
+
+ notTimedOut = waitFor(1000, new Condition() {
+ @Override
+ public boolean evaluate() {
+ return (OBS_QUEUE_LENGTH+3)==counter.get();
+ }
+ });
+ assertTrue("Listener didn't process " + (OBS_QUEUE_LENGTH+3) + " events within time-out", notTimedOut);
+ }
+}
Modified: jackrabbit/oak/trunk/oak-store-spi/src/main/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserver.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-spi/src/main/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserver.java?rev=1802360&r1=1802359&r2=1802360&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-store-spi/src/main/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserver.java (original)
+++ jackrabbit/oak/trunk/oak-store-spi/src/main/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserver.java Wed Jul 19 07:22:20 2017
@@ -110,12 +110,6 @@ public class BackgroundObserver implemen
private ContentChange last;
/**
- * Flag to indicate that some content changes were dropped because
- * the queue was full.
- */
- private boolean full;
-
- /**
* Current background task
*/
private volatile NotifyingFutureTask currentTask = NotifyingFutureTask.completed();
@@ -283,28 +277,30 @@ public class BackgroundObserver implemen
//TODO - Support for merging ChangeSet for external changes
queue.remove(last);
- full = false;
}
- ContentChange change;
- if (full) {
- // If the queue is full, some commits have already been skipped
- // so we need to drop the possible local commit information as
- // only external changes can be merged together to larger chunks.
- change = new ContentChange(root, CommitInfo.EMPTY_EXTERNAL);
- } else {
- change = new ContentChange(root, info);
- }
+ ContentChange change = new ContentChange(root, info);
- // Try to add this change to the queue without blocking, and
- // mark the queue as full if there wasn't enough space
- full = !queue.offer(change);
-
- if (!full) {
- // Keep track of the last change added, so we can do the
- // compacting of external changes shown above.
- last = change;
+ // Try to add this change to the queue without blocking
+ boolean full = !queue.offer(change);
+
+ if (full && last != null) { // last is only null at the beginning
+ // queue is full.
+
+ // when the change can't be added to the queue because it's full
+ // remove the last entry and add an explicit overflow entry instead.
+ queue.remove(last);
+
+ // by removing the last entry we have to drop the possible
+ // local commit information of the current change,
+ // as we're doing collapsing here and the commit information
+ // no longer represents an individual commit
+ change = new ContentChange(root, CommitInfo.EMPTY_EXTERNAL);
+ queue.offer(change);
}
+ // Keep track of the last change added, so we can do the
+ // compacting of external changes shown above.
+ last = change;
// Set the completion handler on the currently running task. Multiple calls
// to onComplete are not a problem here since we always pass the same value.