You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by md...@apache.org on 2012/06/28 11:19:51 UTC
svn commit: r1354877 - in /jackrabbit/oak/trunk:
oak-core/src/main/java/org/apache/jackrabbit/oak/util/ oak-jcr/
oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/
Author: mduerig
Date: Thu Jun 28 09:19:50 2012
New Revision: 1354877
URL: http://svn.apache.org/viewvc?rev=1354877&view=rev
Log:
OAK-144: Implement observation
Batch up observation events to mitigate the effects of JCR-3361.
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/util/Iterators.java
jackrabbit/oak/trunk/oak-jcr/pom.xml
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ObservationManagerImpl.java
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/util/Iterators.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/util/Iterators.java?rev=1354877&r1=1354876&r2=1354877&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/util/Iterators.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/util/Iterators.java Thu Jun 28 09:19:50 2012
@@ -203,7 +203,7 @@ public final class Iterators {
* @return
*/
@Nonnull
- public static <T> Iterator<T> flatten(final Iterator<Iterator<? extends T>> iterators) {
+ public static <T> Iterator<T> flatten(final Iterator<? extends Iterator<? extends T>> iterators) {
return new Iterator<T>() {
private Iterator<? extends T> current;
Modified: jackrabbit/oak/trunk/oak-jcr/pom.xml
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/pom.xml?rev=1354877&r1=1354876&r2=1354877&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-jcr/pom.xml (original)
+++ jackrabbit/oak/trunk/oak-jcr/pom.xml Thu Jun 28 09:19:50 2012
@@ -86,7 +86,7 @@ org.apache.jackrabbit.test.api.query.XPa
org.apache.jackrabbit.test.api.query.qom.ColumnTest#testExpandColumnsForNodeType
org.apache.jackrabbit.test.api.query.qom.SelectorTest#testUnknownNodeType
org.apache.jackrabbit.test.api.util
-org.apache.jackrabbit.test.api.observation.EventIteratorTest#testGetPosition <!-- JCR-3361 -->
+org.apache.jackrabbit.test.api.observation.EventIteratorTest#testSkip
org.apache.jackrabbit.test.api.observation.EventTest#testGetType
org.apache.jackrabbit.test.api.observation.EventTest#testGetNodePath
org.apache.jackrabbit.test.api.observation.EventTest#testGetUserId
@@ -94,8 +94,8 @@ org.apache.jackrabbit.test.api.observati
org.apache.jackrabbit.test.api.observation.NodeAddedTest#testMultipleNodeAdded1
org.apache.jackrabbit.test.api.observation.NodeAddedTest#testMultipleNodeAdded2
org.apache.jackrabbit.test.api.observation.NodeAddedTest#testTransientNodeAddedRemoved
-org.apache.jackrabbit.test.api.observation.NodeRemovedTest#testSingleNodeRemoved <!-- JCR-3361 -->
-org.apache.jackrabbit.test.api.observation.NodeRemovedTest#testMultiNodesRemoved <!-- JCR-3361 -->
+org.apache.jackrabbit.test.api.observation.NodeRemovedTest#testSingleNodeRemoved
+org.apache.jackrabbit.test.api.observation.NodeRemovedTest#testMultiNodesRemoved
org.apache.jackrabbit.test.api.observation.NodeMovedTest#testMoveNode
org.apache.jackrabbit.test.api.observation.NodeMovedTest#testMoveTree
org.apache.jackrabbit.test.api.observation.NodeMovedTest#testMoveWithRemove
@@ -103,12 +103,9 @@ org.apache.jackrabbit.test.api.observati
org.apache.jackrabbit.test.api.observation.NodeReorderTest#testNodeReorderMove <!-- reorder not supported -->
org.apache.jackrabbit.test.api.observation.NodeReorderTest#testNodeReorderSameName <!-- reorder not supported -->
org.apache.jackrabbit.test.api.observation.NodeReorderTest#testNodeReorderSameNameWithRemove <!-- reorder not supported -->
-org.apache.jackrabbit.test.api.observation.PropertyAddedTest#testMultiPropertyAdded <!-- JCR-3361 -->
-org.apache.jackrabbit.test.api.observation.PropertyChangedTest#testMultiPropertyChanged <!-- JCR-3361 -->
org.apache.jackrabbit.test.api.observation.PropertyChangedTest#testSinglePropertyChangedWithAdded
-org.apache.jackrabbit.test.api.observation.PropertyRemovedTest#testMultiPropertyRemoved <!-- JCR-3361 -->
org.apache.jackrabbit.test.api.observation.AddEventListenerTest#testPath
-org.apache.jackrabbit.test.api.observation.AddEventListenerTest#testNodeType <!-- JCR-3361 -->
+org.apache.jackrabbit.test.api.observation.AddEventListenerTest#testNodeType
org.apache.jackrabbit.test.api.observation.AddEventListenerTest#testIsDeepFalseNodeAdded
org.apache.jackrabbit.test.api.observation.AddEventListenerTest#testIsDeepFalsePropertyAdded
org.apache.jackrabbit.test.api.observation.AddEventListenerTest#testNoLocalTrue
Modified: jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java?rev=1354877&r1=1354876&r2=1354877&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java (original)
+++ jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java Thu Jun 28 09:19:50 2012
@@ -16,8 +16,9 @@
*/
package org.apache.jackrabbit.oak.jcr.observation;
-import java.util.Collections;
+import java.util.ArrayList;
import java.util.Iterator;
+import java.util.List;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicReference;
@@ -65,27 +66,42 @@ class ChangeProcessor extends TimerTask
@Override
public void run() {
- changeExtractor.getChanges(new EventGeneratingNodeStateDiff());
+ EventGeneratingNodeStateDiff diff = new EventGeneratingNodeStateDiff();
+ changeExtractor.getChanges(diff);
+ diff.sendEvents();
}
//------------------------------------------------------------< private >---
private class EventGeneratingNodeStateDiff implements NodeStateDiff {
+ public static final int PURGE_LIMIT = 8192;
+
private final String path;
- EventGeneratingNodeStateDiff(String path) {
+ private int childNodeCount;
+ private List<Iterator<Event>> events;
+
+ EventGeneratingNodeStateDiff(String path, List<Iterator<Event>> events) {
this.path = path;
+ this.events = events;
}
public EventGeneratingNodeStateDiff() {
- this("/");
+ this("/", new ArrayList<Iterator<Event>>(PURGE_LIMIT));
+ }
+
+ public void sendEvents() {
+ if (!events.isEmpty()) {
+ listener.onEvent(new EventIteratorAdapter(Iterators.flatten(events.iterator())));
+ events = new ArrayList<Iterator<Event>>(PURGE_LIMIT);
+ }
}
@Override
public void propertyAdded(PropertyState after) {
if (!stopped && filterRef.get().include(Event.PROPERTY_ADDED, path, after)) {
Event event = generatePropertyEvent(Event.PROPERTY_ADDED, path, after);
- listener.onEvent(new EventIteratorAdapter(Collections.singleton(event)));
+ events.add(Iterators.singleton(event));
}
}
@@ -93,7 +109,7 @@ class ChangeProcessor extends TimerTask
public void propertyChanged(PropertyState before, PropertyState after) {
if (!stopped && filterRef.get().include(Event.PROPERTY_CHANGED, path, before)) {
Event event = generatePropertyEvent(Event.PROPERTY_CHANGED, path, after);
- listener.onEvent(new EventIteratorAdapter(Collections.singleton(event)));
+ events.add(Iterators.singleton(event));
}
}
@@ -101,7 +117,7 @@ class ChangeProcessor extends TimerTask
public void propertyDeleted(PropertyState before) {
if (!stopped && filterRef.get().include(Event.PROPERTY_REMOVED, path, before)) {
Event event = generatePropertyEvent(Event.PROPERTY_REMOVED, path, before);
- listener.onEvent(new EventIteratorAdapter(Collections.singleton(event)));
+ events.add(Iterators.singleton(event));
}
}
@@ -109,7 +125,10 @@ class ChangeProcessor extends TimerTask
public void childNodeAdded(String name, NodeState after) {
if (!stopped && filterRef.get().include(Event.NODE_ADDED, path, after)) {
Iterator<Event> events = generateNodeEvents(Event.NODE_ADDED, path, name, after);
- listener.onEvent(new EventIteratorAdapter(events));
+ this.events.add(events);
+ if (++childNodeCount > PURGE_LIMIT) {
+ sendEvents();
+ }
}
}
@@ -117,15 +136,18 @@ class ChangeProcessor extends TimerTask
public void childNodeDeleted(String name, NodeState before) {
if (!stopped && filterRef.get().include(Event.NODE_REMOVED, path, before)) {
Iterator<Event> events = generateNodeEvents(Event.NODE_REMOVED, path, name, before);
- listener.onEvent(new EventIteratorAdapter(events));
+ this.events.add(events);
}
}
@Override
public void childNodeChanged(String name, NodeState before, NodeState after) {
if (!stopped && filterRef.get().includeChildren(path)) {
- changeExtractor.getChanges(before, after,
- new EventGeneratingNodeStateDiff(PathUtils.concat(path, name)));
+ EventGeneratingNodeStateDiff diff = new EventGeneratingNodeStateDiff(PathUtils.concat(path, name), events);
+ changeExtractor.getChanges(before, after, diff);
+ if (events.size() > PURGE_LIMIT) {
+ diff.sendEvents();
+ }
}
}
}
Modified: jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ObservationManagerImpl.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ObservationManagerImpl.java?rev=1354877&r1=1354876&r2=1354877&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ObservationManagerImpl.java (original)
+++ jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ObservationManagerImpl.java Thu Jun 28 09:19:50 2012
@@ -55,6 +55,7 @@ public class ObservationManagerImpl impl
boolean isDeep, String[] uuid, String[] nodeTypeName, boolean noLocal)
throws RepositoryException {
+ // TODO: support noLocal flag!?
ChangeProcessor processor = processors.get(listener);
if (processor == null) {
ChangeExtractor extractor = sessionDelegate.getChangeExtractor();