You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by md...@apache.org on 2013/06/14 14:48:51 UTC
svn commit: r1493054 - in /jackrabbit/oak/trunk:
oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeProcessor.java
oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/RepositoryTest.java
Author: mduerig
Date: Fri Jun 14 12:48:51 2013
New Revision: 1493054
URL: http://svn.apache.org/r1493054
Log:
OAK-144 Implement Observation
removeEventListener should not block when called from event listener
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeProcessor.java
jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/RepositoryTest.java
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeProcessor.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeProcessor.java?rev=1493054&r1=1493053&r2=1493054&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeProcessor.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeProcessor.java Fri Jun 14 12:48:51 2013
@@ -64,8 +64,9 @@ class ChangeProcessor implements Runnabl
private final Exception initStacktrace;
- private volatile boolean running = false;
+ private volatile Thread running = null;
private volatile boolean stopping = false;
+ private Runnable deferredUnregister;
private Registration registration;
private Listener changeListener;
@@ -105,7 +106,6 @@ class ChangeProcessor implements Runnabl
WhiteboardUtils.scheduleWithFixedDelay(whiteboard, this, 1);
}
-
/**
* Stop this change processor if running. After returning from this methods no further
* events will be delivered.
@@ -113,29 +113,46 @@ class ChangeProcessor implements Runnabl
*/
public void stop() {
stopping = true; // do this outside synchronization
- synchronized (this) {
- try {
- while (running) {
- wait();
+
+ if (running == Thread.currentThread()) {
+ // Defer stopping from event listener, defer unregistering until
+ // event listener is done
+ deferredUnregister = new Runnable() {
+ @Override
+ public void run() {
+ unregister();
+ }
+ };
+ } else {
+ // Otherwise wait for the event listener to terminate and unregister immediately
+ synchronized (this) {
+ try {
+ while (running != null) {
+ wait();
+ }
+ unregister();
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
}
- checkState(registration != null, "Change processor not started");
- changeListener.dispose();
- registration.unregister();
- }
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
}
}
}
+ private void unregister() {
+ checkState(registration != null, "Change processor not started");
+ changeListener.dispose();
+ registration.unregister();
+ }
+
@Override
public void run() {
// guarantee that only one thread is processing changes at a time
synchronized (this) {
- if (running) {
+ if (running != null) {
return;
} else {
- running = true;
+ running = Thread.currentThread();
}
}
@@ -152,8 +169,11 @@ class ChangeProcessor implements Runnabl
} catch (Exception e) {
log.error("Unable to generate or send events", e);
} finally {
- running = false;
+ running = null;
synchronized (this) { notifyAll(); }
+ if (deferredUnregister != null) {
+ deferredUnregister.run();
+ }
}
}
Modified: jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/RepositoryTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/RepositoryTest.java?rev=1493054&r1=1493053&r2=1493054&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/RepositoryTest.java (original)
+++ jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/RepositoryTest.java Fri Jun 14 12:48:51 2013
@@ -2092,6 +2092,55 @@ public class RepositoryTest extends Abst
}
@Test
+ public void observationDisposeFromListener() throws RepositoryException, InterruptedException, ExecutionException,
+ TimeoutException {
+
+ final Session observingSession = createAdminSession();
+ final AtomicReference<RepositoryException> repositoryException = new AtomicReference<RepositoryException>(null);
+ final AtomicReference<CountDownLatch> unregistered = new AtomicReference<CountDownLatch>(new CountDownLatch(1));
+ try {
+ final ObservationManager obsMgr = observingSession.getWorkspace().getObservationManager();
+ final EventListener listener = new EventListener() {
+ @Override
+ public void onEvent(EventIterator events) {
+ try {
+ // Unregistering listener from event handler should not block
+ obsMgr.removeEventListener(this);
+ }
+ catch (RepositoryException e) {
+ repositoryException.set(e);
+ }
+ finally {
+ unregistered.get().countDown();
+ }
+ }
+ };
+
+ obsMgr.addEventListener(listener, Event.NODE_ADDED | Event.NODE_REMOVED | Event.NODE_MOVED |
+ Event.PROPERTY_ADDED | Event.PROPERTY_REMOVED | Event.PROPERTY_CHANGED | Event.PERSIST,
+ "/", true, null, null, false);
+
+ // Ensure the listener is there
+ assertTrue(obsMgr.getRegisteredEventListeners().hasNext());
+
+ // Generate events
+ Node n = getNode(TEST_PATH);
+ n.addNode("c");
+ n.getSession().save();
+
+ // Make sure we see the events and the listener is gone
+ assertTrue(unregistered.get().await(2, TimeUnit.SECONDS));
+ if (repositoryException.get() != null) {
+ throw repositoryException.get();
+ }
+ assertFalse(obsMgr.getRegisteredEventListeners().hasNext());
+ }
+ finally {
+ observingSession.logout();
+ }
+ }
+
+ @Test
public void observationOnRootNode() throws Exception {
final AtomicReference<CountDownLatch> hasEvents = new AtomicReference<CountDownLatch>(new CountDownLatch(1));
Session observingSession = createAdminSession();