You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ju...@apache.org on 2013/10/24 21:06:34 UTC
svn commit: r1535500 - in /jackrabbit/oak/trunk/oak-core/src:
main/java/org/apache/jackrabbit/oak/plugins/observation/
test/java/org/apache/jackrabbit/oak/plugins/mongomk/
Author: jukka
Date: Thu Oct 24 19:06:33 2013
New Revision: 1535500
URL: http://svn.apache.org/r1535500
Log:
OAK-1113: Immediate delivery of events from local commits
Use the LinkedBlockingQueue from a separate listener thread to avoid the fixed scheduling of events
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeDispatcher.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeProcessor.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueueTest.java
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeDispatcher.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeDispatcher.java?rev=1535500&r1=1535499&r2=1535500&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeDispatcher.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeDispatcher.java Thu Oct 24 19:06:33 2013
@@ -21,16 +21,18 @@ package org.apache.jackrabbit.oak.plugin
import static com.google.common.base.Objects.toStringHelper;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.collect.Queues.newLinkedBlockingQueue;
-import java.util.Queue;
import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import com.google.common.base.Objects;
-import com.google.common.collect.Queues;
import com.google.common.collect.Sets;
import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
@@ -63,7 +65,8 @@ public class ChangeDispatcher {
private final Set<Listener> listeners = Sets.newHashSet();
private final NodeStore store;
- private NodeState previousRoot;
+ @Nonnull
+ private volatile NodeState root;
/**
* Create a new instance for recording changes to {@code store}.
@@ -71,7 +74,7 @@ public class ChangeDispatcher {
*/
public ChangeDispatcher(@Nonnull NodeStore store) {
this.store = store;
- previousRoot = checkNotNull(store.getRoot());
+ this.root = checkNotNull(store.getRoot());
}
/**
@@ -82,7 +85,7 @@ public class ChangeDispatcher {
*/
@Nonnull
public Listener newListener() {
- Listener listener = new Listener();
+ Listener listener = new Listener(root);
register(listener);
return listener;
}
@@ -130,12 +133,8 @@ public class ChangeDispatcher {
@Nonnull NodeState root, @Nonnull CommitInfo info) {
checkState(inLocalCommit());
checkNotNull(root);
- if (info != null) {
- add(new ChangeSet(previousRoot, root, info));
- } else {
- add(new ChangeSet(previousRoot, root));
- }
- previousRoot = root;
+ add(root, info);
+ this.root = root;
}
/**
@@ -168,9 +167,9 @@ public class ChangeDispatcher {
}
private synchronized void externalChange(NodeState root) {
- if (!root.equals(previousRoot)) {
- add(new ChangeSet(previousRoot, root));
- previousRoot = root;
+ if (!root.equals(this.root)) {
+ add(root, null);
+ this.root = root;
}
}
@@ -186,9 +185,11 @@ public class ChangeDispatcher {
}
}
- private void add(ChangeSet changeSet) {
- for (Listener l : getListeners()) {
- l.add(changeSet);
+ private void add(NodeState root, CommitInfo info) {
+ synchronized (listeners) {
+ for (Listener l : getListeners()) {
+ l.add(root, info);
+ }
}
}
@@ -201,10 +202,21 @@ public class ChangeDispatcher {
//------------------------------------------------------------< Listener >---
/**
- * Listener for receiving changes reported into a change dispatcher by any of its hooks.
+ * Listener for receiving changes reported into a change dispatcher by
+ * any of its hooks.
*/
public class Listener {
- private final Queue<ChangeSet> changeSets = Queues.newLinkedBlockingQueue();
+
+ private final LinkedBlockingQueue<ChangeSet> changeSets =
+ newLinkedBlockingQueue();
+
+ private NodeState previousRoot;
+
+ private boolean blocked = false;
+
+ Listener(NodeState root) {
+ this.previousRoot = checkNotNull(root);
+ }
/**
* Free up any resources associated by this hook.
@@ -215,20 +227,31 @@ public class ChangeDispatcher {
/**
* Poll for changes reported to this listener.
- * @return {@code ChangeSet} of the changes or {@code null} if no changes occurred since
- * the last call to this method.
+ *
+ * @param timeout maximum number of milliseconds to wait for changes
+ * @return {@code ChangeSet} of the changes, or {@code null} if
+ * no changes occurred since the last call to this method
+ * and before the timeout
+ * @throws InterruptedException if polling was interrupted
*/
@CheckForNull
- public ChangeSet getChanges() {
+ public ChangeSet getChanges(long timeout) throws InterruptedException {
if (changeSets.isEmpty()) {
externalChange();
}
-
- return changeSets.isEmpty() ? null : changeSets.remove();
+ return changeSets.poll(timeout, TimeUnit.MILLISECONDS);
}
- private void add(ChangeSet changeSet) {
- changeSets.add(changeSet);
+ private synchronized void add(NodeState root, CommitInfo info) {
+ if (blocked) {
+ info = null;
+ }
+ if (changeSets.offer(new ChangeSet(previousRoot, root, info))) {
+ previousRoot = root;
+ blocked = false;
+ } else {
+ blocked = true;
+ }
}
}
@@ -246,22 +269,13 @@ public class ChangeDispatcher {
private final CommitInfo commitInfo;
/**
- * Creates an external change set
- */
- ChangeSet(@Nonnull NodeState before, @Nonnull NodeState after) {
- this.before = checkNotNull(before);
- this.after = checkNotNull(after);
- this.commitInfo = null;
- }
-
- /**
- * Creates a local change set
+ * Creates a change set
*/
ChangeSet(@Nonnull NodeState before, @Nonnull NodeState after,
- @Nonnull CommitInfo commitInfo) {
+ @Nullable CommitInfo commitInfo) {
this.before = checkNotNull(before);
this.after = checkNotNull(after);
- this.commitInfo = checkNotNull(commitInfo);
+ this.commitInfo = commitInfo;
}
public boolean isExternal() {
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeProcessor.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeProcessor.java?rev=1535500&r1=1535499&r2=1535500&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeProcessor.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeProcessor.java Thu Oct 24 19:06:33 2013
@@ -72,7 +72,7 @@ import org.slf4j.LoggerFactory;
* {@link #run()} methods to be regularly executed and stopped in order to not
* execute its run method anymore.
*/
-public class ChangeProcessor implements Runnable {
+public class ChangeProcessor {
private static final Logger log =
LoggerFactory.getLogger(ChangeProcessor.class);
@@ -84,13 +84,12 @@ public class ChangeProcessor implements
private final ListenerTracker tracker;
private final EventListener listener;
- private volatile Thread running = null;
- private volatile boolean stopping = false;
- private volatile Runnable deferredUnregister;
+ /**
+ * Background thread used to wait for and deliver events.
+ */
+ private ListenerThread thread = null;
- private Registration runnable;
- private Registration mbean;
- private Listener changeListener;
+ private volatile boolean stopping = false;
public ChangeProcessor(
ContentSession contentSession, NamePathMapper namePathMapper,
@@ -118,14 +117,12 @@ public class ChangeProcessor implements
* @throws IllegalStateException if started already
*/
public synchronized void start(Whiteboard whiteboard) {
- checkState(runnable == null, "Change processor started already");
+ checkState(thread == null, "Change processor started already");
- stopping = false;
- changeListener = ((Observable) contentSession).newListener();
- runnable = WhiteboardUtils.scheduleWithFixedDelay(whiteboard, this, 1);
- mbean = WhiteboardUtils.registerMBean(
- whiteboard, EventListenerMBean.class, tracker.getListenerMBean(),
- "EventListener", tracker.toString());
+ thread = new ListenerThread(whiteboard);
+ thread.setDaemon(true);
+ thread.setPriority(Thread.MIN_PRIORITY);
+ thread.start();
}
/**
@@ -133,87 +130,63 @@ public class ChangeProcessor implements
* events will be delivered.
* @throws IllegalStateException if not yet started or stopped already
*/
- public void stop() {
- stopping = true; // do this outside synchronization
+ public synchronized void stop() {
+ checkState(thread != null, "Change processor not started");
+ checkState(!stopping, "Change processor already stopped");
- if (running == Thread.currentThread()) {
- // Defer stopping from event listener, defer unregistering until
- // event listener is done
- deferredUnregister = new Runnable() {
- @Override
- public void run() {
- unregister();
- }
- };
- } else {
- // Otherwise wait for the event listener to terminate and unregister immediately
- synchronized (this) {
- try {
- while (running != null) {
- wait();
- }
- unregister();
- }
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
- }
+ stopping = true;
}
- private void unregister() {
- checkState(runnable != null, "Change processor not started");
- mbean.unregister();
- runnable.unregister();
- changeListener.dispose();
+ private static ImmutableTree getTree(NodeState beforeState, String path) {
+ return new ImmutableRoot(beforeState).getTree(path);
}
- @Override
- public void run() {
- // guarantee that only one thread is processing changes at a time
- synchronized (this) {
- if (running != null) {
- return;
- } else {
- running = Thread.currentThread();
- }
+ //------------------------------------------------------------< private >---
+
+ private class ListenerThread extends Thread {
+
+ private final Listener changeListener =
+ ((Observable) contentSession).newListener();
+
+ private final Registration mbean;
+
+ ListenerThread(Whiteboard whiteboard) {
+ mbean = WhiteboardUtils.registerMBean(
+ whiteboard, EventListenerMBean.class,
+ tracker.getListenerMBean(), "EventListener",
+ tracker.toString());
}
- try {
- ChangeSet changes = changeListener.getChanges();
- while (!stopping && changes != null) {
- EventFilter filter = filterRef.get();
- // FIXME don't rely on toString for session id
- if (!(filter.excludeLocal() && changes.isLocal(contentSession.toString()))) {
- String path = namePathMapper.getOakPath(filter.getPath());
- ImmutableTree beforeTree = getTree(changes.getBeforeState(), path);
- ImmutableTree afterTree = getTree(changes.getAfterState(), path);
- EventGeneratingNodeStateDiff diff = new EventGeneratingNodeStateDiff(
- changes.getCommitInfo(), beforeTree, afterTree);
- SecureNodeStateDiff.compare(VisibleDiff.wrap(diff), beforeTree, afterTree);
- if (!stopping) {
- diff.sendEvents();
+ @Override
+ public void run() {
+ try {
+ ChangeSet changes = changeListener.getChanges(100);
+ while (!stopping) {
+ EventFilter filter = filterRef.get();
+ // FIXME don't rely on toString for session id
+ if (!(filter.excludeLocal() && changes.isLocal(contentSession.toString()))) {
+ String path = namePathMapper.getOakPath(filter.getPath());
+ ImmutableTree beforeTree = getTree(changes.getBeforeState(), path);
+ ImmutableTree afterTree = getTree(changes.getAfterState(), path);
+ EventGeneratingNodeStateDiff diff = new EventGeneratingNodeStateDiff(
+ changes.getCommitInfo(), beforeTree, afterTree);
+ SecureNodeStateDiff.compare(VisibleDiff.wrap(diff), beforeTree, afterTree);
+ if (!stopping) {
+ diff.sendEvents();
+ }
}
+ changes = changeListener.getChanges(100);
}
- changes = changeListener.getChanges();
- }
- } catch (Exception e) {
- log.debug("Error while dispatching observation events", e);
- } finally {
- running = null;
- synchronized (this) { notifyAll(); }
- if (deferredUnregister != null) {
- deferredUnregister.run();
+ } catch (Exception e) {
+ log.debug("Error while dispatching observation events", e);
+ } finally {
+ mbean.unregister();
+ changeListener.dispose();
}
}
- }
- private static ImmutableTree getTree(NodeState beforeState, String path) {
- return new ImmutableRoot(beforeState).getTree(path);
}
- //------------------------------------------------------------< private >---
-
private class EventGeneratingNodeStateDiff extends RecursingNodeStateDiff {
public static final int EVENT_LIMIT = 8192;
Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueueTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueueTest.java?rev=1535500&r1=1535499&r2=1535500&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueueTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueueTest.java Thu Oct 24 19:06:33 2013
@@ -49,7 +49,8 @@ public class CommitQueueTest {
try {
Revision before = new Revision(0, 0, store.getClusterId());
while (running.get()) {
- ChangeDispatcher.ChangeSet changes = listener.getChanges();
+ ChangeDispatcher.ChangeSet changes =
+ listener.getChanges(1000);
if (changes != null) {
MongoNodeState after = (MongoNodeState) changes.getAfterState();
Revision r = after.getRevision();
@@ -61,13 +62,6 @@ public class CommitQueueTest {
break;
}
before = r;
- } else {
- // avoid busy wait
- try {
- Thread.sleep(10);
- } catch (InterruptedException e) {
- // ignore
- }
}
}
} catch (Exception e) {
Re: svn commit: r1535500 - in /jackrabbit/oak/trunk/oak-core/src:
main/java/org/apache/jackrabbit/oak/plugins/observation/ test/java/org/apache/jackrabbit/oak/plugins/mongomk/
Posted by Michael Dürig <md...@apache.org>.
On 24.10.13 9:06 , jukka@apache.org wrote:
> Author: jukka
> Date: Thu Oct 24 19:06:33 2013
> New Revision: 1535500
>
> URL: http://svn.apache.org/r1535500
> Log:
> OAK-1113: Immediate delivery of events from local commits
>
> Use the LinkedBlockingQueue from a separate listener thread to avoid the fixed scheduling of events
> @@ -186,9 +185,11 @@ public class ChangeDispatcher {
> }
> }
>
> - private void add(ChangeSet changeSet) {
> - for (Listener l : getListeners()) {
> - l.add(changeSet);
> + private void add(NodeState root, CommitInfo info) {
> + synchronized (listeners) {
> + for (Listener l : getListeners()) {
> + l.add(root, info);
> + }
> }
> }
I don't think we need the sync. here. add() is not called concurrently
and getListeners() returns a copy of the registered listeners.
> - private void add(ChangeSet changeSet) {
> - changeSets.add(changeSet);
> + private synchronized void add(NodeState root, CommitInfo info) {
> + if (blocked) {
> + info = null;
> + }
> + if (changeSets.offer(new ChangeSet(previousRoot, root, info))) {
> + previousRoot = root;
> + blocked = false;
> + } else {
> + blocked = true;
> + }
> }
> }
Also here add is not called concurrently, so no need to sync.
Michael