You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by md...@apache.org on 2014/07/17 12:20:59 UTC

svn commit: r1611313 - /jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java

Author: mduerig
Date: Thu Jul 17 10:20:58 2014
New Revision: 1611313

URL: http://svn.apache.org/r1611313
Log:
OAK-1975: Wrong values reported for OBSERVATION_EVENT_DURATION
Delegate updating of the relevant counters to a dedicated class

Modified:
    jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java

Modified: jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java?rev=1611313&r1=1611312&r2=1611313&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java (original)
+++ jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java Thu Jul 17 10:20:58 2014
@@ -286,8 +286,9 @@ class ChangeProcessor implements Observe
                 Iterator<Event> events = concat(eventQueues.iterator());
                 if (events.hasNext() && runningMonitor.enterIf(running)) {
                     try {
-                        eventListener.onEvent(
-                                new EventIteratorAdapter(statisticProvider(events)));
+                        CountingIterator<Event> countingEvents = new CountingIterator<Event>(events);
+                        eventListener.onEvent(new EventIteratorAdapter(countingEvents));
+                        countingEvents.updateCounters(eventCount, eventDuration);
                     } finally {
                         runningMonitor.leave();
                     }
@@ -299,24 +300,54 @@ class ChangeProcessor implements Observe
         previousRoot = root;
     }
 
-    private <T> Iterator<T> statisticProvider(final Iterator<T> events) {
-        return new ForwardingIterator<T>() {
-            @Override
-            protected Iterator<T> delegate() {
-                return events;
+    private static class CountingIterator<T> extends ForwardingIterator<T> {
+        private final long t0 = System.nanoTime();
+        private final Iterator<T> events;
+        private long eventCount;
+        private long sysTime;
+
+        public CountingIterator(Iterator<T> events) {
+            this.events = events;
+        }
+
+        public void updateCounters(AtomicLong eventCount, AtomicLong eventDuration) {
+            checkState(this.eventCount >= 0);
+            eventCount.addAndGet(this.eventCount);
+            eventDuration.addAndGet(System.nanoTime() - t0 - sysTime);
+            this.eventCount = -1;
+        }
+
+        @Override
+        protected Iterator<T> delegate() {
+            return events;
+        }
+
+        @Override
+        public T next() {
+            if (eventCount == -1) {
+                LOG.warn("Access to EventIterator outside the onEvent callback detected. This will " +
+                    "cause observation related values in RepositoryStatistics to become unreliable.");
+                eventCount = -2;
             }
 
-            @Override
-            public T next() {
-                long t0 = System.nanoTime();
-                try {
-                    return super.next();
-                } finally {
-                    eventCount.incrementAndGet();
-                    eventDuration.addAndGet(System.nanoTime() - t0);
-                }
+            long t0 = System.nanoTime();
+            try {
+                return super.next();
+            } finally {
+                eventCount++;
+                sysTime += System.nanoTime() - t0;
             }
-        };
+        }
+
+        @Override
+        public boolean hasNext() {
+            long t0 = System.nanoTime();
+            try {
+                return super.hasNext();
+            } finally {
+                sysTime += System.nanoTime() - t0;
+            }
+        }
     }
 
     private static class RunningGuard extends Guard {