You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by md...@apache.org on 2013/11/06 16:27:05 UTC

svn commit: r1539363 - in /jackrabbit/oak/trunk: oak-core/src/main/java/org/apache/jackrabbit/oak/core/ oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/ oak-core/src/main/java/or...

Author: mduerig
Date: Wed Nov  6 15:27:04 2013
New Revision: 1539363

URL: http://svn.apache.org/r1539363
Log:
OAK-1143  [scala] Repository init throws "illegal cyclic reference involving class ChangeDispatcher"
ChangeDispatcher distributes to observers now and ChangeProcessor is a regular observer

Added:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/ChangeDispatcher.java
      - copied, changed from r1539346, jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeDispatcher.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/Observable.java
      - copied, changed from r1539346, jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/Observable.java
Removed:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeDispatcher.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/Observable.java
Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/core/ContentSessionImpl.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/KernelNodeStore.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/KernelNodeStoreBranch.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueue.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoNodeStore.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoNodeStoreBranch.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeProcessor.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/state/AbstractNodeStoreBranch.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueueTest.java
    jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ObservationManagerImpl.java
    jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/session/SessionContext.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/core/ContentSessionImpl.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/core/ContentSessionImpl.java?rev=1539363&r1=1539362&r2=1539363&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/core/ContentSessionImpl.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/core/ContentSessionImpl.java Wed Nov  6 15:27:04 2013
@@ -19,6 +19,7 @@ package org.apache.jackrabbit.oak.core;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
@@ -29,9 +30,9 @@ import javax.security.auth.login.LoginEx
 import org.apache.jackrabbit.oak.api.AuthInfo;
 import org.apache.jackrabbit.oak.api.ContentSession;
 import org.apache.jackrabbit.oak.api.Root;
-import org.apache.jackrabbit.oak.plugins.observation.ChangeDispatcher.Listener;
-import org.apache.jackrabbit.oak.plugins.observation.Observable;
+import org.apache.jackrabbit.oak.spi.commit.Observable;
 import org.apache.jackrabbit.oak.spi.commit.CommitHook;
+import org.apache.jackrabbit.oak.spi.commit.Observer;
 import org.apache.jackrabbit.oak.spi.query.QueryIndexProvider;
 import org.apache.jackrabbit.oak.spi.security.SecurityProvider;
 import org.apache.jackrabbit.oak.spi.security.authentication.LoginContext;
@@ -118,8 +119,8 @@ class ContentSessionImpl implements Cont
     }
 
     @Override
