You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by md...@apache.org on 2013/10/29 09:16:58 UTC

svn commit: r1536603 - /jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeProcessor.java

Author: mduerig
Date: Tue Oct 29 08:16:58 2013
New Revision: 1536603

URL: http://svn.apache.org/r1536603
Log:
OAK-144 Implement observation
The EventIterator passed to EventListener.onEvent() now generates its events on demand (i.e. lazily, during iteration) instead of collecting them in a list before the listener is called.

Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeProcessor.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeProcessor.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeProcessor.java?rev=1536603&r1=1536602&r2=1536603&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeProcessor.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeProcessor.java Tue Oct 29 08:16:58 2013
@@ -20,6 +20,7 @@ package org.apache.jackrabbit.oak.plugin
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.collect.Iterators.concat;
 import static com.google.common.collect.Lists.newArrayList;
 import static javax.jcr.observation.Event.NODE_ADDED;
 import static javax.jcr.observation.Event.NODE_REMOVED;
@@ -34,6 +35,8 @@ import java.util.concurrent.atomic.Atomi
 import javax.jcr.observation.Event;
 import javax.jcr.observation.EventListener;
 
+import com.google.common.collect.ForwardingIterator;
+import com.google.common.collect.Iterators;
 import org.apache.jackrabbit.api.jmx.EventListenerMBean;
 import org.apache.jackrabbit.commons.iterator.EventIteratorAdapter;
 import org.apache.jackrabbit.commons.observation.ListenerTracker;
@@ -48,7 +51,6 @@ import org.apache.jackrabbit.oak.namepat
 import org.apache.jackrabbit.oak.plugins.observation.ChangeDispatcher.ChangeSet;
 import org.apache.jackrabbit.oak.plugins.observation.ChangeDispatcher.Listener;
 import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
-import org.apache.jackrabbit.oak.spi.commit.DefaultValidator;
 import org.apache.jackrabbit.oak.spi.commit.Validator;
 import org.apache.jackrabbit.oak.spi.commit.VisibleValidator;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
@@ -131,10 +133,6 @@ public class ChangeProcessor {
         stopping = true;
     }
 
-    private static ImmutableTree getTree(NodeState beforeState, String path) {
-        return new ImmutableRoot(beforeState).getTree(path);
-    }
-
     //------------------------------------------------------------< private >---
 
     private class ListenerThread extends Thread {
@@ -165,8 +163,9 @@ public class ChangeProcessor {
                         ImmutableTree afterTree = getTree(changes.getAfterState(), path);
                         EventGeneratingValidator events = new EventGeneratingValidator(
                                 changes.getCommitInfo(), beforeTree, afterTree);
-                        VisibleValidator visibleEvents = new VisibleValidator(events, true, true);
-                        SecureValidator.compare(beforeTree, afterTree, visibleEvents);
+                        if (events.hasNext()) {
+                            listener.onEvent(new EventIteratorAdapter(events));
+                        }
                     }
                     changes = changeListener.getChanges(100);
                 }
@@ -177,22 +176,26 @@ public class ChangeProcessor {
                 changeListener.dispose();
             }
         }
-    }
 
-    private class EventGeneratingValidator extends DefaultValidator {
-        public static final int EVENT_LIMIT = 8192;
+        private ImmutableTree getTree(NodeState nodeState, String path) {
+            return new ImmutableRoot(nodeState).getTree(path);
+        }
+    }
 
