You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ju...@apache.org on 2014/01/29 06:35:53 UTC
svn commit: r1562361 -
/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/EventGenerator.java
Author: jukka
Date: Wed Jan 29 05:35:53 2014
New Revision: 1562361
URL: http://svn.apache.org/r1562361
Log:
OAK-1332: Large number of changes to the same node can fill observation queue
Improved deferral of node change processing when the diff queue limit is reached
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/EventGenerator.java
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/EventGenerator.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/EventGenerator.java?rev=1562361&r1=1562360&r2=1562361&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/EventGenerator.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/EventGenerator.java Wed Jan 29 05:35:53 2014
@@ -225,7 +225,9 @@ public class EventGenerator {
@Override
public boolean childNodeAdded(String name, NodeState after) {
- if (beforeEvent()) {
+ if (fullQueue()) {
+ return false;
+ } else if (beforeEvent()) {
PropertyState sourceProperty = after.getProperty(SOURCE_PATH);
if (sourceProperty != null) {
String sourcePath = sourceProperty.getValue(STRING);
@@ -233,7 +235,8 @@ public class EventGenerator {
}
handler.nodeAdded(name, after);
- return addChildDiff(name, MISSING_NODE, after);
+ addChildDiff(name, MISSING_NODE, after);
+ return afterEvent();
} else {
return true;
}
@@ -242,8 +245,11 @@ public class EventGenerator {
@Override
public boolean childNodeChanged(
String name, NodeState before, NodeState after) {
- if (beforeEvent()) {
- return addChildDiff(name, before, after);
+ if (fullQueue()) {
+ return false;
+ } else if (beforeEvent()) {
+ addChildDiff(name, before, after);
+ return afterEvent();
} else {
return true;
}
@@ -251,9 +257,12 @@ public class EventGenerator {
@Override
public boolean childNodeDeleted(String name, NodeState before) {
- if (beforeEvent()) {
+ if (fullQueue()) {
+ return false;
+ } else if (beforeEvent()) {
handler.nodeDeleted(name, before);
- return addChildDiff(name, before, MISSING_NODE);
+ addChildDiff(name, before, MISSING_NODE);
+ return afterEvent();
} else {
return true;
}
@@ -265,25 +274,12 @@ public class EventGenerator {
* Schedules a continuation for processing changes within the given
* child node, if changes within that subtree should be processed.
*/
- private boolean addChildDiff(
+ private void addChildDiff(
String name, NodeState before, NodeState after) {
ChangeHandler h = handler.getChildHandler(name, before, after);
if (h != null) {
continuations.add(new Continuation(h, before, after, 0));
}
-
- if (continuations.size() > MAX_QUEUED_CONTINUATIONS) {
- // Postpone further processing of the current continuation.
- // Even though this increases the size of the queue beyond
- // the limit, doing so ultimately forces property-only
- // diffs to the beginning of the queue, and thus helps
- // automatically clean up the backlog.
- continuations.add(new Continuation(
- handler, this.before, this.after, counter));
- return false;
- } else {
- return afterEvent();
- }
}
/**
@@ -295,6 +291,25 @@ public class EventGenerator {
}
/**
+ * Checks whether the diff queue has reached the maximum size limit,
+ * and postpones further processing of the current diff to later.
+ * Even though this postponement increases the size of the queue
+ * beyond the limit, doing so ultimately forces property-only
+ * diffs to the beginning of the queue, and thus helps to
+ * automatically clean up the backlog.
+ */
+ private boolean fullQueue() {
+ if (counter > skip // must have processed at least one event
+ && continuations.size() >= MAX_QUEUED_CONTINUATIONS) {
+ continuations.add(new Continuation(
+ handler, this.before, this.after, counter));
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ /**
* Checks whether enough events have already been processed in this
* continuation. If that is the case, we postpone further processing
* to a new continuation that will first skip all the initial events