-    public Listener newListener() {
-        return ((Observable) store).newListener();
+    public Closeable addObserver(Observer observer) {
+        return ((Observable) store).addObserver(observer);
     }
 
     //-----------------------------------------------------------< Closable >---

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/KernelNodeStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/KernelNodeStore.java?rev=1539363&r1=1539362&r2=1539363&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/KernelNodeStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/KernelNodeStore.java Wed Nov  6 15:27:04 2013
@@ -19,6 +19,7 @@ package org.apache.jackrabbit.oak.kernel
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.concurrent.ExecutionException;
@@ -39,9 +40,8 @@ import org.apache.jackrabbit.mk.api.Micr
 import org.apache.jackrabbit.oak.api.CommitFailedException;
 import org.apache.jackrabbit.oak.cache.CacheLIRS;
 import org.apache.jackrabbit.oak.cache.CacheStats;
-import org.apache.jackrabbit.oak.plugins.observation.ChangeDispatcher;
-import org.apache.jackrabbit.oak.plugins.observation.ChangeDispatcher.Listener;
-import org.apache.jackrabbit.oak.plugins.observation.Observable;
+import org.apache.jackrabbit.oak.spi.commit.ChangeDispatcher;
+import org.apache.jackrabbit.oak.spi.commit.Observable;
 import org.apache.jackrabbit.oak.spi.commit.CommitHook;
 import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
 import org.apache.jackrabbit.oak.spi.commit.EmptyObserver;
@@ -148,8 +148,8 @@ public class KernelNodeStore implements 
     //------------------------------------------------------------< Observable >---
 
     @Override
-    public Listener newListener() {
-        return changeDispatcher.newListener();
+    public Closeable addObserver(Observer observer) {
+        return changeDispatcher.addObserver(observer);
     }
 
     //----------------------------------------------------------< NodeStore >---

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/KernelNodeStoreBranch.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/KernelNodeStoreBranch.java?rev=1539363&r1=1539362&r2=1539363&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/KernelNodeStoreBranch.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/KernelNodeStoreBranch.java Wed Nov  6 15:27:04 2013
@@ -26,7 +26,7 @@ import org.apache.jackrabbit.mk.api.Micr
 import org.apache.jackrabbit.mk.api.MicroKernelException;
 import org.apache.jackrabbit.oak.api.Blob;
 import org.apache.jackrabbit.oak.api.CommitFailedException;
-import org.apache.jackrabbit.oak.plugins.observation.ChangeDispatcher;
+import org.apache.jackrabbit.oak.spi.commit.ChangeDispatcher;
 import org.apache.jackrabbit.oak.spi.commit.CommitHook;
 import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
 import org.apache.jackrabbit.oak.spi.state.AbstractNodeStoreBranch;

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueue.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueue.java?rev=1539363&r1=1539362&r2=1539363&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueue.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueue.java Wed Nov  6 15:27:04 2013
@@ -25,7 +25,7 @@ import java.util.concurrent.CountDownLat
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
-import org.apache.jackrabbit.oak.plugins.observation.ChangeDispatcher;
+import org.apache.jackrabbit.oak.spi.commit.ChangeDispatcher;
 import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
 import org.slf4j.Logger;

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoNodeStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoNodeStore.java?rev=1539363&r1=1539362&r2=1539363&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoNodeStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoNodeStore.java Wed Nov  6 15:27:04 2013
@@ -20,6 +20,7 @@ import static com.google.common.base.Pre
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.io.InputStream;
 import java.lang.ref.WeakReference;
@@ -58,8 +59,8 @@ import org.apache.jackrabbit.oak.commons
 import org.apache.jackrabbit.oak.plugins.mongomk.util.LoggingDocumentStoreWrapper;
 import org.apache.jackrabbit.oak.plugins.mongomk.util.TimingDocumentStoreWrapper;
 import org.apache.jackrabbit.oak.plugins.mongomk.util.Utils;
-import org.apache.jackrabbit.oak.plugins.observation.ChangeDispatcher;
-import org.apache.jackrabbit.oak.plugins.observation.Observable;
+import org.apache.jackrabbit.oak.spi.commit.ChangeDispatcher;
+import org.apache.jackrabbit.oak.spi.commit.Observable;
 import org.apache.jackrabbit.oak.spi.commit.CommitHook;
 import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
 import org.apache.jackrabbit.oak.spi.commit.Observer;
@@ -863,8 +864,8 @@ public final class MongoNodeStore
     //------------------------< Observable >------------------------------------
 
     @Override
-    public ChangeDispatcher.Listener newListener() {
-        return dispatcher.newListener();
+    public Closeable addObserver(Observer observer) {
+        return dispatcher.addObserver(observer);
     }
 
     //-------------------------< NodeStore >------------------------------------

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoNodeStoreBranch.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoNodeStoreBranch.java?rev=1539363&r1=1539362&r2=1539363&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoNodeStoreBranch.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoNodeStoreBranch.java Wed Nov  6 15:27:04 2013
@@ -25,7 +25,7 @@ import org.apache.jackrabbit.mk.api.Micr
 import org.apache.jackrabbit.oak.api.Blob;
 import org.apache.jackrabbit.oak.api.CommitFailedException;
 import org.apache.jackrabbit.oak.kernel.BlobSerializer;
-import org.apache.jackrabbit.oak.plugins.observation.ChangeDispatcher;
+import org.apache.jackrabbit.oak.spi.commit.ChangeDispatcher;
 import org.apache.jackrabbit.oak.spi.commit.CommitHook;
 import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
 import org.apache.jackrabbit.oak.spi.state.AbstractNodeStoreBranch;

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeProcessor.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeProcessor.java?rev=1539363&r1=1539362&r2=1539363&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeProcessor.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeProcessor.java Wed Nov  6 15:27:04 2013
@@ -21,10 +21,15 @@ package org.apache.jackrabbit.oak.plugin
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 
+import java.io.Closeable;
+import java.io.IOException;
 import java.util.concurrent.atomic.AtomicReference;
 
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 import javax.jcr.observation.EventListener;
 
+import com.google.common.base.Objects;
 import org.apache.jackrabbit.api.jmx.EventListenerMBean;
 import org.apache.jackrabbit.commons.iterator.EventIteratorAdapter;
 import org.apache.jackrabbit.commons.observation.ListenerTracker;
@@ -32,8 +37,9 @@ import org.apache.jackrabbit.oak.api.Con
 import org.apache.jackrabbit.oak.core.ImmutableRoot;
 import org.apache.jackrabbit.oak.core.ImmutableTree;
 import org.apache.jackrabbit.oak.namepath.NamePathMapper;
-import org.apache.jackrabbit.oak.plugins.observation.ChangeDispatcher.ChangeSet;
-import org.apache.jackrabbit.oak.plugins.observation.ChangeDispatcher.Listener;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.commit.Observable;
+import org.apache.jackrabbit.oak.spi.commit.Observer;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
 import org.apache.jackrabbit.oak.spi.whiteboard.Registration;
 import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard;
@@ -43,28 +49,23 @@ import org.slf4j.LoggerFactory;
 
 /**
  * A {@code ChangeProcessor} generates observation {@link javax.jcr.observation.Event}s
- * based on a {@link EventFilter} and delivers them to an {@link javax.jcr.observation.EventListener}.
+ * based on a {@link EventFilter} and delivers them to an {@link EventListener}.
  * <p>
- * After instantiation a {@code ChangeProcessor} must be started in order for its
- * {@link ListenerThread listener thread's} run methods to be regularly
- * executed and stopped in order to not execute its run method anymore.
+ * After instantiation a {@code ChangeProcessor} must be started in order to start
+ * delivering observation events and stopped to stop doing so.
  */
-public class ChangeProcessor {
+public class ChangeProcessor implements Observer {
     private static final Logger log = LoggerFactory.getLogger(ChangeProcessor.class);
 
     private final ContentSession contentSession;
     private final NamePathMapper namePathMapper;
-    private final AtomicReference<EventFilter> filterRef;
-
     private final ListenerTracker tracker;
     private final EventListener listener;
+    private final AtomicReference<EventFilter> filterRef;
 
-    /**
-     * Background thread used to wait for and deliver events.
-     */
-    private ListenerThread thread = null;
-
-    private volatile boolean stopping = false;
+    private Closeable observer;
+    private Registration mbean;
+    private NodeState previousRoot;
 
     public ChangeProcessor(
             ContentSession contentSession, NamePathMapper namePathMapper,
@@ -73,7 +74,7 @@ public class ChangeProcessor {
         this.contentSession = contentSession;
         this.namePathMapper = namePathMapper;
         this.tracker = tracker;
-        this.listener = tracker.getTrackedListener();
+        listener = tracker.getTrackedListener();
         filterRef = new AtomicReference<EventFilter>(filter);
     }
 
@@ -92,12 +93,11 @@ public class ChangeProcessor {
      * @throws IllegalStateException if started already
      */
     public synchronized void start(Whiteboard whiteboard) {
-        checkState(thread == null, "Change processor started already");
+        checkState(observer == null, "Change processor started already");
+        observer = ((Observable) contentSession).addObserver(this);
+        mbean = WhiteboardUtils.registerMBean(whiteboard, EventListenerMBean.class,
+                tracker.getListenerMBean(), "EventListener", tracker.toString());
 
-        thread = new ListenerThread(whiteboard);
-        thread.setDaemon(true);
-        thread.setPriority(Thread.MIN_PRIORITY);
-        thread.start();
     }
 
     /**
@@ -106,71 +106,49 @@ public class ChangeProcessor {
      * @throws IllegalStateException if not yet started or stopped already
      */
     public synchronized void stop() {
-        checkState(thread != null, "Change processor not started");
-        checkState(!stopping, "Change processor already stopped");
-
-        stopping = true;
-        if (Thread.currentThread() != thread) {
-            try {
-                thread.join();
-            } catch (InterruptedException e) {
-                log.warn("Interruption while waiting for the observation thread to terminate", e);
-                Thread.currentThread().interrupt();
-            } finally {
-                thread.dispose();
-            }
+        checkState(observer != null, "Change processor not started");
+        try {
+            mbean.unregister();
+            observer.close();
+        } catch (IOException e) {
+            log.error("Error while stopping change listener", e);
         }
     }
 
-    //------------------------------------------------------------< private >---
-
-    private class ListenerThread extends Thread {
-
-        private final Listener changeListener =
-                ((Observable) contentSession).newListener();
-
-        private final Registration mbean;
-
-        ListenerThread(Whiteboard whiteboard) {
-            mbean = WhiteboardUtils.registerMBean(
-                    whiteboard, EventListenerMBean.class,
-                    tracker.getListenerMBean(), "EventListener",
-                    tracker.toString());
-        }
-
-        @Override
-        public void run() {
+    @Override
+    public void contentChanged(@Nonnull NodeState root, @Nullable CommitInfo info) {
+        if (previousRoot != null) {
             try {
-                while (!stopping) {
-                    ChangeSet changes = changeListener.getChanges(100);
-                    EventFilter filter = filterRef.get();
-                    // FIXME don't rely on toString for session id
-                    if (changes != null &&
-                            filter.includeSessionLocal(changes.isLocal(contentSession.toString())) &&
-                            filter.includeClusterExternal(changes.getCommitInfo() == null)) {
-                        String path = namePathMapper.getOakPath(filter.getPath());
-                        ImmutableTree beforeTree = getTree(changes.getBeforeState(), path);
-                        ImmutableTree afterTree = getTree(changes.getAfterState(), path);
-                        EventGenerator events = new EventGenerator(changes.getCommitInfo(),
-                                beforeTree, afterTree, filter, namePathMapper);
-                        if (events.hasNext()) {
-                            listener.onEvent(new EventIteratorAdapter(events));
-                        }
+                EventFilter filter = filterRef.get();
+                if (filter.includeSessionLocal(isLocal(info))
+                        && filter.includeClusterExternal(isExternal(info))) {
+                    String path = namePathMapper.getOakPath(filter.getPath());
+                    ImmutableTree beforeTree = getTree(previousRoot, path);
+                    ImmutableTree afterTree = getTree(root, path);
+                    EventGenerator events = new EventGenerator(
+                            info, beforeTree, afterTree, filter, namePathMapper);
+                    if (events.hasNext()) {
+                        listener.onEvent(new EventIteratorAdapter(events));
                     }
                 }
             } catch (Exception e) {
                 log.warn("Error while dispatching observation events", e);
             }
         }
+        previousRoot = root;
+    }
 
-        private ImmutableTree getTree(NodeState nodeState, String path) {
-            return new ImmutableRoot(nodeState).getTree(path);
-        }
+    private boolean isLocal(CommitInfo info) {
+        // FIXME don't rely on toString for session id
+        return info != null && Objects.equal(info.getSessionId(), contentSession.toString());
+    }
 
-        void dispose() {
-            mbean.unregister();
-            changeListener.dispose();
-        }
+    private static boolean isExternal(CommitInfo info) {
+        return info == null;
+    }
+
+    private static ImmutableTree getTree(NodeState nodeState, String path) {
+        return new ImmutableRoot(nodeState).getTree(path);
     }
 
 }

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java?rev=1539363&r1=1539362&r2=1539363&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java Wed Nov  6 15:27:04 2013
@@ -21,6 +21,7 @@ import static com.google.common.base.Pre
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.SECONDS;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.io.InputStream;
 
@@ -30,9 +31,8 @@ import javax.annotation.Nullable;
 
 import org.apache.jackrabbit.oak.api.Blob;
 import org.apache.jackrabbit.oak.api.CommitFailedException;
-import org.apache.jackrabbit.oak.plugins.observation.ChangeDispatcher;
-import org.apache.jackrabbit.oak.plugins.observation.ChangeDispatcher.Listener;
-import org.apache.jackrabbit.oak.plugins.observation.Observable;
+import org.apache.jackrabbit.oak.spi.commit.ChangeDispatcher;
+import org.apache.jackrabbit.oak.spi.commit.Observable;
 import org.apache.jackrabbit.oak.spi.commit.CommitHook;
 import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
 import org.apache.jackrabbit.oak.spi.commit.EmptyObserver;
@@ -105,8 +105,8 @@ public class SegmentNodeStore implements
     }
 
     @Override
-    public Listener newListener() {
-        return changeDispatcher.newListener();
+    public Closeable addObserver(Observer observer) {
+        return changeDispatcher.addObserver(observer);
     }
 
     @Override @Nonnull

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java?rev=1539363&r1=1539362&r2=1539363&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java Wed Nov  6 15:27:04 2013
@@ -16,6 +16,7 @@
  */
 package org.apache.jackrabbit.oak.plugins.segment;
 
+import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
@@ -27,7 +28,6 @@ import javax.annotation.Nullable;
 
 import com.google.common.base.Preconditions;
 import com.mongodb.Mongo;
-
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.ConfigurationPolicy;
@@ -36,12 +36,12 @@ import org.apache.felix.scr.annotations.
 import org.apache.felix.scr.annotations.Service;
 import org.apache.jackrabbit.oak.api.Blob;
 import org.apache.jackrabbit.oak.api.CommitFailedException;
-import org.apache.jackrabbit.oak.plugins.observation.ChangeDispatcher.Listener;
-import org.apache.jackrabbit.oak.plugins.observation.Observable;
+import org.apache.jackrabbit.oak.spi.commit.Observable;
 import org.apache.jackrabbit.oak.plugins.segment.file.FileStore;
 import org.apache.jackrabbit.oak.plugins.segment.mongo.MongoStore;
 import org.apache.jackrabbit.oak.spi.commit.CommitHook;
 import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.commit.Observer;
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
 import org.apache.jackrabbit.oak.spi.state.NodeStore;
@@ -153,9 +153,9 @@ public class SegmentNodeStoreService imp
     //------------------------------------------------------------< Observable >---
 
     @Override
-    public Listener newListener() {
+    public Closeable addObserver(Observer observer) {
         Preconditions.checkState(delegate instanceof Observable);
-        return ((Observable) getDelegate()).newListener();
+        return ((Observable) getDelegate()).addObserver(observer);
     }
 
 

Copied: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/ChangeDispatcher.java (from r1539346, jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeDispatcher.java)
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/ChangeDispatcher.java?p2=jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/ChangeDispatcher.java&p1=jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeDispatcher.java&r1=1539346&r2=1539363&rev=1539363&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeDispatcher.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/ChangeDispatcher.java Wed Nov  6 15:27:04 2013
@@ -16,13 +16,15 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.jackrabbit.oak.plugins.observation;
+package org.apache.jackrabbit.oak.spi.commit;
 
 import static com.google.common.base.Objects.toStringHelper;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.collect.Queues.newLinkedBlockingQueue;
 
+import java.io.Closeable;
+import java.io.IOException;
 import java.util.Set;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
@@ -32,10 +34,7 @@ import javax.annotation.CheckForNull;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
-import com.google.common.base.Objects;
 import com.google.common.collect.Sets;
-
-import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
 import org.apache.jackrabbit.oak.spi.state.NodeStore;
 
@@ -58,8 +57,8 @@ import org.apache.jackrabbit.oak.spi.sta
       }
  * </pre>
  * <p>
- * The {@link #newListener()} method registers a listener for receiving changes reported
- * into a change dispatcher.
+ * The {@link #addObserver(Observer)} method registers an {@link Observer} for receiving
+ * notifications about all changes reported to this instance.
  */
 public class ChangeDispatcher implements Observable {
     private final Set<Listener> listeners = Sets.newHashSet();
@@ -78,15 +77,18 @@ public class ChangeDispatcher implements
     }
 
     /**
-     * Create a new {@link Listener} for receiving changes reported into
-     * this change dispatcher. Listeners need to be {@link Listener#dispose() disposed}
-     * when no longer needed.
-     * @return  a new {@code Listener} instance.
+     * Register a new {@link Observer} for receiving notifications about changes reported to
+     * this change dispatcher. Changes are reported asynchronously. Clients need to
+     * call {@link java.io.Closeable#close() close} on the returned {@code Closeable} instance
+     * to stop receiving notifications.
+     *
+     * @return  a {@link Closeable} instance
      */
     @Override
     @Nonnull
-    public Listener newListener() {
-        Listener listener = new Listener(root);
+    public Closeable addObserver(Observer observer) {
+        Listener listener = new Listener(observer, root);
+        listener.start();
         register(listener);
         return listener;
     }
@@ -187,10 +189,8 @@ public class ChangeDispatcher implements
     }
 
     private void add(NodeState root, CommitInfo info) {
-        synchronized (listeners) {
-            for (Listener l : getListeners()) {
-                l.add(root, info);
-            }
+        for (Listener l : getListeners()) {
+            l.contentChanged(root, info);
         }
     }
 
@@ -203,123 +203,92 @@ public class ChangeDispatcher implements
     //------------------------------------------------------------< Listener >---
 
     /**
-     * Listener for receiving changes reported into a change dispatcher by
-     * any of its hooks.
+     * Listener thread receiving changes reported into {@code ChangeDispatcher} and
+     * asynchronously distributing these to an associated {@link Observer}.
      */
-    public class Listener {
-
-        private final LinkedBlockingQueue<ChangeSet> changeSets =
-                newLinkedBlockingQueue();
-
-        private NodeState previousRoot;
+    private class Listener extends Thread implements Closeable, Observer {
+        private final LinkedBlockingQueue<Commit> commits = newLinkedBlockingQueue();
+        private final Observer observer;
 
         private boolean blocked = false;
+        private volatile boolean stopping;
 
-        Listener(NodeState root) {
-            this.previousRoot = checkNotNull(root);
+        Listener(Observer observer, NodeState root) {
+            this.observer = checkNotNull(observer);
+            commits.add(new Commit(root, null));
+            setDaemon(true);
+            setPriority(Thread.MIN_PRIORITY);
         }
 
-        /**
-         * Free up any resources associated by this hook.
-         */
-        public void dispose() {
-            unregister(this);
+        @Override
+        public void contentChanged(NodeState root, CommitInfo info) {
+            Commit commit = new Commit(root, blocked ? null : info);
+            blocked = !commits.offer(commit);
         }
 
-        /**
-         * Poll for changes reported to this listener.
-         *
-         * @param timeout maximum number of milliseconds to wait for changes
-         * @return  {@code ChangeSet} of the changes, or {@code null} if
-         *          no changes occurred since the last call to this method
-         *          and before the timeout
-         * @throws InterruptedException if polling was interrupted
-         */
-        @CheckForNull
-        public ChangeSet getChanges(long timeout) throws InterruptedException {
-            if (changeSets.isEmpty()) {
-                externalChange();
+        @Override
+        public void run() {
+            try {
+                while (!stopping) {
+                    if (commits.isEmpty()) {
+                        externalChange();
+                    }
+                    Commit commit = commits.poll(100, TimeUnit.MILLISECONDS);
+                    if (commit != null) {
+                        observer.contentChanged(commit.getRoot(), commit.getCommitInfo());
+                    }
+                }
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
             }
-            return changeSets.poll(timeout, TimeUnit.MILLISECONDS);
         }
 
-        private synchronized void add(NodeState root, CommitInfo info) {
-            if (blocked) {
-                info = null;
-            }
-            if (changeSets.offer(new ChangeSet(previousRoot, root, info))) {
-                previousRoot = root;
-                blocked = false;
-            } else {
-                blocked = true;
+        @Override
+        public void close() throws IOException {
+            checkState(!stopping, "Change processor already stopped");
+
+            unregister(this);
+            stopping = true;
+            if (Thread.currentThread() != this) {
+                try {
+                    join();
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    throw new IOException
+                            ("Interruption while waiting for the listener thread to terminate", e);
+                }
             }
         }
     }
 
-    //------------------------------------------------------------< ChangeSet >---
+    //------------------------------------------------------------< Commit >---
 
-    /**
-     * Instances of this class represent changes to a node store. In addition they
-     * record meta data associated with such changes like whether a change occurred
-     * on the local cluster node, the user causing the changes and the date the changes
-     * where persisted.
-     */
-    public static class ChangeSet {
-        private final NodeState before;
-        private final NodeState after;
+    private static class Commit {
+        private final NodeState root;
         private final CommitInfo commitInfo;
 
-        /**
-         * Creates a change set
-         */
-        ChangeSet(@Nonnull NodeState before, @Nonnull NodeState after,
-                @Nullable CommitInfo commitInfo) {
-            this.before = checkNotNull(before);
-            this.after = checkNotNull(after);
+        Commit(@Nonnull NodeState root, @Nullable CommitInfo commitInfo) {
+            this.root = checkNotNull(root);
             this.commitInfo = commitInfo;
         }
 
-        public boolean isExternal() {
-            return commitInfo == null;
-        }
-
-        public boolean isLocal(String sessionId) {
-            return commitInfo != null
-                    && Objects.equal(commitInfo.getSessionId(), sessionId);
+        @Nonnull
+        NodeState getRoot() {
+            return root;
         }
 
         @CheckForNull
-        public CommitInfo getCommitInfo() {
+        CommitInfo getCommitInfo() {
             return commitInfo;
         }
 
-        /**
-         * State before the change
-         * @return  before state
-         */
-        @Nonnull
-        public NodeState getBeforeState() {
-            return before;
-        }
-
-        /**
-         * State after the change
-         * @return  after state
-         */
-        @Nonnull
-        public NodeState getAfterState() {
-            return after;
-        }
-
         @Override
         public String toString() {
             return toStringHelper(this)
-                .add("base", before)
-                .add("head", after)
+                .add("root", root)
                 .add("commit info", commitInfo)
                 .toString();
         }
-
     }
 
 }

Copied: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/Observable.java (from r1539346, jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/Observable.java)
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/Observable.java?p2=jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/Observable.java&p1=jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/Observable.java&r1=1539346&r2=1539363&rev=1539363&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/Observable.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/Observable.java Wed Nov  6 15:27:04 2013
@@ -17,23 +17,24 @@
  * under the License.
  */
 
-package org.apache.jackrabbit.oak.plugins.observation;
+package org.apache.jackrabbit.oak.spi.commit;
 
-import org.apache.jackrabbit.oak.plugins.observation.ChangeDispatcher.Listener;
+import java.io.Closeable;
 
 /**
- * An {@code Observable} supports attaching {@link Listener} instances for
+ * An {@code Observable} supports attaching {@link Observer} instances for
  * listening to content changes.
  *
- * @see ChangeDispatcher
+ * @see ChangeDispatcher
  */
 public interface Observable {
 
     /**
-     * Register a new {@code Listener}. Clients need to call
-     * {@link ChangeDispatcher.Listener#dispose()} to free
-     * up any resources associated with the listener when done.
-     * @return a fresh {@code Listener} instance.
+     * Register a new {@code Observer}. Clients need to call {@link Closeable#close()} 
+     * to stop getting notifications on the registered observer and to free up any resources
+     * associated with the registration.
+     * 
+     * @return a {@code Closeable} instance.
      */
-    Listener newListener();
+    Closeable addObserver(Observer observer);
 }

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/state/AbstractNodeStoreBranch.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/state/AbstractNodeStoreBranch.java?rev=1539363&r1=1539362&r2=1539363&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/state/AbstractNodeStoreBranch.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/state/AbstractNodeStoreBranch.java Wed Nov  6 15:27:04 2013
@@ -26,7 +26,7 @@ import javax.annotation.Nullable;
 
 import org.apache.jackrabbit.oak.api.CommitFailedException;
 import org.apache.jackrabbit.oak.commons.PathUtils;
-import org.apache.jackrabbit.oak.plugins.observation.ChangeDispatcher;
+import org.apache.jackrabbit.oak.spi.commit.ChangeDispatcher;
 import org.apache.jackrabbit.oak.spi.commit.CommitHook;
 import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
 

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueueTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueueTest.java?rev=1539363&r1=1539362&r2=1539363&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueueTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueueTest.java Wed Nov  6 15:27:04 2013
@@ -16,13 +16,20 @@
  */
 package org.apache.jackrabbit.oak.plugins.mongomk;
 
+import java.io.Closeable;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Random;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.apache.jackrabbit.oak.plugins.observation.ChangeDispatcher;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import org.apache.jackrabbit.oak.spi.commit.ChangeDispatcher;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.commit.Observer;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
 import org.junit.Test;
 
 /**
@@ -38,38 +45,26 @@ public class CommitQueueTest {
     public void concurrentCommits() throws Exception {
         final MongoNodeStore store = new MongoMK.Builder().getNodeStore();
         ChangeDispatcher dispatcher = new ChangeDispatcher(store);
-        final ChangeDispatcher.Listener listener = dispatcher.newListener();
-        final AtomicBoolean running = new AtomicBoolean(true);
+        AtomicBoolean running = new AtomicBoolean(true);
         final CommitQueue queue = new CommitQueue(store, dispatcher);
-        final List<Exception> exceptions = Collections.synchronizedList(
-                new ArrayList<Exception>());
-        Thread reader = new Thread(new Runnable() {
+        final List<Exception> exceptions = Collections.synchronizedList(new ArrayList<Exception>());
+
+        Closeable observer = dispatcher.addObserver(new Observer() {
+            private Revision before = new Revision(0, 0, store.getClusterId());
+
             @Override
-            public void run() {
-                try {
-                    Revision before = new Revision(0, 0, store.getClusterId());
-                    while (running.get()) {
-                        ChangeDispatcher.ChangeSet changes =
-                                listener.getChanges(1000);
-                        if (changes != null) {
-                            MongoNodeState after = (MongoNodeState) changes.getAfterState();
-                            Revision r = after.getRevision();
-                            // System.out.println("seen: " + r);
-                            if (r.compareRevisionTime(before) < 1) {
-                                exceptions.add(new Exception(
-                                        "Inconsistent revision sequence. Before: " +
-                                                before + ", after: " + r));
-                                break;
-                            }
-                            before = r;
-                        }
-                    }
-                } catch (Exception e) {
-                    exceptions.add(e);
+            public void contentChanged(@Nonnull NodeState root, @Nullable CommitInfo info) {
+                MongoNodeState after = (MongoNodeState) root;
+                Revision r = after.getRevision();
+//                System.out.println("seen: " + r);
+                if (r.compareRevisionTime(before) < 1) {
+                    exceptions.add(new Exception(
+                            "Inconsistent revision sequence. Before: " +
+                                    before + ", after: " + r));
                 }
+                before = r;
             }
         });
-        reader.start();
 
         // perform commits with multiple threads
         List<Thread> writers = new ArrayList<Thread>();
@@ -108,7 +103,7 @@ public class CommitQueueTest {
             t.join();
         }
         running.set(false);
-        reader.join();
+        observer.close();
         for (Exception e : exceptions) {
             throw e;
         }

Modified: jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ObservationManagerImpl.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ObservationManagerImpl.java?rev=1539363&r1=1539362&r2=1539363&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ObservationManagerImpl.java (original)
+++ jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ObservationManagerImpl.java Wed Nov  6 15:27:04 2013
@@ -43,7 +43,7 @@ import org.apache.jackrabbit.oak.plugins
 import org.apache.jackrabbit.oak.plugins.observation.ChangeProcessor;
 import org.apache.jackrabbit.oak.plugins.observation.EventFilter;
 import org.apache.jackrabbit.oak.plugins.observation.ExcludeExternal;
-import org.apache.jackrabbit.oak.plugins.observation.Observable;
+import org.apache.jackrabbit.oak.spi.commit.Observable;
 import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

Modified: jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/session/SessionContext.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/session/SessionContext.java?rev=1539363&r1=1539362&r2=1539363&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/session/SessionContext.java (original)
+++ jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/session/SessionContext.java Wed Nov  6 15:27:04 2013
@@ -16,11 +16,16 @@
  */
 package org.apache.jackrabbit.oak.jcr.session;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.collect.Sets.newHashSet;
+import static com.google.common.collect.Sets.newTreeSet;
+
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+
 import javax.annotation.CheckForNull;
 import javax.annotation.Nonnull;
 import javax.jcr.PathNotFoundException;
@@ -52,8 +57,8 @@ import org.apache.jackrabbit.oak.namepat
 import org.apache.jackrabbit.oak.namepath.NamePathMapper;
 import org.apache.jackrabbit.oak.namepath.NamePathMapperImpl;
 import org.apache.jackrabbit.oak.plugins.nodetype.ReadOnlyNodeTypeManager;
-import org.apache.jackrabbit.oak.plugins.observation.Observable;
 import org.apache.jackrabbit.oak.plugins.value.ValueFactoryImpl;
+import org.apache.jackrabbit.oak.spi.commit.Observable;
 import org.apache.jackrabbit.oak.spi.security.SecurityConfiguration;
 import org.apache.jackrabbit.oak.spi.security.SecurityProvider;
 import org.apache.jackrabbit.oak.spi.security.authorization.AuthorizationConfiguration;
@@ -66,10 +71,6 @@ import org.apache.jackrabbit.oak.spi.xml
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.collect.Sets.newHashSet;
-import static com.google.common.collect.Sets.newTreeSet;
-
 /**
  * Instances of this class are passed to all JCR implementation classes
  * (e.g. {@code SessionImpl}, {@code NodeImpl}, etc.) and provide access to