You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by md...@apache.org on 2012/06/28 11:19:51 UTC

svn commit: r1354877 - in /jackrabbit/oak/trunk: oak-core/src/main/java/org/apache/jackrabbit/oak/util/ oak-jcr/ oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/

Author: mduerig
Date: Thu Jun 28 09:19:50 2012
New Revision: 1354877

URL: http://svn.apache.org/viewvc?rev=1354877&view=rev
Log:
OAK-144: Implement observation
Batch up observation events to mitigate the effects of JCR-3361.

Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/util/Iterators.java
    jackrabbit/oak/trunk/oak-jcr/pom.xml
    jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java
    jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ObservationManagerImpl.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/util/Iterators.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/util/Iterators.java?rev=1354877&r1=1354876&r2=1354877&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/util/Iterators.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/util/Iterators.java Thu Jun 28 09:19:50 2012
@@ -203,7 +203,7 @@ public final class Iterators {
      * @return
      */
     @Nonnull
-    public static <T> Iterator<T> flatten(final Iterator<Iterator<? extends T>> iterators) {
+    public static <T> Iterator<T> flatten(final Iterator<? extends Iterator<? extends T>> iterators) {
         return new Iterator<T>() {
             private Iterator<? extends T> current;
 

Modified: jackrabbit/oak/trunk/oak-jcr/pom.xml
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/pom.xml?rev=1354877&r1=1354876&r2=1354877&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-jcr/pom.xml (original)
+++ jackrabbit/oak/trunk/oak-jcr/pom.xml Thu Jun 28 09:19:50 2012
@@ -86,7 +86,7 @@ org.apache.jackrabbit.test.api.query.XPa
 org.apache.jackrabbit.test.api.query.qom.ColumnTest#testExpandColumnsForNodeType
 org.apache.jackrabbit.test.api.query.qom.SelectorTest#testUnknownNodeType
 org.apache.jackrabbit.test.api.util
-org.apache.jackrabbit.test.api.observation.EventIteratorTest#testGetPosition  <!-- JCR-3361 -->
+org.apache.jackrabbit.test.api.observation.EventIteratorTest#testSkip
 org.apache.jackrabbit.test.api.observation.EventTest#testGetType
 org.apache.jackrabbit.test.api.observation.EventTest#testGetNodePath
 org.apache.jackrabbit.test.api.observation.EventTest#testGetUserId
@@ -94,8 +94,8 @@ org.apache.jackrabbit.test.api.observati
 org.apache.jackrabbit.test.api.observation.NodeAddedTest#testMultipleNodeAdded1
 org.apache.jackrabbit.test.api.observation.NodeAddedTest#testMultipleNodeAdded2
 org.apache.jackrabbit.test.api.observation.NodeAddedTest#testTransientNodeAddedRemoved
-org.apache.jackrabbit.test.api.observation.NodeRemovedTest#testSingleNodeRemoved  <!-- JCR-3361 -->
-org.apache.jackrabbit.test.api.observation.NodeRemovedTest#testMultiNodesRemoved  <!-- JCR-3361 -->
+org.apache.jackrabbit.test.api.observation.NodeRemovedTest#testSingleNodeRemoved
+org.apache.jackrabbit.test.api.observation.NodeRemovedTest#testMultiNodesRemoved
 org.apache.jackrabbit.test.api.observation.NodeMovedTest#testMoveNode
 org.apache.jackrabbit.test.api.observation.NodeMovedTest#testMoveTree
 org.apache.jackrabbit.test.api.observation.NodeMovedTest#testMoveWithRemove
@@ -103,12 +103,9 @@ org.apache.jackrabbit.test.api.observati
 org.apache.jackrabbit.test.api.observation.NodeReorderTest#testNodeReorderMove       <!-- reorder not supported -->
 org.apache.jackrabbit.test.api.observation.NodeReorderTest#testNodeReorderSameName   <!-- reorder not supported -->
 org.apache.jackrabbit.test.api.observation.NodeReorderTest#testNodeReorderSameNameWithRemove  <!-- reorder not supported -->
-org.apache.jackrabbit.test.api.observation.PropertyAddedTest#testMultiPropertyAdded  <!-- JCR-3361 -->
-org.apache.jackrabbit.test.api.observation.PropertyChangedTest#testMultiPropertyChanged       <!-- JCR-3361 -->
 org.apache.jackrabbit.test.api.observation.PropertyChangedTest#testSinglePropertyChangedWithAdded
-org.apache.jackrabbit.test.api.observation.PropertyRemovedTest#testMultiPropertyRemoved       <!-- JCR-3361 -->
 org.apache.jackrabbit.test.api.observation.AddEventListenerTest#testPath
-org.apache.jackrabbit.test.api.observation.AddEventListenerTest#testNodeType                  <!-- JCR-3361 -->
+org.apache.jackrabbit.test.api.observation.AddEventListenerTest#testNodeType
 org.apache.jackrabbit.test.api.observation.AddEventListenerTest#testIsDeepFalseNodeAdded
 org.apache.jackrabbit.test.api.observation.AddEventListenerTest#testIsDeepFalsePropertyAdded
 org.apache.jackrabbit.test.api.observation.AddEventListenerTest#testNoLocalTrue

Modified: jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java?rev=1354877&r1=1354876&r2=1354877&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java (original)
+++ jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java Thu Jun 28 09:19:50 2012
@@ -16,8 +16,9 @@
  */
 package org.apache.jackrabbit.oak.jcr.observation;
 
-import java.util.Collections;
+import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.List;
 import java.util.TimerTask;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -65,27 +66,42 @@ class ChangeProcessor extends TimerTask 
 
     @Override
     public void run() {
-        changeExtractor.getChanges(new EventGeneratingNodeStateDiff());
+        EventGeneratingNodeStateDiff diff = new EventGeneratingNodeStateDiff();
+        changeExtractor.getChanges(diff);
+        diff.sendEvents();
     }
 
     //------------------------------------------------------------< private >---
 
     private class EventGeneratingNodeStateDiff implements NodeStateDiff {
+        public static final int PURGE_LIMIT = 8192;
+
         private final String path;
 
-        EventGeneratingNodeStateDiff(String path) {
+        private int childNodeCount;
+        private List<Iterator<Event>> events;
+
+        EventGeneratingNodeStateDiff(String path, List<Iterator<Event>> events) {
             this.path = path;
+            this.events = events;
         }
 
         public EventGeneratingNodeStateDiff() {
-            this("/");
+            this("/", new ArrayList<Iterator<Event>>(PURGE_LIMIT));
+        }
+
+        public void sendEvents() {
+            if (!events.isEmpty()) {
+                listener.onEvent(new EventIteratorAdapter(Iterators.flatten(events.iterator())));
+                events = new ArrayList<Iterator<Event>>(PURGE_LIMIT);
+            }
         }
 
         @Override
         public void propertyAdded(PropertyState after) {
             if (!stopped && filterRef.get().include(Event.PROPERTY_ADDED, path, after)) {
                 Event event = generatePropertyEvent(Event.PROPERTY_ADDED, path, after);
-                listener.onEvent(new EventIteratorAdapter(Collections.singleton(event)));
+                events.add(Iterators.singleton(event));
             }
         }
 
@@ -93,7 +109,7 @@ class ChangeProcessor extends TimerTask 
         public void propertyChanged(PropertyState before, PropertyState after) {
             if (!stopped && filterRef.get().include(Event.PROPERTY_CHANGED, path, before)) {
                 Event event = generatePropertyEvent(Event.PROPERTY_CHANGED, path, after);
-                listener.onEvent(new EventIteratorAdapter(Collections.singleton(event)));
+                events.add(Iterators.singleton(event));
             }
         }
 
@@ -101,7 +117,7 @@ class ChangeProcessor extends TimerTask 
         public void propertyDeleted(PropertyState before) {
             if (!stopped && filterRef.get().include(Event.PROPERTY_REMOVED, path, before)) {
                 Event event = generatePropertyEvent(Event.PROPERTY_REMOVED, path, before);
-                listener.onEvent(new EventIteratorAdapter(Collections.singleton(event)));
+                events.add(Iterators.singleton(event));
             }
         }
 
@@ -109,7 +125,10 @@ class ChangeProcessor extends TimerTask 
         public void childNodeAdded(String name, NodeState after) {
             if (!stopped && filterRef.get().include(Event.NODE_ADDED, path, after)) {
                 Iterator<Event> events = generateNodeEvents(Event.NODE_ADDED, path, name, after);
-                listener.onEvent(new EventIteratorAdapter(events));
+                this.events.add(events);
+                if (++childNodeCount > PURGE_LIMIT) {
+                    sendEvents();
+                }
             }
         }
 
@@ -117,15 +136,18 @@ class ChangeProcessor extends TimerTask 
         public void childNodeDeleted(String name, NodeState before) {
             if (!stopped && filterRef.get().include(Event.NODE_REMOVED, path, before)) {
                 Iterator<Event> events = generateNodeEvents(Event.NODE_REMOVED, path, name, before);
-                listener.onEvent(new EventIteratorAdapter(events));
+                this.events.add(events);
             }
         }
 
         @Override
         public void childNodeChanged(String name, NodeState before, NodeState after) {
             if (!stopped && filterRef.get().includeChildren(path)) {
-                changeExtractor.getChanges(before, after,
-                        new EventGeneratingNodeStateDiff(PathUtils.concat(path, name)));
+                EventGeneratingNodeStateDiff diff = new EventGeneratingNodeStateDiff(PathUtils.concat(path, name), events);
+                changeExtractor.getChanges(before, after, diff);
+                if (events.size() > PURGE_LIMIT) {
+                    diff.sendEvents();
+                }
             }
         }
     }

Modified: jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ObservationManagerImpl.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ObservationManagerImpl.java?rev=1354877&r1=1354876&r2=1354877&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ObservationManagerImpl.java (original)
+++ jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ObservationManagerImpl.java Thu Jun 28 09:19:50 2012
@@ -55,6 +55,7 @@ public class ObservationManagerImpl impl
             boolean isDeep, String[] uuid, String[] nodeTypeName, boolean noLocal)
             throws RepositoryException {
 
+        // TODO: support noLocal flag!?
         ChangeProcessor processor = processors.get(listener);
         if (processor == null) {
             ChangeExtractor extractor = sessionDelegate.getChangeExtractor();