You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by md...@apache.org on 2013/11/06 16:27:05 UTC
svn commit: r1539363 - in /jackrabbit/oak/trunk:
oak-core/src/main/java/org/apache/jackrabbit/oak/core/
oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/
oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/
oak-core/src/main/java/or...
Author: mduerig
Date: Wed Nov 6 15:27:04 2013
New Revision: 1539363
URL: http://svn.apache.org/r1539363
Log:
OAK-1143 [scala] Repository init throws "illegal cyclic reference involving class ChangeDispatcher"
ChangeDispatcher distributes to observers now and ChangeProcessor is a regular observer
Added:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/ChangeDispatcher.java
- copied, changed from r1539346, jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeDispatcher.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/Observable.java
- copied, changed from r1539346, jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/Observable.java
Removed:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeDispatcher.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/Observable.java
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/core/ContentSessionImpl.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/KernelNodeStore.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/KernelNodeStoreBranch.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueue.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoNodeStore.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoNodeStoreBranch.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeProcessor.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/state/AbstractNodeStoreBranch.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueueTest.java
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ObservationManagerImpl.java
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/session/SessionContext.java
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/core/ContentSessionImpl.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/core/ContentSessionImpl.java?rev=1539363&r1=1539362&r2=1539363&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/core/ContentSessionImpl.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/core/ContentSessionImpl.java Wed Nov 6 15:27:04 2013
@@ -19,6 +19,7 @@ package org.apache.jackrabbit.oak.core;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
+import java.io.Closeable;
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
@@ -29,9 +30,9 @@ import javax.security.auth.login.LoginEx
import org.apache.jackrabbit.oak.api.AuthInfo;
import org.apache.jackrabbit.oak.api.ContentSession;
import org.apache.jackrabbit.oak.api.Root;
-import org.apache.jackrabbit.oak.plugins.observation.ChangeDispatcher.Listener;
-import org.apache.jackrabbit.oak.plugins.observation.Observable;
+import org.apache.jackrabbit.oak.spi.commit.Observable;
import org.apache.jackrabbit.oak.spi.commit.CommitHook;
+import org.apache.jackrabbit.oak.spi.commit.Observer;
import org.apache.jackrabbit.oak.spi.query.QueryIndexProvider;
import org.apache.jackrabbit.oak.spi.security.SecurityProvider;
import org.apache.jackrabbit.oak.spi.security.authentication.LoginContext;
@@ -118,8 +119,8 @@ class ContentSessionImpl implements Cont
}
@Override
- public Listener newListener() {
- return ((Observable) store).newListener();
+ public Closeable addObserver(Observer observer) {
+ return ((Observable) store).addObserver(observer);
}
//-----------------------------------------------------------< Closable >---
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/KernelNodeStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/KernelNodeStore.java?rev=1539363&r1=1539362&r2=1539363&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/KernelNodeStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/KernelNodeStore.java Wed Nov 6 15:27:04 2013
@@ -19,6 +19,7 @@ package org.apache.jackrabbit.oak.kernel
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
+import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.ExecutionException;
@@ -39,9 +40,8 @@ import org.apache.jackrabbit.mk.api.Micr
import org.apache.jackrabbit.oak.api.CommitFailedException;
import org.apache.jackrabbit.oak.cache.CacheLIRS;
import org.apache.jackrabbit.oak.cache.CacheStats;
-import org.apache.jackrabbit.oak.plugins.observation.ChangeDispatcher;
-import org.apache.jackrabbit.oak.plugins.observation.ChangeDispatcher.Listener;
-import org.apache.jackrabbit.oak.plugins.observation.Observable;
+import org.apache.jackrabbit.oak.spi.commit.ChangeDispatcher;
+import org.apache.jackrabbit.oak.spi.commit.Observable;
import org.apache.jackrabbit.oak.spi.commit.CommitHook;
import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
import org.apache.jackrabbit.oak.spi.commit.EmptyObserver;
@@ -148,8 +148,8 @@ public class KernelNodeStore implements
//------------------------------------------------------------< Observable >---
@Override
- public Listener newListener() {
- return changeDispatcher.newListener();
+ public Closeable addObserver(Observer observer) {
+ return changeDispatcher.addObserver(observer);
}
//----------------------------------------------------------< NodeStore >---
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/KernelNodeStoreBranch.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/KernelNodeStoreBranch.java?rev=1539363&r1=1539362&r2=1539363&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/KernelNodeStoreBranch.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/KernelNodeStoreBranch.java Wed Nov 6 15:27:04 2013
@@ -26,7 +26,7 @@ import org.apache.jackrabbit.mk.api.Micr
import org.apache.jackrabbit.mk.api.MicroKernelException;
import org.apache.jackrabbit.oak.api.Blob;
import org.apache.jackrabbit.oak.api.CommitFailedException;
-import org.apache.jackrabbit.oak.plugins.observation.ChangeDispatcher;
+import org.apache.jackrabbit.oak.spi.commit.ChangeDispatcher;
import org.apache.jackrabbit.oak.spi.commit.CommitHook;
import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
import org.apache.jackrabbit.oak.spi.state.AbstractNodeStoreBranch;
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueue.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueue.java?rev=1539363&r1=1539362&r2=1539363&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueue.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueue.java Wed Nov 6 15:27:04 2013
@@ -25,7 +25,7 @@ import java.util.concurrent.CountDownLat
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
-import org.apache.jackrabbit.oak.plugins.observation.ChangeDispatcher;
+import org.apache.jackrabbit.oak.spi.commit.ChangeDispatcher;
import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
import org.apache.jackrabbit.oak.spi.state.NodeState;
import org.slf4j.Logger;
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoNodeStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoNodeStore.java?rev=1539363&r1=1539362&r2=1539363&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoNodeStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoNodeStore.java Wed Nov 6 15:27:04 2013
@@ -20,6 +20,7 @@ import static com.google.common.base.Pre
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
+import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.lang.ref.WeakReference;
@@ -58,8 +59,8 @@ import org.apache.jackrabbit.oak.commons
import org.apache.jackrabbit.oak.plugins.mongomk.util.LoggingDocumentStoreWrapper;
import org.apache.jackrabbit.oak.plugins.mongomk.util.TimingDocumentStoreWrapper;
import org.apache.jackrabbit.oak.plugins.mongomk.util.Utils;
-import org.apache.jackrabbit.oak.plugins.observation.ChangeDispatcher;
-import org.apache.jackrabbit.oak.plugins.observation.Observable;
+import org.apache.jackrabbit.oak.spi.commit.ChangeDispatcher;
+import org.apache.jackrabbit.oak.spi.commit.Observable;
import org.apache.jackrabbit.oak.spi.commit.CommitHook;
import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
import org.apache.jackrabbit.oak.spi.commit.Observer;
@@ -863,8 +864,8 @@ public final class MongoNodeStore
//------------------------< Observable >------------------------------------
@Override
- public ChangeDispatcher.Listener newListener() {
- return dispatcher.newListener();
+ public Closeable addObserver(Observer observer) {
+ return dispatcher.addObserver(observer);
}
//-------------------------< NodeStore >------------------------------------
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoNodeStoreBranch.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoNodeStoreBranch.java?rev=1539363&r1=1539362&r2=1539363&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoNodeStoreBranch.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoNodeStoreBranch.java Wed Nov 6 15:27:04 2013
@@ -25,7 +25,7 @@ import org.apache.jackrabbit.mk.api.Micr
import org.apache.jackrabbit.oak.api.Blob;
import org.apache.jackrabbit.oak.api.CommitFailedException;
import org.apache.jackrabbit.oak.kernel.BlobSerializer;
-import org.apache.jackrabbit.oak.plugins.observation.ChangeDispatcher;
+import org.apache.jackrabbit.oak.spi.commit.ChangeDispatcher;
import org.apache.jackrabbit.oak.spi.commit.CommitHook;
import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
import org.apache.jackrabbit.oak.spi.state.AbstractNodeStoreBranch;
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeProcessor.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeProcessor.java?rev=1539363&r1=1539362&r2=1539363&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeProcessor.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeProcessor.java Wed Nov 6 15:27:04 2013
@@ -21,10 +21,15 @@ package org.apache.jackrabbit.oak.plugin
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
+import java.io.Closeable;
+import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import javax.jcr.observation.EventListener;
+import com.google.common.base.Objects;
import org.apache.jackrabbit.api.jmx.EventListenerMBean;
import org.apache.jackrabbit.commons.iterator.EventIteratorAdapter;
import org.apache.jackrabbit.commons.observation.ListenerTracker;
@@ -32,8 +37,9 @@ import org.apache.jackrabbit.oak.api.Con
import org.apache.jackrabbit.oak.core.ImmutableRoot;
import org.apache.jackrabbit.oak.core.ImmutableTree;
import org.apache.jackrabbit.oak.namepath.NamePathMapper;
-import org.apache.jackrabbit.oak.plugins.observation.ChangeDispatcher.ChangeSet;
-import org.apache.jackrabbit.oak.plugins.observation.ChangeDispatcher.Listener;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.commit.Observable;
+import org.apache.jackrabbit.oak.spi.commit.Observer;
import org.apache.jackrabbit.oak.spi.state.NodeState;
import org.apache.jackrabbit.oak.spi.whiteboard.Registration;
import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard;
@@ -43,28 +49,23 @@ import org.slf4j.LoggerFactory;
/**
* A {@code ChangeProcessor} generates observation {@link javax.jcr.observation.Event}s
- * based on a {@link EventFilter} and delivers them to an {@link javax.jcr.observation.EventListener}.
+ * based on a {@link EventFilter} and delivers them to an {@link EventListener}.
* <p>
- * After instantiation a {@code ChangeProcessor} must be started in order for its
- * {@link ListenerThread listener thread's} run methods to be regularly
- * executed and stopped in order to not execute its run method anymore.
+ * After instantiation a {@code ChangeProcessor} must be started in order to start
+ * delivering observation events and stopped to stop doing so.
*/
-public class ChangeProcessor {
+public class ChangeProcessor implements Observer {
private static final Logger log = LoggerFactory.getLogger(ChangeProcessor.class);
private final ContentSession contentSession;
private final NamePathMapper namePathMapper;
- private final AtomicReference<EventFilter> filterRef;
-
private final ListenerTracker tracker;
private final EventListener listener;
+ private final AtomicReference<EventFilter> filterRef;
- /**
- * Background thread used to wait for and deliver events.
- */
- private ListenerThread thread = null;
-
- private volatile boolean stopping = false;
+ private Closeable observer;
+ private Registration mbean;
+ private NodeState previousRoot;
public ChangeProcessor(
ContentSession contentSession, NamePathMapper namePathMapper,
@@ -73,7 +74,7 @@ public class ChangeProcessor {
this.contentSession = contentSession;
this.namePathMapper = namePathMapper;
this.tracker = tracker;
- this.listener = tracker.getTrackedListener();
+ listener = tracker.getTrackedListener();
filterRef = new AtomicReference<EventFilter>(filter);
}
@@ -92,12 +93,11 @@ public class ChangeProcessor {
* @throws IllegalStateException if started already
*/
public synchronized void start(Whiteboard whiteboard) {
- checkState(thread == null, "Change processor started already");
+ checkState(observer == null, "Change processor started already");
+ observer = ((Observable) contentSession).addObserver(this);
+ mbean = WhiteboardUtils.registerMBean(whiteboard, EventListenerMBean.class,
+ tracker.getListenerMBean(), "EventListener", tracker.toString());
- thread = new ListenerThread(whiteboard);
- thread.setDaemon(true);
- thread.setPriority(Thread.MIN_PRIORITY);
- thread.start();
}
/**
@@ -106,71 +106,49 @@ public class ChangeProcessor {
* @throws IllegalStateException if not yet started or stopped already
*/
public synchronized void stop() {
- checkState(thread != null, "Change processor not started");
- checkState(!stopping, "Change processor already stopped");
-
- stopping = true;
- if (Thread.currentThread() != thread) {
- try {
- thread.join();
- } catch (InterruptedException e) {
- log.warn("Interruption while waiting for the observation thread to terminate", e);
- Thread.currentThread().interrupt();
- } finally {
- thread.dispose();
- }
+ checkState(observer != null, "Change processor not started");
+ try {
+ mbean.unregister();
+ observer.close();
+ } catch (IOException e) {
+ log.error("Error while stopping change listener", e);
}
}
- //------------------------------------------------------------< private >---
-
- private class ListenerThread extends Thread {
-
- private final Listener changeListener =
- ((Observable) contentSession).newListener();
-
- private final Registration mbean;
-
- ListenerThread(Whiteboard whiteboard) {
- mbean = WhiteboardUtils.registerMBean(
- whiteboard, EventListenerMBean.class,
- tracker.getListenerMBean(), "EventListener",
- tracker.toString());
- }
-
- @Override
- public void run() {
+ @Override
+ public void contentChanged(@Nonnull NodeState root, @Nullable CommitInfo info) {
+ if (previousRoot != null) {
try {
- while (!stopping) {
- ChangeSet changes = changeListener.getChanges(100);
- EventFilter filter = filterRef.get();
- // FIXME don't rely on toString for session id
- if (changes != null &&
- filter.includeSessionLocal(changes.isLocal(contentSession.toString())) &&
- filter.includeClusterExternal(changes.getCommitInfo() == null)) {
- String path = namePathMapper.getOakPath(filter.getPath());
- ImmutableTree beforeTree = getTree(changes.getBeforeState(), path);
- ImmutableTree afterTree = getTree(changes.getAfterState(), path);
- EventGenerator events = new EventGenerator(changes.getCommitInfo(),
- beforeTree, afterTree, filter, namePathMapper);
- if (events.hasNext()) {
- listener.onEvent(new EventIteratorAdapter(events));
- }
+ EventFilter filter = filterRef.get();
+ if (filter.includeSessionLocal(isLocal(info))
+ && filter.includeClusterExternal(isExternal(info))) {
+ String path = namePathMapper.getOakPath(filter.getPath());
+ ImmutableTree beforeTree = getTree(previousRoot, path);
+ ImmutableTree afterTree = getTree(root, path);
+ EventGenerator events = new EventGenerator(
+ info, beforeTree, afterTree, filter, namePathMapper);
+ if (events.hasNext()) {
+ listener.onEvent(new EventIteratorAdapter(events));
}
}
} catch (Exception e) {
log.warn("Error while dispatching observation events", e);
}
}
+ previousRoot = root;
+ }
- private ImmutableTree getTree(NodeState nodeState, String path) {
- return new ImmutableRoot(nodeState).getTree(path);
- }
+ private boolean isLocal(CommitInfo info) {
+ // FIXME don't rely on toString for session id
+ return info != null && Objects.equal(info.getSessionId(), contentSession.toString());
+ }
- void dispose() {
- mbean.unregister();
- changeListener.dispose();
- }
+ private static boolean isExternal(CommitInfo info) {
+ return info == null;
+ }
+
+ private static ImmutableTree getTree(NodeState nodeState, String path) {
+ return new ImmutableRoot(nodeState).getTree(path);
}
}
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java?rev=1539363&r1=1539362&r2=1539363&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java Wed Nov 6 15:27:04 2013
@@ -21,6 +21,7 @@ import static com.google.common.base.Pre
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
+import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
@@ -30,9 +31,8 @@ import javax.annotation.Nullable;
import org.apache.jackrabbit.oak.api.Blob;
import org.apache.jackrabbit.oak.api.CommitFailedException;
-import org.apache.jackrabbit.oak.plugins.observation.ChangeDispatcher;
-import org.apache.jackrabbit.oak.plugins.observation.ChangeDispatcher.Listener;
-import org.apache.jackrabbit.oak.plugins.observation.Observable;
+import org.apache.jackrabbit.oak.spi.commit.ChangeDispatcher;
+import org.apache.jackrabbit.oak.spi.commit.Observable;
import org.apache.jackrabbit.oak.spi.commit.CommitHook;
import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
import org.apache.jackrabbit.oak.spi.commit.EmptyObserver;
@@ -105,8 +105,8 @@ public class SegmentNodeStore implements
}
@Override
- public Listener newListener() {
- return changeDispatcher.newListener();
+ public Closeable addObserver(Observer observer) {
+ return changeDispatcher.addObserver(observer);
}
@Override @Nonnull
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java?rev=1539363&r1=1539362&r2=1539363&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java Wed Nov 6 15:27:04 2013
@@ -16,6 +16,7 @@
*/
package org.apache.jackrabbit.oak.plugins.segment;
+import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
@@ -27,7 +28,6 @@ import javax.annotation.Nullable;
import com.google.common.base.Preconditions;
import com.mongodb.Mongo;
-
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.ConfigurationPolicy;
@@ -36,12 +36,12 @@ import org.apache.felix.scr.annotations.
import org.apache.felix.scr.annotations.Service;
import org.apache.jackrabbit.oak.api.Blob;
import org.apache.jackrabbit.oak.api.CommitFailedException;
-import org.apache.jackrabbit.oak.plugins.observation.ChangeDispatcher.Listener;
-import org.apache.jackrabbit.oak.plugins.observation.Observable;
+import org.apache.jackrabbit.oak.spi.commit.Observable;
import org.apache.jackrabbit.oak.plugins.segment.file.FileStore;
import org.apache.jackrabbit.oak.plugins.segment.mongo.MongoStore;
import org.apache.jackrabbit.oak.spi.commit.CommitHook;
import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.commit.Observer;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.apache.jackrabbit.oak.spi.state.NodeState;
import org.apache.jackrabbit.oak.spi.state.NodeStore;
@@ -153,9 +153,9 @@ public class SegmentNodeStoreService imp
//------------------------------------------------------------< Observable >---
@Override
- public Listener newListener() {
+ public Closeable addObserver(Observer observer) {
Preconditions.checkState(delegate instanceof Observable);
- return ((Observable) getDelegate()).newListener();
+ return ((Observable) getDelegate()).addObserver(observer);
}
Copied: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/ChangeDispatcher.java (from r1539346, jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeDispatcher.java)
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/ChangeDispatcher.java?p2=jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/ChangeDispatcher.java&p1=jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeDispatcher.java&r1=1539346&r2=1539363&rev=1539363&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeDispatcher.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/ChangeDispatcher.java Wed Nov 6 15:27:04 2013
@@ -16,13 +16,15 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.jackrabbit.oak.plugins.observation;
+package org.apache.jackrabbit.oak.spi.commit;
import static com.google.common.base.Objects.toStringHelper;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.Queues.newLinkedBlockingQueue;
+import java.io.Closeable;
+import java.io.IOException;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
@@ -32,10 +34,7 @@ import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
-import com.google.common.base.Objects;
import com.google.common.collect.Sets;
-
-import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
import org.apache.jackrabbit.oak.spi.state.NodeState;
import org.apache.jackrabbit.oak.spi.state.NodeStore;
@@ -58,8 +57,8 @@ import org.apache.jackrabbit.oak.spi.sta
}
* </pre>
* <p>
- * The {@link #newListener()} method registers a listener for receiving changes reported
- * into a change dispatcher.
+ * The {@link #addObserver(Observer)} method registers an {@link Observer} for receiving
+ * notifications about all changes reported to this instance.
*/
public class ChangeDispatcher implements Observable {
private final Set<Listener> listeners = Sets.newHashSet();
@@ -78,15 +77,18 @@ public class ChangeDispatcher implements
}
/**
- * Create a new {@link Listener} for receiving changes reported into
- * this change dispatcher. Listeners need to be {@link Listener#dispose() disposed}
- * when no longer needed.
- * @return a new {@code Listener} instance.
+ * Register a new {@link Observer} for receiving notifications about changes reported to
+ * this change dispatcher. Changes are reported asynchronously. Clients need to
+ * call {@link java.io.Closeable#close()} close} on the returned {@code Closeable} instance
+ * to stop receiving notifications.
+ *
+ * @return a {@link Closeable} instance
*/
@Override
@Nonnull
- public Listener newListener() {
- Listener listener = new Listener(root);
+ public Closeable addObserver(Observer observer) {
+ Listener listener = new Listener(observer, root);
+ listener.start();
register(listener);
return listener;
}
@@ -187,10 +189,8 @@ public class ChangeDispatcher implements
}
private void add(NodeState root, CommitInfo info) {
- synchronized (listeners) {
- for (Listener l : getListeners()) {
- l.add(root, info);
- }
+ for (Listener l : getListeners()) {
+ l.contentChanged(root, info);
}
}
@@ -203,123 +203,92 @@ public class ChangeDispatcher implements
//------------------------------------------------------------< Listener >---
/**
- * Listener for receiving changes reported into a change dispatcher by
- * any of its hooks.
+ * Listener thread receiving changes reported into {@code ChangeDispatcher} and
+ * asynchronously distributing these to an associated {@link Observer}.
*/
- public class Listener {
-
- private final LinkedBlockingQueue<ChangeSet> changeSets =
- newLinkedBlockingQueue();
-
- private NodeState previousRoot;
+ private class Listener extends Thread implements Closeable, Observer {
+ private final LinkedBlockingQueue<Commit> commits = newLinkedBlockingQueue();
+ private final Observer observer;
private boolean blocked = false;
+ private volatile boolean stopping;
- Listener(NodeState root) {
- this.previousRoot = checkNotNull(root);
+ Listener(Observer observer, NodeState root) {
+ this.observer = checkNotNull(observer);
+ commits.add(new Commit(root, null));
+ setDaemon(true);
+ setPriority(Thread.MIN_PRIORITY);
}
- /**
- * Free up any resources associated by this hook.
- */
- public void dispose() {
- unregister(this);
+ @Override
+ public void contentChanged(NodeState root, CommitInfo info) {
+ Commit commit = new Commit(root, blocked ? null : info);
+ blocked = !commits.offer(commit);
}
- /**
- * Poll for changes reported to this listener.
- *
- * @param timeout maximum number of milliseconds to wait for changes
- * @return {@code ChangeSet} of the changes, or {@code null} if
- * no changes occurred since the last call to this method
- * and before the timeout
- * @throws InterruptedException if polling was interrupted
- */
- @CheckForNull
- public ChangeSet getChanges(long timeout) throws InterruptedException {
- if (changeSets.isEmpty()) {
- externalChange();
+ @Override
+ public void run() {
+ try {
+ while (!stopping) {
+ if (commits.isEmpty()) {
+ externalChange();
+ }
+ Commit commit = commits.poll(100, TimeUnit.MILLISECONDS);
+ if (commit != null) {
+ observer.contentChanged(commit.getRoot(), commit.getCommitInfo());
+ }
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
}
- return changeSets.poll(timeout, TimeUnit.MILLISECONDS);
}
- private synchronized void add(NodeState root, CommitInfo info) {
- if (blocked) {
- info = null;
- }
- if (changeSets.offer(new ChangeSet(previousRoot, root, info))) {
- previousRoot = root;
- blocked = false;
- } else {
- blocked = true;
+ @Override
+ public void close() throws IOException {
+ checkState(!stopping, "Change processor already stopped");
+
+ unregister(this);
+ stopping = true;
+ if (Thread.currentThread() != this) {
+ try {
+ join();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IOException
+ ("Interruption while waiting for the listener thread to terminate", e);
+ }
}
}
}
- //------------------------------------------------------------< ChangeSet >---
+ //------------------------------------------------------------< Commit >---
- /**
- * Instances of this class represent changes to a node store. In addition they
- * record meta data associated with such changes like whether a change occurred
- * on the local cluster node, the user causing the changes and the date the changes
- * where persisted.
- */
- public static class ChangeSet {
- private final NodeState before;
- private final NodeState after;
+ private static class Commit {
+ private final NodeState root;
private final CommitInfo commitInfo;
- /**
- * Creates a change set
- */
- ChangeSet(@Nonnull NodeState before, @Nonnull NodeState after,
- @Nullable CommitInfo commitInfo) {
- this.before = checkNotNull(before);
- this.after = checkNotNull(after);
+ Commit(@Nonnull NodeState root, @Nullable CommitInfo commitInfo) {
+ this.root = checkNotNull(root);
this.commitInfo = commitInfo;
}
- public boolean isExternal() {
- return commitInfo == null;
- }
-
- public boolean isLocal(String sessionId) {
- return commitInfo != null
- && Objects.equal(commitInfo.getSessionId(), sessionId);
+ @Nonnull
+ NodeState getRoot() {
+ return root;
}
@CheckForNull
- public CommitInfo getCommitInfo() {
+ CommitInfo getCommitInfo() {
return commitInfo;
}
- /**
- * State before the change
- * @return before state
- */
- @Nonnull
- public NodeState getBeforeState() {
- return before;
- }
-
- /**
- * State after the change
- * @return after state
- */
- @Nonnull
- public NodeState getAfterState() {
- return after;
- }
-
@Override
public String toString() {
return toStringHelper(this)
- .add("base", before)
- .add("head", after)
+ .add("root", root)
.add("commit info", commitInfo)
.toString();
}
-
}
}
Copied: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/Observable.java (from r1539346, jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/Observable.java)
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/Observable.java?p2=jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/Observable.java&p1=jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/Observable.java&r1=1539346&r2=1539363&rev=1539363&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/Observable.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/Observable.java Wed Nov 6 15:27:04 2013
@@ -17,23 +17,24 @@
* under the License.
*/
-package org.apache.jackrabbit.oak.plugins.observation;
+package org.apache.jackrabbit.oak.spi.commit;
-import org.apache.jackrabbit.oak.plugins.observation.ChangeDispatcher.Listener;
+import java.io.Closeable;
/**
- * An {@code Observable} supports attaching {@link Listener} instances for
+ * An {@code Observable} supports attaching {@link Observer} instances for
* listening to content changes.
*
- * @see ChangeDispatcher
+ * @see Observable
*/
public interface Observable {
/**
- * Register a new {@code Listener}. Clients need to call
- * {@link ChangeDispatcher.Listener#dispose()} to free
- * up any resources associated with the listener when done.
- * @return a fresh {@code Listener} instance.
+ * Register a new {@code Observer}. Clients need to call {@link Closeable#close()}
+ * to stop getting notifications on the registered observer and to free up any resources
+ * associated with the registration.
+ *
+ * @return a {@code Closeable} instance.
*/
- Listener newListener();
+ Closeable addObserver(Observer observer);
}
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/state/AbstractNodeStoreBranch.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/state/AbstractNodeStoreBranch.java?rev=1539363&r1=1539362&r2=1539363&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/state/AbstractNodeStoreBranch.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/state/AbstractNodeStoreBranch.java Wed Nov 6 15:27:04 2013
@@ -26,7 +26,7 @@ import javax.annotation.Nullable;
import org.apache.jackrabbit.oak.api.CommitFailedException;
import org.apache.jackrabbit.oak.commons.PathUtils;
-import org.apache.jackrabbit.oak.plugins.observation.ChangeDispatcher;
+import org.apache.jackrabbit.oak.spi.commit.ChangeDispatcher;
import org.apache.jackrabbit.oak.spi.commit.CommitHook;
import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueueTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueueTest.java?rev=1539363&r1=1539362&r2=1539363&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueueTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueueTest.java Wed Nov 6 15:27:04 2013
@@ -16,13 +16,20 @@
*/
package org.apache.jackrabbit.oak.plugins.mongomk;
+import java.io.Closeable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.jackrabbit.oak.plugins.observation.ChangeDispatcher;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import org.apache.jackrabbit.oak.spi.commit.ChangeDispatcher;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.commit.Observer;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
import org.junit.Test;
/**
@@ -38,38 +45,26 @@ public class CommitQueueTest {
public void concurrentCommits() throws Exception {
final MongoNodeStore store = new MongoMK.Builder().getNodeStore();
ChangeDispatcher dispatcher = new ChangeDispatcher(store);
- final ChangeDispatcher.Listener listener = dispatcher.newListener();
- final AtomicBoolean running = new AtomicBoolean(true);
+ AtomicBoolean running = new AtomicBoolean(true);
final CommitQueue queue = new CommitQueue(store, dispatcher);
- final List<Exception> exceptions = Collections.synchronizedList(
- new ArrayList<Exception>());
- Thread reader = new Thread(new Runnable() {
+ final List<Exception> exceptions = Collections.synchronizedList(new ArrayList<Exception>());
+
+ Closeable observer = dispatcher.addObserver(new Observer() {
+ private Revision before = new Revision(0, 0, store.getClusterId());
+
@Override
- public void run() {
- try {
- Revision before = new Revision(0, 0, store.getClusterId());
- while (running.get()) {
- ChangeDispatcher.ChangeSet changes =
- listener.getChanges(1000);
- if (changes != null) {
- MongoNodeState after = (MongoNodeState) changes.getAfterState();
- Revision r = after.getRevision();
- // System.out.println("seen: " + r);
- if (r.compareRevisionTime(before) < 1) {
- exceptions.add(new Exception(
- "Inconsistent revision sequence. Before: " +
- before + ", after: " + r));
- break;
- }
- before = r;
- }
- }
- } catch (Exception e) {
- exceptions.add(e);
+ public void contentChanged(@Nonnull NodeState root, @Nullable CommitInfo info) {
+ MongoNodeState after = (MongoNodeState) root;
+ Revision r = after.getRevision();
+// System.out.println("seen: " + r);
+ if (r.compareRevisionTime(before) < 1) {
+ exceptions.add(new Exception(
+ "Inconsistent revision sequence. Before: " +
+ before + ", after: " + r));
}
+ before = r;
}
});
- reader.start();
// perform commits with multiple threads
List<Thread> writers = new ArrayList<Thread>();
@@ -108,7 +103,7 @@ public class CommitQueueTest {
t.join();
}
running.set(false);
- reader.join();
+ observer.close();
for (Exception e : exceptions) {
throw e;
}
Modified: jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ObservationManagerImpl.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ObservationManagerImpl.java?rev=1539363&r1=1539362&r2=1539363&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ObservationManagerImpl.java (original)
+++ jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ObservationManagerImpl.java Wed Nov 6 15:27:04 2013
@@ -43,7 +43,7 @@ import org.apache.jackrabbit.oak.plugins
import org.apache.jackrabbit.oak.plugins.observation.ChangeProcessor;
import org.apache.jackrabbit.oak.plugins.observation.EventFilter;
import org.apache.jackrabbit.oak.plugins.observation.ExcludeExternal;
-import org.apache.jackrabbit.oak.plugins.observation.Observable;
+import org.apache.jackrabbit.oak.spi.commit.Observable;
import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Modified: jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/session/SessionContext.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/session/SessionContext.java?rev=1539363&r1=1539362&r2=1539363&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/session/SessionContext.java (original)
+++ jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/session/SessionContext.java Wed Nov 6 15:27:04 2013
@@ -16,11 +16,16 @@
*/
package org.apache.jackrabbit.oak.jcr.session;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.collect.Sets.newHashSet;
+import static com.google.common.collect.Sets.newTreeSet;
+
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
+
import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
import javax.jcr.PathNotFoundException;
@@ -52,8 +57,8 @@ import org.apache.jackrabbit.oak.namepat
import org.apache.jackrabbit.oak.namepath.NamePathMapper;
import org.apache.jackrabbit.oak.namepath.NamePathMapperImpl;
import org.apache.jackrabbit.oak.plugins.nodetype.ReadOnlyNodeTypeManager;
-import org.apache.jackrabbit.oak.plugins.observation.Observable;
import org.apache.jackrabbit.oak.plugins.value.ValueFactoryImpl;
+import org.apache.jackrabbit.oak.spi.commit.Observable;
import org.apache.jackrabbit.oak.spi.security.SecurityConfiguration;
import org.apache.jackrabbit.oak.spi.security.SecurityProvider;
import org.apache.jackrabbit.oak.spi.security.authorization.AuthorizationConfiguration;
@@ -66,10 +71,6 @@ import org.apache.jackrabbit.oak.spi.xml
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.collect.Sets.newHashSet;
-import static com.google.common.collect.Sets.newTreeSet;
-
/**
* Instances of this class are passed to all JCR implementation classes
* (e.g. {@code SessionImpl}, {@code NodeImpl}, etc.) and provide access to