You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by md...@apache.org on 2013/06/14 14:48:51 UTC

svn commit: r1493054 - in /jackrabbit/oak/trunk: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeProcessor.java oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/RepositoryTest.java

Author: mduerig
Date: Fri Jun 14 12:48:51 2013
New Revision: 1493054

URL: http://svn.apache.org/r1493054
Log:
OAK-144 Implement Observation
removeEventListener should not block when called from event listener

Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeProcessor.java
    jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/RepositoryTest.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeProcessor.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeProcessor.java?rev=1493054&r1=1493053&r2=1493054&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeProcessor.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeProcessor.java Fri Jun 14 12:48:51 2013
@@ -64,8 +64,9 @@ class ChangeProcessor implements Runnabl
 
     private final Exception initStacktrace;
 
-    private volatile boolean running = false;
+    private volatile Thread running = null;
     private volatile boolean stopping = false;
+    private Runnable deferredUnregister;
 
     private Registration registration;
     private Listener changeListener;
@@ -105,7 +106,6 @@ class ChangeProcessor implements Runnabl
                 WhiteboardUtils.scheduleWithFixedDelay(whiteboard, this, 1);
     }
 
-
     /**
      * Stop this change processor if running. After returning from this method no further
      * events will be delivered.
@@ -113,29 +113,46 @@ class ChangeProcessor implements Runnabl
      */
     public void stop() {
         stopping = true; // do this outside synchronization
-        synchronized (this) {
-            try {
-                while (running) {
-                    wait();
+
+        if (running == Thread.currentThread()) {
+            // Called from within the event listener itself: do not wait here,
+            // but defer unregistering until the event listener is done
+            deferredUnregister = new Runnable() {
+                @Override
+                public void run() {
+                    unregister();
+                }
+            };
+        } else {
+            // Otherwise wait for the event listener to terminate and unregister immediately
+            synchronized (this) {
+                try {
+                    while (running != null) {
+                        wait();
+                    }
+                    unregister();
+                }
+                catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
                 }
-                checkState(registration != null, "Change processor not started");
-                changeListener.dispose();
-                registration.unregister();
-            }
-            catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
             }
         }
     }
 
+    private void unregister() {
+        checkState(registration != null, "Change processor not started");
+        changeListener.dispose();
+        registration.unregister();
+    }
+
     @Override
     public void run() {
         // guarantee that only one thread is processing changes at a time
         synchronized (this) {
-            if (running) {
+            if (running != null) {
                 return;
             } else {
-                running = true;
+                running = Thread.currentThread();
             }
         }
 
@@ -152,8 +169,11 @@ class ChangeProcessor implements Runnabl
         } catch (Exception e) {
             log.error("Unable to generate or send events", e);
         } finally {
-            running = false;
+            running = null;
             synchronized (this) { notifyAll(); }
+            if (deferredUnregister != null) {
+                deferredUnregister.run();
+            }
         }
     }
 

Modified: jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/RepositoryTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/RepositoryTest.java?rev=1493054&r1=1493053&r2=1493054&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/RepositoryTest.java (original)
+++ jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/RepositoryTest.java Fri Jun 14 12:48:51 2013
@@ -2092,6 +2092,55 @@ public class RepositoryTest extends Abst
     }
 
     @Test
+    public void observationDisposeFromListener() throws RepositoryException, InterruptedException, ExecutionException,
+            TimeoutException {
+
+        final Session observingSession = createAdminSession();
+        final AtomicReference<RepositoryException> repositoryException = new AtomicReference<RepositoryException>(null);
+        final AtomicReference<CountDownLatch> unregistered = new AtomicReference<CountDownLatch>(new CountDownLatch(1));
+        try {
+            final ObservationManager obsMgr = observingSession.getWorkspace().getObservationManager();
+            final EventListener listener = new EventListener() {
+                @Override
+                public void onEvent(EventIterator events) {
+                    try {
+                        // Unregistering listener from event handler should not block
+                        obsMgr.removeEventListener(this);
+                    }
+                    catch (RepositoryException e) {
+                        repositoryException.set(e);
+                    }
+                    finally {
+                        unregistered.get().countDown();
+                    }
+                }
+            };
+
+            obsMgr.addEventListener(listener, Event.NODE_ADDED | Event.NODE_REMOVED | Event.NODE_MOVED |
+                    Event.PROPERTY_ADDED | Event.PROPERTY_REMOVED | Event.PROPERTY_CHANGED | Event.PERSIST,
+                    "/", true, null, null, false);
+
+            // Ensure the listener is there
+            assertTrue(obsMgr.getRegisteredEventListeners().hasNext());
+
+            // Generate events
+            Node n = getNode(TEST_PATH);
+            n.addNode("c");
+            n.getSession().save();
+
+            // Make sure we see the events and the listener is gone
+            assertTrue(unregistered.get().await(2, TimeUnit.SECONDS));
+            if (repositoryException.get() != null) {
+                throw repositoryException.get();
+            }
+            assertFalse(obsMgr.getRegisteredEventListeners().hasNext());
+        }
+        finally {
+            observingSession.logout();
+        }
+    }
+
+    @Test
     public void observationOnRootNode() throws Exception {
         final AtomicReference<CountDownLatch> hasEvents = new AtomicReference<CountDownLatch>(new CountDownLatch(1));
         Session observingSession = createAdminSession();