+    private class EventGeneratingValidator extends ForwardingIterator<Event> implements Validator {
         private final String userId;
         private final String message;
         private final long timestamp;
         private final boolean external;
 
-        private final Tree beforeTree;
-        private final Tree afterTree;
-        private final List<Event> events;
-        private final boolean isRoot;
+        private final ImmutableTree beforeTree;
+        private final ImmutableTree afterTree;
+        private final List<Event> events = newArrayList();
+        private final List<Iterator<Event>> childEvents = newArrayList();
+
+        private Iterator<Event> eventIterator;
 
-        EventGeneratingValidator(CommitInfo info, Tree beforeTree, Tree afterTree) {
+        EventGeneratingValidator(CommitInfo info, ImmutableTree beforeTree, ImmutableTree afterTree) {
             if (info != null) {
                 this.userId = info.getUserId();
                 this.message = info.getMessage();
@@ -208,8 +211,6 @@ public class ChangeProcessor {
             }
             this.beforeTree = beforeTree;
             this.afterTree = afterTree;
-            this.events = newArrayList();
-            isRoot = true;
         }
 
         private EventGeneratingValidator(EventGeneratingValidator parent, String name) {
@@ -219,92 +220,89 @@ public class ChangeProcessor {
             this.external = parent.external;
             this.beforeTree = parent.beforeTree.getChild(name);
             this.afterTree = parent.afterTree.getChild(name);
-            this.events = parent.events;
-            isRoot = false;
         }
 
+        //------------------------------------------------------------< ForwardingIterator >---
+
         @Override
-        public void leave(NodeState before, NodeState after) throws CommitFailedException {
-            // TODO instead of putting events in a list generate them on demand
-            // on calls to iterator.next()
-            if (isRoot || events.size() > EVENT_LIMIT) {
-                Iterator<Event> eventIterator = newArrayList(events.iterator()).iterator();
-                events.clear();
-                if (eventIterator.hasNext()) {
-                    try {
-                        listener.onEvent(new EventIteratorAdapter(eventIterator) {
-                            @Override
-                            public boolean hasNext() {
-                                return !stopping && super.hasNext();
-                            }
-                        });
-                    }
-                    catch (Exception e) {
-                        log.warn("Unhandled exception in observation listener: " + listener, e);
-                    }
+        protected Iterator<Event> delegate() {
+            try {
+                if (eventIterator == null) {
+                    SecureValidator.compare(beforeTree, afterTree,
+                            new VisibleValidator(this, true, true));
+                    eventIterator = concat(events.iterator(), concat(childEvents.iterator()));
                 }
+                return eventIterator;
+            } catch (CommitFailedException e) {
+                log.error("Error while extracting observation events", e);
+                return Iterators.emptyIterator();
             }
         }
 
+        //------------------------------------------------------------< Validator >---
+
+        @Override
+        public void enter(NodeState before, NodeState after) throws CommitFailedException {
+        }
+
+        @Override
+        public void leave(NodeState before, NodeState after) throws CommitFailedException {
+        }
+
         @Override
         public void propertyAdded(PropertyState after) {
-            if (!stopping && filterRef.get().include(PROPERTY_ADDED, afterTree)) {
+            if (filterRef.get().include(PROPERTY_ADDED, afterTree)) {
                 events.add(createEvent(PROPERTY_ADDED, afterTree, after));
             }
         }
 
         @Override
         public void propertyChanged(PropertyState before, PropertyState after) {
-            if (!stopping && filterRef.get().include(Event.PROPERTY_CHANGED, afterTree)) {
+            if (filterRef.get().include(Event.PROPERTY_CHANGED, afterTree)) {
                 events.add(createEvent(Event.PROPERTY_CHANGED, afterTree, after));
             }
         }
 
         @Override
         public void propertyDeleted(PropertyState before) {
-            if (!stopping && filterRef.get().include(PROPERTY_REMOVED, afterTree)) {
+            if (filterRef.get().include(PROPERTY_REMOVED, afterTree)) {
                 events.add(createEvent(PROPERTY_REMOVED, beforeTree, before));
             }
         }
 
         @Override
         public Validator childNodeAdded(String name, NodeState after) {
-            if (stopping) {
-                return null;
-            }
             EventFilter eventFilter = filterRef.get();
             if (eventFilter.include(NODE_ADDED, afterTree)) {
                 events.add(createEvent(NODE_ADDED, afterTree.getChild(name)));
             }
-            return createChildValidator(eventFilter, afterTree.getPath(), name);
+            if (eventFilter.includeChildren(afterTree.getPath())) {
+                childEvents.add(new EventGeneratingValidator(this, name));
+            }
+            return null;
         }
 
         @Override
         public Validator childNodeDeleted(String name, NodeState before) {
-            if (stopping) {
-                return null;
-            }
             EventFilter eventFilter = filterRef.get();
             if (eventFilter.include(NODE_REMOVED, beforeTree)) {
                 events.add(createEvent(NODE_REMOVED, beforeTree.getChild(name)));
             }
-            return createChildValidator(eventFilter, beforeTree.getPath(), name);
+            if (eventFilter.includeChildren(beforeTree.getPath())) {
+                childEvents.add(new EventGeneratingValidator(this, name));
+            }
+            return null;
         }
 
         @Override
         public Validator childNodeChanged(String name, NodeState before, NodeState after) {
-            if (stopping) {
-                return null;
+            if (filterRef.get().includeChildren(afterTree.getPath())) {
+                childEvents.add(new EventGeneratingValidator(this, name));
             }
-            return createChildValidator(filterRef.get(), afterTree.getPath(), name);
+            return null;
         }
 
-        private Validator createChildValidator(EventFilter eventFilter, String parentPath,
-                String name) {
-            return eventFilter.includeChildren(parentPath)
-                    ? new EventGeneratingValidator(this, name)
-                    : null;
-        }
+        //------------------------------------------------------------< internal >---
 
         private Event createEvent(int eventType, Tree tree) {
             return createEvent(eventType, tree.getPath(), getIdentifier(tree));