You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to oak-commits@jackrabbit.apache.org by md...@apache.org on 2013/10/29 09:16:58 UTC
svn commit: r1536603 - /jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeProcessor.java
Author: mduerig
Date: Tue Oct 29 08:16:58 2013
New Revision: 1536603
URL: http://svn.apache.org/r1536603
Log:
OAK-144 Implement observation
The EventIterator passed to onEvent now generates the events as needed (i.e. lazily during iteration).
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeProcessor.java
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeProcessor.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeProcessor.java?rev=1536603&r1=1536602&r2=1536603&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeProcessor.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeProcessor.java Tue Oct 29 08:16:58 2013
@@ -20,6 +20,7 @@ package org.apache.jackrabbit.oak.plugin
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.collect.Iterators.concat;
import static com.google.common.collect.Lists.newArrayList;
import static javax.jcr.observation.Event.NODE_ADDED;
import static javax.jcr.observation.Event.NODE_REMOVED;
@@ -34,6 +35,8 @@ import java.util.concurrent.atomic.Atomi
import javax.jcr.observation.Event;
import javax.jcr.observation.EventListener;
+import com.google.common.collect.ForwardingIterator;
+import com.google.common.collect.Iterators;
import org.apache.jackrabbit.api.jmx.EventListenerMBean;
import org.apache.jackrabbit.commons.iterator.EventIteratorAdapter;
import org.apache.jackrabbit.commons.observation.ListenerTracker;
@@ -48,7 +51,6 @@ import org.apache.jackrabbit.oak.namepat
import org.apache.jackrabbit.oak.plugins.observation.ChangeDispatcher.ChangeSet;
import org.apache.jackrabbit.oak.plugins.observation.ChangeDispatcher.Listener;
import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
-import org.apache.jackrabbit.oak.spi.commit.DefaultValidator;
import org.apache.jackrabbit.oak.spi.commit.Validator;
import org.apache.jackrabbit.oak.spi.commit.VisibleValidator;
import org.apache.jackrabbit.oak.spi.state.NodeState;
@@ -131,10 +133,6 @@ public class ChangeProcessor {
stopping = true;
}
- private static ImmutableTree getTree(NodeState beforeState, String path) {
- return new ImmutableRoot(beforeState).getTree(path);
- }
-
//------------------------------------------------------------< private >---
private class ListenerThread extends Thread {
@@ -165,8 +163,9 @@ public class ChangeProcessor {
ImmutableTree afterTree = getTree(changes.getAfterState(), path);
EventGeneratingValidator events = new EventGeneratingValidator(
changes.getCommitInfo(), beforeTree, afterTree);
- VisibleValidator visibleEvents = new VisibleValidator(events, true, true);
- SecureValidator.compare(beforeTree, afterTree, visibleEvents);
+ if (events.hasNext()) {
+ listener.onEvent(new EventIteratorAdapter(events));
+ }
}
changes = changeListener.getChanges(100);
}
@@ -177,22 +176,26 @@ public class ChangeProcessor {
changeListener.dispose();
}
}
- }
- private class EventGeneratingValidator extends DefaultValidator {
- public static final int EVENT_LIMIT = 8192;
+ private ImmutableTree getTree(NodeState nodeState, String path) {
+ return new ImmutableRoot(nodeState).getTree(path);
+ }
+ }
+ private class EventGeneratingValidator extends ForwardingIterator<Event> implements Validator {
private final String userId;
private final String message;
private final long timestamp;
private final boolean external;
- private final Tree beforeTree;
- private final Tree afterTree;
- private final List<Event> events;
- private final boolean isRoot;
+ private final ImmutableTree beforeTree;
+ private final ImmutableTree afterTree;
+ private final List<Event> events = newArrayList();
+ private final List<Iterator<Event>> childEvents = newArrayList();
+
+ private Iterator<Event> eventIterator;
- EventGeneratingValidator(CommitInfo info, Tree beforeTree, Tree afterTree) {
+ EventGeneratingValidator(CommitInfo info, ImmutableTree beforeTree, ImmutableTree afterTree) {
if (info != null) {
this.userId = info.getUserId();
this.message = info.getMessage();
@@ -208,8 +211,6 @@ public class ChangeProcessor {
}
this.beforeTree = beforeTree;
this.afterTree = afterTree;
- this.events = newArrayList();
- isRoot = true;
}
private EventGeneratingValidator(EventGeneratingValidator parent, String name) {
@@ -219,92 +220,89 @@ public class ChangeProcessor {
this.external = parent.external;
this.beforeTree = parent.beforeTree.getChild(name);
this.afterTree = parent.afterTree.getChild(name);
- this.events = parent.events;
- isRoot = false;
}
+ //------------------------------------------------------------< ForwardingIterator >---
+
@Override
- public void leave(NodeState before, NodeState after) throws CommitFailedException {
- // TODO instead of putting events in a list generate them on demand
- // on calls to iterator.next()
- if (isRoot || events.size() > EVENT_LIMIT) {
- Iterator<Event> eventIterator = newArrayList(events.iterator()).iterator();
- events.clear();
- if (eventIterator.hasNext()) {
- try {
- listener.onEvent(new EventIteratorAdapter(eventIterator) {
- @Override
- public boolean hasNext() {
- return !stopping && super.hasNext();
- }
- });
- }
- catch (Exception e) {
- log.warn("Unhandled exception in observation listener: " + listener, e);
- }
+ protected Iterator<Event> delegate() {
+ try {
+ if (eventIterator == null) {
+ SecureValidator.compare(beforeTree, afterTree,
+ new VisibleValidator(this, true, true));
+ eventIterator = concat(events.iterator(), concat(childEvents.iterator()));
}
+ return eventIterator;
+ } catch (CommitFailedException e) {
+ log.error("Error while extracting observation events", e);
+ return Iterators.emptyIterator();
}
}
+ //------------------------------------------------------------< Validator >---
+
+ @Override
+ public void enter(NodeState before, NodeState after) throws CommitFailedException {
+ }
+
+ @Override
+ public void leave(NodeState before, NodeState after) throws CommitFailedException {
+ }
+
@Override
public void propertyAdded(PropertyState after) {
- if (!stopping && filterRef.get().include(PROPERTY_ADDED, afterTree)) {
+ if (filterRef.get().include(PROPERTY_ADDED, afterTree)) {
events.add(createEvent(PROPERTY_ADDED, afterTree, after));
}
}
@Override
public void propertyChanged(PropertyState before, PropertyState after) {
- if (!stopping && filterRef.get().include(Event.PROPERTY_CHANGED, afterTree)) {
+ if (filterRef.get().include(Event.PROPERTY_CHANGED, afterTree)) {
events.add(createEvent(Event.PROPERTY_CHANGED, afterTree, after));
}
}
@Override
public void propertyDeleted(PropertyState before) {
- if (!stopping && filterRef.get().include(PROPERTY_REMOVED, afterTree)) {
+ if (filterRef.get().include(PROPERTY_REMOVED, afterTree)) {
events.add(createEvent(PROPERTY_REMOVED, beforeTree, before));
}
}
@Override
public Validator childNodeAdded(String name, NodeState after) {
- if (stopping) {
- return null;
- }
EventFilter eventFilter = filterRef.get();
if (eventFilter.include(NODE_ADDED, afterTree)) {
events.add(createEvent(NODE_ADDED, afterTree.getChild(name)));
}
- return createChildValidator(eventFilter, afterTree.getPath(), name);
+ if (eventFilter.includeChildren(afterTree.getPath())) {
+ childEvents.add(new EventGeneratingValidator(this, name));
+ }
+ return null;
}
@Override
public Validator childNodeDeleted(String name, NodeState before) {
- if (stopping) {
- return null;
- }
EventFilter eventFilter = filterRef.get();
if (eventFilter.include(NODE_REMOVED, beforeTree)) {
events.add(createEvent(NODE_REMOVED, beforeTree.getChild(name)));
}
- return createChildValidator(eventFilter, beforeTree.getPath(), name);
+ if (eventFilter.includeChildren(beforeTree.getPath())) {
+ childEvents.add(new EventGeneratingValidator(this, name));
+ }
+ return null;
}
@Override
public Validator childNodeChanged(String name, NodeState before, NodeState after) {
- if (stopping) {
- return null;
+ if (filterRef.get().includeChildren(afterTree.getPath())) {
+ childEvents.add(new EventGeneratingValidator(this, name));
}
- return createChildValidator(filterRef.get(), afterTree.getPath(), name);
+ return null;
}
- private Validator createChildValidator(EventFilter eventFilter, String parentPath,
- String name) {
- return eventFilter.includeChildren(parentPath)
- ? new EventGeneratingValidator(this, name)
- : null;
- }
+ //------------------------------------------------------------< internal >---
private Event createEvent(int eventType, Tree tree) {
return createEvent(eventType, tree.getPath(), getIdentifier(tree));