You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by md...@apache.org on 2013/10/29 18:22:26 UTC

svn commit: r1536822 - /jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeProcessor.java

Author: mduerig
Date: Tue Oct 29 17:22:25 2013
New Revision: 1536822

URL: http://svn.apache.org/r1536822
Log:
OAK-144 Implement observation
Restore the ChangeProcessor.stop() contract: no further events are delivered once stop() has returned

Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeProcessor.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeProcessor.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeProcessor.java?rev=1536822&r1=1536821&r2=1536822&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeProcessor.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeProcessor.java Tue Oct 29 17:22:25 2013
@@ -110,6 +110,13 @@ public class ChangeProcessor {
         checkState(!stopping, "Change processor already stopped");
 
         stopping = true;
+        if (Thread.currentThread() != thread) {
+            try {
+                thread.join();
+            } catch (InterruptedException e) {
+                log.warn("Interruption while waiting for the observation thread to terminate", e);
+            }
+        }
     }
 
     //------------------------------------------------------------< private >---
@@ -131,8 +138,8 @@ public class ChangeProcessor {
         @Override
         public void run() {
             try {
-                ChangeSet changes = changeListener.getChanges(100);
                 while (!stopping) {
+                    ChangeSet changes = changeListener.getChanges(100);
                     EventFilter filter = filterRef.get();
                     // FIXME don't rely on toString for session id
                     if (changes != null &&
@@ -146,7 +153,6 @@ public class ChangeProcessor {
                             listener.onEvent(new EventIteratorAdapter(events));
                         }
                     }
-                    changes = changeListener.getChanges(100);
                 }
             } catch (Exception e) {
                 log.debug("Error while dispatching observation events", e);