You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ju...@apache.org on 2013/10/24 21:06:34 UTC

svn commit: r1535500 - in /jackrabbit/oak/trunk/oak-core/src: main/java/org/apache/jackrabbit/oak/plugins/observation/ test/java/org/apache/jackrabbit/oak/plugins/mongomk/

Author: jukka
Date: Thu Oct 24 19:06:33 2013
New Revision: 1535500

URL: http://svn.apache.org/r1535500
Log:
OAK-1113: Immediate delivery of events from local commits

Use the LinkedBlockingQueue from a separate listener thread to avoid the fixed scheduling of events

Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeDispatcher.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeProcessor.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueueTest.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeDispatcher.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeDispatcher.java?rev=1535500&r1=1535499&r2=1535500&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeDispatcher.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeDispatcher.java Thu Oct 24 19:06:33 2013
@@ -21,16 +21,18 @@ package org.apache.jackrabbit.oak.plugin
 import static com.google.common.base.Objects.toStringHelper;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.collect.Queues.newLinkedBlockingQueue;
 
-import java.util.Queue;
 import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import javax.annotation.CheckForNull;
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 import com.google.common.base.Objects;
-import com.google.common.collect.Queues;
 import com.google.common.collect.Sets;
 
 import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
@@ -63,7 +65,8 @@ public class ChangeDispatcher {
     private final Set<Listener> listeners = Sets.newHashSet();
     private final NodeStore store;
 
-    private NodeState previousRoot;
+    @Nonnull
+    private volatile NodeState root;
 
     /**
      * Create a new instance for recording changes to {@code store}.
@@ -71,7 +74,7 @@ public class ChangeDispatcher {
      */
     public ChangeDispatcher(@Nonnull NodeStore store) {
         this.store = store;
-        previousRoot = checkNotNull(store.getRoot());
+        this.root = checkNotNull(store.getRoot());
     }
 
     /**
@@ -82,7 +85,7 @@ public class ChangeDispatcher {
      */
     @Nonnull
     public Listener newListener() {
-        Listener listener = new Listener();
+        Listener listener = new Listener(root);
         register(listener);
         return listener;
     }
@@ -130,12 +133,8 @@ public class ChangeDispatcher {
             @Nonnull NodeState root, @Nonnull CommitInfo info) {
         checkState(inLocalCommit());
         checkNotNull(root);
-        if (info != null) {
-            add(new ChangeSet(previousRoot, root, info));
-        } else {
-            add(new ChangeSet(previousRoot, root));
-        }
-        previousRoot = root;
+        add(root, info);
+        this.root = root;
     }
 
     /**
@@ -168,9 +167,9 @@ public class ChangeDispatcher {
     }
 
     private synchronized void externalChange(NodeState root) {
-        if (!root.equals(previousRoot)) {
-            add(new ChangeSet(previousRoot, root));
-            previousRoot = root;
+        if (!root.equals(this.root)) {
+            add(root, null);
+            this.root = root;
         }
     }
 
@@ -186,9 +185,11 @@ public class ChangeDispatcher {
         }
     }
 
-    private void add(ChangeSet changeSet) {
-        for (Listener l : getListeners()) {
-            l.add(changeSet);
+    private void add(NodeState root, CommitInfo info) {
+        synchronized (listeners) {
+            for (Listener l : getListeners()) {
+                l.add(root, info);
+            }
         }
     }
 
@@ -201,10 +202,21 @@ public class ChangeDispatcher {
     //------------------------------------------------------------< Listener >---
 
     /**
-     * Listener for receiving changes reported into a change dispatcher by any of its hooks.
+     * Listener for receiving changes reported into a change dispatcher by
+     * any of its hooks.
      */
     public class Listener {
-        private final Queue<ChangeSet> changeSets = Queues.newLinkedBlockingQueue();
+
+        private final LinkedBlockingQueue<ChangeSet> changeSets =
+                newLinkedBlockingQueue();
+
+        private NodeState previousRoot;
+
+        private boolean blocked = false;
+
+        Listener(NodeState root) {
+            this.previousRoot = checkNotNull(root);
+        }
 
         /**
          * Free up any resources associated by this hook.
@@ -215,20 +227,31 @@ public class ChangeDispatcher {
 
         /**
          * Poll for changes reported to this listener.
-         * @return  {@code ChangeSet} of the changes or {@code null} if no changes occurred since
-         *          the last call to this method.
+         *
+         * @param timeout maximum number of milliseconds to wait for changes
+         * @return  {@code ChangeSet} of the changes, or {@code null} if
+         *          no changes occurred since the last call to this method
+         *          and before the timeout
+         * @throws InterruptedException if polling was interrupted
          */
         @CheckForNull
-        public ChangeSet getChanges() {
+        public ChangeSet getChanges(long timeout) throws InterruptedException {
             if (changeSets.isEmpty()) {
                 externalChange();
             }
-
-            return changeSets.isEmpty() ? null : changeSets.remove();
+            return changeSets.poll(timeout, TimeUnit.MILLISECONDS);
         }
 
-        private void add(ChangeSet changeSet) {
-            changeSets.add(changeSet);
+        private synchronized void add(NodeState root, CommitInfo info) {
+            if (blocked) {
+                info = null;
+            }
+            if (changeSets.offer(new ChangeSet(previousRoot, root, info))) {
+                previousRoot = root;
+                blocked = false;
+            } else {
+                blocked = true;
+            }
         }
     }
 
@@ -246,22 +269,13 @@ public class ChangeDispatcher {
         private final CommitInfo commitInfo;
 
         /**
-         * Creates an external change set
-         */
-        ChangeSet(@Nonnull NodeState before, @Nonnull NodeState after) {
-            this.before = checkNotNull(before);
-            this.after = checkNotNull(after);
-            this.commitInfo = null;
-        }
-
-        /**
-         * Creates a local change set
+         * Creates a change set
          */
         ChangeSet(@Nonnull NodeState before, @Nonnull NodeState after,
-                @Nonnull CommitInfo commitInfo) {
+                @Nullable CommitInfo commitInfo) {
             this.before = checkNotNull(before);
             this.after = checkNotNull(after);
-            this.commitInfo = checkNotNull(commitInfo);
+            this.commitInfo = commitInfo;
         }
 
         public boolean isExternal() {

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeProcessor.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeProcessor.java?rev=1535500&r1=1535499&r2=1535500&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeProcessor.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeProcessor.java Thu Oct 24 19:06:33 2013
@@ -72,7 +72,7 @@ import org.slf4j.LoggerFactory;
  * {@link #run()} methods to be regularly executed and stopped in order to not
  * execute its run method anymore.
  */
-public class ChangeProcessor implements Runnable {
+public class ChangeProcessor {
 
     private static final Logger log =
             LoggerFactory.getLogger(ChangeProcessor.class);
@@ -84,13 +84,12 @@ public class ChangeProcessor implements 
     private final ListenerTracker tracker;
     private final EventListener listener;
 
-    private volatile Thread running = null;
-    private volatile boolean stopping = false;
-    private volatile Runnable deferredUnregister;
+    /**
+     * Background thread used to wait for and deliver events.
+     */
+    private ListenerThread thread = null;
 
-    private Registration runnable;
-    private Registration mbean;
-    private Listener changeListener;
+    private volatile boolean stopping = false;
 
     public ChangeProcessor(
             ContentSession contentSession, NamePathMapper namePathMapper,
@@ -118,14 +117,12 @@ public class ChangeProcessor implements 
      * @throws IllegalStateException if started already
      */
     public synchronized void start(Whiteboard whiteboard) {
-        checkState(runnable == null, "Change processor started already");
+        checkState(thread == null, "Change processor started already");
 
-        stopping = false;
-        changeListener = ((Observable) contentSession).newListener();
-        runnable = WhiteboardUtils.scheduleWithFixedDelay(whiteboard, this, 1);
-        mbean = WhiteboardUtils.registerMBean(
-                whiteboard, EventListenerMBean.class, tracker.getListenerMBean(),
-                "EventListener", tracker.toString());
+        thread = new ListenerThread(whiteboard);
+        thread.setDaemon(true);
+        thread.setPriority(Thread.MIN_PRIORITY);
+        thread.start();
     }
 
     /**
@@ -133,87 +130,63 @@ public class ChangeProcessor implements 
      * events will be delivered.
      * @throws IllegalStateException if not yet started or stopped already
      */
-    public void stop() {
-        stopping = true; // do this outside synchronization
+    public synchronized void stop() {
+        checkState(thread != null, "Change processor not started");
+        checkState(!stopping, "Change processor already stopped");
 
-        if (running == Thread.currentThread()) {
-            // Defer stopping from event listener, defer unregistering until
-            // event listener is done
-            deferredUnregister = new Runnable() {
-                @Override
-                public void run() {
-                    unregister();
-                }
-            };
-        } else {
-            // Otherwise wait for the event listener to terminate and unregister immediately
-            synchronized (this) {
-                try {
-                    while (running != null) {
-                        wait();
-                    }
-                    unregister();
-                }
-                catch (InterruptedException e) {
-                    Thread.currentThread().interrupt();
-                }
-            }
-        }
+        stopping = true;
     }
 
-    private void unregister() {
-        checkState(runnable != null, "Change processor not started");
-        mbean.unregister();
-        runnable.unregister();
-        changeListener.dispose();
+    private static ImmutableTree getTree(NodeState beforeState, String path) {
+        return new ImmutableRoot(beforeState).getTree(path);
     }
 
-    @Override
-    public void run() {
-        // guarantee that only one thread is processing changes at a time
-        synchronized (this) {
-            if (running != null) {
-                return;
-            } else {
-                running = Thread.currentThread();
-            }
+    //------------------------------------------------------------< private >---
+
+    private class ListenerThread extends Thread {
+
+        private final Listener changeListener =
+                ((Observable) contentSession).newListener();
+
+        private final Registration mbean;
+
+        ListenerThread(Whiteboard whiteboard) {
+            mbean = WhiteboardUtils.registerMBean(
+                    whiteboard, EventListenerMBean.class,
+                    tracker.getListenerMBean(), "EventListener",
+                    tracker.toString());
         }
 
-        try {
-            ChangeSet changes = changeListener.getChanges();
-            while (!stopping && changes != null) {
-                EventFilter filter = filterRef.get();
-                // FIXME don't rely on toString for session id
-                if (!(filter.excludeLocal() && changes.isLocal(contentSession.toString()))) {
-                    String path = namePathMapper.getOakPath(filter.getPath());
-                    ImmutableTree beforeTree = getTree(changes.getBeforeState(), path);
-                    ImmutableTree afterTree = getTree(changes.getAfterState(), path);
-                    EventGeneratingNodeStateDiff diff = new EventGeneratingNodeStateDiff(
-                            changes.getCommitInfo(), beforeTree, afterTree);
-                    SecureNodeStateDiff.compare(VisibleDiff.wrap(diff), beforeTree, afterTree);
-                    if (!stopping) {
-                        diff.sendEvents();
+        @Override
+        public void run() {
+            try {
+                ChangeSet changes = changeListener.getChanges(100);
+                while (!stopping) {
+                    EventFilter filter = filterRef.get();
+                    // FIXME don't rely on toString for session id
+                    if (!(filter.excludeLocal() && changes.isLocal(contentSession.toString()))) {
+                        String path = namePathMapper.getOakPath(filter.getPath());
+                        ImmutableTree beforeTree = getTree(changes.getBeforeState(), path);
+                        ImmutableTree afterTree = getTree(changes.getAfterState(), path);
+                        EventGeneratingNodeStateDiff diff = new EventGeneratingNodeStateDiff(
+                                changes.getCommitInfo(), beforeTree, afterTree);
+                        SecureNodeStateDiff.compare(VisibleDiff.wrap(diff), beforeTree, afterTree);
+                        if (!stopping) {
+                            diff.sendEvents();
+                        }
                     }
+                    changes = changeListener.getChanges(100);
                 }
-                changes = changeListener.getChanges();
-            }
-        } catch (Exception e) {
-            log.debug("Error while dispatching observation events", e);
-        } finally {
-            running = null;
-            synchronized (this) { notifyAll(); }
-            if (deferredUnregister != null) {
-                deferredUnregister.run();
+            } catch (Exception e) {
+                log.debug("Error while dispatching observation events", e);
+            } finally {
+                mbean.unregister();
+                changeListener.dispose();
             }
         }
-    }
 
-    private static ImmutableTree getTree(NodeState beforeState, String path) {
-        return new ImmutableRoot(beforeState).getTree(path);
     }
 
-    //------------------------------------------------------------< private >---
-
     private class EventGeneratingNodeStateDiff extends RecursingNodeStateDiff {
         public static final int EVENT_LIMIT = 8192;
 

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueueTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueueTest.java?rev=1535500&r1=1535499&r2=1535500&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueueTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueueTest.java Thu Oct 24 19:06:33 2013
@@ -49,7 +49,8 @@ public class CommitQueueTest {
                 try {
                     Revision before = new Revision(0, 0, store.getClusterId());
                     while (running.get()) {
-                        ChangeDispatcher.ChangeSet changes = listener.getChanges();
+                        ChangeDispatcher.ChangeSet changes =
+                                listener.getChanges(1000);
                         if (changes != null) {
                             MongoNodeState after = (MongoNodeState) changes.getAfterState();
                             Revision r = after.getRevision();
@@ -61,13 +62,6 @@ public class CommitQueueTest {
                                 break;
                             }
                             before = r;
-                        } else {
-                            // avoid busy wait
-                            try {
-                                Thread.sleep(10);
-                            } catch (InterruptedException e) {
-                                // ignore
-                            }
                         }
                     }
                 } catch (Exception e) {



Re: svn commit: r1535500 - in /jackrabbit/oak/trunk/oak-core/src: main/java/org/apache/jackrabbit/oak/plugins/observation/ test/java/org/apache/jackrabbit/oak/plugins/mongomk/

Posted by Michael Dürig <md...@apache.org>.
On 24.10.13 9:06 , jukka@apache.org wrote:
> Author: jukka
> Date: Thu Oct 24 19:06:33 2013
> New Revision: 1535500
>
> URL: http://svn.apache.org/r1535500
> Log:
> OAK-1113: Immediate delivery of events from local commits
>
> Use the LinkedBlockingQueue from a separate listener thread to avoid the fixed scheduling of events
> @@ -186,9 +185,11 @@ public class ChangeDispatcher {
>           }
>       }
>
> -    private void add(ChangeSet changeSet) {
> -        for (Listener l : getListeners()) {
> -            l.add(changeSet);
> +    private void add(NodeState root, CommitInfo info) {
> +        synchronized (listeners) {
> +            for (Listener l : getListeners()) {
> +                l.add(root, info);
> +            }
>           }
>       }

I don't think we need the sync. here. add() is not called concurrently 
and getListeners() returns a copy of the registered listeners.

> -        private void add(ChangeSet changeSet) {
> -            changeSets.add(changeSet);
> +        private synchronized void add(NodeState root, CommitInfo info) {
> +            if (blocked) {
> +                info = null;
> +            }
> +            if (changeSets.offer(new ChangeSet(previousRoot, root, info))) {
> +                previousRoot = root;
> +                blocked = false;
> +            } else {
> +                blocked = true;
> +            }
>           }
>       }

Also here add is not called concurrently, so no need to sync.

Michael