You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2022/03/21 09:25:25 UTC

[sling-org-apache-sling-event] branch master updated: SLING-11192 : Calculating metrics takes too long

This is an automated email from the ASF dual-hosted git repository.

cziegeler pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-event.git


The following commit(s) were added to refs/heads/master by this push:
     new 91fca04  SLING-11192 : Calculating metrics takes too long
91fca04 is described below

commit 91fca04d1bd428a57e1b16800536a3b7fdee8e20
Author: Carsten Ziegeler <cz...@apache.org>
AuthorDate: Mon Mar 21 10:25:18 2022 +0100

    SLING-11192 : Calculating metrics takes too long
---
 .../event/impl/jobs/stats/BaseStatisticsImpl.java  | 219 ++++++++++++++++++++
 .../event/impl/jobs/stats/StatisticsImpl.java      | 229 ++++-----------------
 .../event/impl/jobs/stats/StatisticsManager.java   |   8 +-
 .../event/impl/jobs/stats/TopicStatisticsImpl.java | 129 +-----------
 .../event/impl/jobs/stats/StatisticsImplTest.java  | 182 ++++++++++++++++
 .../impl/jobs/stats/TopicStatisticsImplTest.java   | 107 ++++++++++
 6 files changed, 552 insertions(+), 322 deletions(-)

diff --git a/src/main/java/org/apache/sling/event/impl/jobs/stats/BaseStatisticsImpl.java b/src/main/java/org/apache/sling/event/impl/jobs/stats/BaseStatisticsImpl.java
new file mode 100644
index 0000000..802b726
--- /dev/null
+++ b/src/main/java/org/apache/sling/event/impl/jobs/stats/BaseStatisticsImpl.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.stats;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Base class for statistics implementations
+ */
+public abstract class BaseStatisticsImpl {
+
+    private final AtomicLong lastActivated = new AtomicLong(-1);
+
+    private final AtomicLong lastFinished = new AtomicLong(-1);
+
+    private final AtomicLong waitingTime = new AtomicLong();
+
+    private final AtomicLong processingTime = new AtomicLong();
+
+    private final AtomicLong waitingCount = new AtomicLong();
+
+    private final AtomicLong processingCount = new AtomicLong();
+
+    private final AtomicLong finishedJobs = new AtomicLong();
+
+    private final AtomicLong failedJobs = new AtomicLong();
+
+    private final AtomicLong cancelledJobs = new AtomicLong();
+
+    /**
+     * @see org.apache.sling.event.jobs.Statistics#getNumberOfProcessedJobs()
+     */
+    public long getNumberOfProcessedJobs() {
+        return finishedJobs.get() + failedJobs.get() + cancelledJobs.get();
+    }
+
+    /**
+     * @see org.apache.sling.event.jobs.Statistics#getAverageWaitingTime()
+     */
+    public long getAverageWaitingTime() {
+        final long time = this.waitingTime.get();
+        final long count = this.waitingCount.get();
+        if ( count > 1 ) {
+            return time / count;
+        }
+        return time;
+    }
+
+    /**
+     * @see org.apache.sling.event.jobs.Statistics#getAverageProcessingTime()
+     */
+    public long getAverageProcessingTime() {
+        final long time = this.processingTime.get();
+        final long count = this.processingCount.get();
+        if ( count > 1 ) {
+            return time / count;
+        }
+        return time;
+    }
+
+    /**
+     * @see org.apache.sling.event.jobs.Statistics#getNumberOfFinishedJobs()
+     */
+    public long getNumberOfFinishedJobs() {
+        return finishedJobs.get();
+    }
+
+    /**
+     * @see org.apache.sling.event.jobs.Statistics#getNumberOfCancelledJobs()
+     */
+    public long getNumberOfCancelledJobs() {
+        return cancelledJobs.get();
+    }
+
+    /**
+     * @see org.apache.sling.event.jobs.Statistics#getNumberOfFailedJobs()
+     */
+    public long getNumberOfFailedJobs() {
+        return failedJobs.get();
+    }
+
+    /**
+     * @see org.apache.sling.event.jobs.Statistics#getLastActivatedJobTime()
+     */
+    public long getLastActivatedJobTime() {
+        return lastActivated.get();
+    }
+
+    /**
+     * @see org.apache.sling.event.jobs.Statistics#getLastFinishedJobTime()
+     */
+    public long getLastFinishedJobTime() {
+        return lastFinished.get();
+    }
+
+    /**
+     * Add a finished job
+     * @param jobTime The processing time for this job.
+     */
+    public synchronized void finishedJob(final long jobTime) {
+        this.lastFinished.set(System.currentTimeMillis());
+        this.processingTime.addAndGet(jobTime);
+        this.processingCount.incrementAndGet();    
+        this.finishedJobs.incrementAndGet();
+    }
+
+    /**
+     * Record a job moving from the queue to the active state.
+     * @param queueTime The time the job stayed in the queue.
+     */
+    public synchronized void addActive(final long queueTime) {
+        this.waitingCount.incrementAndGet();
+        this.waitingTime.addAndGet(queueTime);
+        this.lastActivated.set(System.currentTimeMillis());
+    }
+
+    /**
+     * Add a failed job.
+     */
+    public synchronized void failedJob() {
+        this.failedJobs.incrementAndGet();
+    }
+
+    /**
+     * Add a cancelled job.
+     */
+    public synchronized void cancelledJob() {
+        this.cancelledJobs.incrementAndGet();
+    }
+
+    /**
+     * Add the values of another statistics object to this one.
+     */
+    public synchronized void add(final BaseStatisticsImpl other) {
+        synchronized ( other ) {
+            if ( other.lastActivated.get() > this.lastActivated.get() ) {
+                this.lastActivated.set(other.lastActivated.get());
+            }
+            if ( other.lastFinished.get() > this.lastFinished.get() ) {
+                this.lastFinished.set(other.lastFinished.get());
+            }
+            this.waitingTime.addAndGet(other.waitingTime.get());
+            this.waitingCount.addAndGet(other.waitingCount.get());
+            this.processingTime.addAndGet(other.processingTime.get());
+            this.processingCount.addAndGet(other.processingCount.get());
+            this.finishedJobs.addAndGet(other.finishedJobs.get());
+            this.failedJobs.addAndGet(other.failedJobs.get());
+            this.cancelledJobs.addAndGet(other.cancelledJobs.get());
+        }
+    }
+
+    /**
+     * Copy all values from the other statistics object into this one.
+     */
+    public void copyFrom(final BaseStatisticsImpl other) {
+        final long localLastActivated;
+        final long localLastFinished;
+        final long localWaitingTime;
+        final long localProcessingTime;
+        final long localWaitingCount;
+        final long localProcessingCount;
+        final long localFinishedJobs;
+        final long localFailedJobs;
+        final long localCancelledJobs;
+        synchronized ( other ) {
+            localLastActivated = other.lastActivated.get();
+            localLastFinished = other.lastFinished.get();
+            localWaitingTime = other.waitingTime.get();
+            localProcessingTime = other.processingTime.get();
+            localWaitingCount = other.waitingCount.get();
+            localProcessingCount = other.processingCount.get();
+            localFinishedJobs = other.finishedJobs.get();
+            localFailedJobs = other.failedJobs.get();
+            localCancelledJobs = other.cancelledJobs.get();
+        }
+        synchronized ( this ) {
+            this.lastActivated.set(localLastActivated);
+            this.lastFinished.set(localLastFinished);
+            this.waitingTime.set(localWaitingTime);
+            this.processingTime.set(localProcessingTime);
+            this.waitingCount.set(localWaitingCount);
+            this.processingCount.set(localProcessingCount);
+            this.finishedJobs.set(localFinishedJobs);
+            this.failedJobs.set(localFailedJobs);
+            this.cancelledJobs.set(localCancelledJobs);
+        }
+    }
+
+    /**
+     * @see org.apache.sling.event.jobs.Statistics#reset()
+     */
+    public synchronized void reset() {
+        this.lastActivated.set(-1);
+        this.lastFinished.set(-1);
+        this.waitingTime.set(0);
+        this.processingTime.set(0);
+        this.waitingCount.set(0);
+        this.processingCount.set(0);
+        this.finishedJobs.set(0);
+        this.failedJobs.set(0);
+        this.cancelledJobs.set(0);
+    }
+}
diff --git a/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsImpl.java b/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsImpl.java
index 00a278f..82c7a01 100644
--- a/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsImpl.java
+++ b/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsImpl.java
@@ -18,143 +18,59 @@
  */
 package org.apache.sling.event.impl.jobs.stats;
 
+import java.util.concurrent.atomic.AtomicLong;
+
 import org.apache.sling.event.jobs.Statistics;
 
 /**
  * Implementation of the statistics.
  */
-public class StatisticsImpl implements Statistics {
-
-    private volatile long startTime;
-
-    private volatile long activeJobs;
-
-    private volatile long queuedJobs;
-
-    private volatile long lastActivated = -1;
-
-    private volatile long lastFinished = -1;
-
-    private volatile long averageWaitingTime;
+public class StatisticsImpl extends BaseStatisticsImpl implements Statistics {
 
-    private volatile long averageProcessingTime;
+    private final AtomicLong startTime = new AtomicLong();
 
-    private volatile long waitingTime;
+    private final AtomicLong activeJobs = new AtomicLong();
 
-    private volatile long processingTime;
-
-    private volatile long waitingCount;
-
-    private volatile long processingCount;
-
-    private volatile long finishedJobs;
-
-    private volatile long failedJobs;
-
-    private volatile long cancelledJobs;
+    private final AtomicLong queuedJobs = new AtomicLong();
 
     public StatisticsImpl() {
         this(System.currentTimeMillis());
     }
 
     public StatisticsImpl(final long startTime) {
-        this.startTime = startTime;
+        this.startTime.set(startTime);
     }
 
     /**
      * @see org.apache.sling.event.jobs.Statistics#getStartTime()
      */
     @Override
-    public synchronized long getStartTime() {
-        return startTime;
-    }
-
-    /**
-     * @see org.apache.sling.event.jobs.Statistics#getNumberOfProcessedJobs()
-     */
-    @Override
-    public synchronized long getNumberOfProcessedJobs() {
-        return getNumberOfCancelledJobs() + getNumberOfFailedJobs() + getNumberOfFinishedJobs();
+    public long getStartTime() {
+        return startTime.get();
     }
 
     /**
      * @see org.apache.sling.event.jobs.Statistics#getNumberOfActiveJobs()
      */
     @Override
-    public synchronized long getNumberOfActiveJobs() {
-        return activeJobs;
+    public long getNumberOfActiveJobs() {
+        return activeJobs.get();
     }
 
     /**
      * @see org.apache.sling.event.jobs.Statistics#getNumberOfQueuedJobs()
      */
     @Override
-    public synchronized long getNumberOfQueuedJobs() {
-        return queuedJobs;
+    public long getNumberOfQueuedJobs() {
+        return queuedJobs.get();
     }
 
     /**
      * @see org.apache.sling.event.jobs.Statistics#getNumberOfJobs()
      */
     @Override
-    public synchronized long getNumberOfJobs() {
-        return activeJobs + queuedJobs;
-    }
-
-    /**
-     * @see org.apache.sling.event.jobs.Statistics#getAverageWaitingTime()
-     */
-    @Override
-    public synchronized long getAverageWaitingTime() {
-        return averageWaitingTime;
-    }
-
-    /**
-     * @see org.apache.sling.event.jobs.Statistics#getAverageProcessingTime()
-     */
-    @Override
-    public synchronized long getAverageProcessingTime() {
-        return averageProcessingTime;
-    }
-
-    /**
-     * @see org.apache.sling.event.jobs.Statistics#getNumberOfFinishedJobs()
-     */
-    @Override
-    public synchronized long getNumberOfFinishedJobs() {
-        return finishedJobs;
-    }
-
-    /**
-     * @see org.apache.sling.event.jobs.Statistics#getNumberOfCancelledJobs()
-     */
-    @Override
-    public synchronized long getNumberOfCancelledJobs() {
-        return cancelledJobs;
-    }
-
-    /**
-     * @see org.apache.sling.event.jobs.Statistics#getNumberOfFailedJobs()
-     */
-    @Override
-    public synchronized long getNumberOfFailedJobs() {
-        return failedJobs;
-    }
-
-    /**
-     * @see org.apache.sling.event.jobs.Statistics#getLastActivatedJobTime()
-     */
-    @Override
-    public synchronized long getLastActivatedJobTime() {
-        return this.lastActivated;
-    }
-
-    /**
-     * @see org.apache.sling.event.jobs.Statistics#getLastFinishedJobTime()
-     */
-    @Override
-    public synchronized long getLastFinishedJobTime() {
-        return this.lastFinished;
+    public long getNumberOfJobs() {
+        return activeJobs.get() + queuedJobs.get();
     }
 
     /**
@@ -162,49 +78,45 @@ public class StatisticsImpl implements Statistics {
      * @param jobTime The processing time for this job.
      */
     public synchronized void finishedJob(final long jobTime) {
-        this.lastFinished = System.currentTimeMillis();
-        this.processingTime += jobTime;
-        this.processingCount++;
-        this.averageProcessingTime = this.processingTime / this.processingCount;
-        this.finishedJobs++;
-        this.activeJobs--;
+        super.finishedJob(jobTime);
+        this.activeJobs.decrementAndGet();
     }
 
     /**
      * Add a failed job.
      */
     public synchronized void failedJob() {
-        this.failedJobs++;
-        this.activeJobs--;
+        super.failedJob();
+        this.activeJobs.decrementAndGet();
     }
 
     /**
      * Add a cancelled job.
      */
     public synchronized void cancelledJob() {
-        this.cancelledJobs++;
-        this.activeJobs--;
+        super.cancelledJob();
+        this.activeJobs.decrementAndGet();
     }
 
     /**
      * New job in the queue
      */
-    public synchronized void incQueued() {
-        this.queuedJobs++;
+    public void incQueued() {
+        this.queuedJobs.incrementAndGet();
     }
 
     /**
      * Job not processed by us
      */
-    public synchronized void decQueued() {
-        this.queuedJobs--;
+    public void decQueued() {
+        this.queuedJobs.decrementAndGet();
     }
 
     /**
      * Clear all queued
      */
     public synchronized void clearQueued() {
-        this.queuedJobs = 0;
+        this.queuedJobs.set(0);
     }
 
     /**
@@ -212,12 +124,9 @@ public class StatisticsImpl implements Statistics {
      * @param queueTime The time the job stayed in the queue.
      */
     public synchronized void addActive(final long queueTime) {
-        this.queuedJobs--;
-        this.activeJobs++;
-        this.waitingCount++;
-        this.waitingTime += queueTime;
-        this.averageWaitingTime = this.waitingTime / this.waitingCount;
-        this.lastActivated = System.currentTimeMillis();
+        super.addActive(queueTime);
+        this.queuedJobs.decrementAndGet();
+        this.activeJobs.incrementAndGet();
     }
 
     /**
@@ -225,27 +134,9 @@ public class StatisticsImpl implements Statistics {
      */
     public synchronized void add(final StatisticsImpl other) {
         synchronized ( other ) {
-            if ( other.lastActivated > this.lastActivated ) {
-                this.lastActivated = other.lastActivated;
-            }
-            if ( other.lastFinished > this.lastFinished ) {
-                this.lastFinished = other.lastFinished;
-            }
-            this.queuedJobs += other.queuedJobs;
-            this.waitingTime += other.waitingTime;
-            this.waitingCount += other.waitingCount;
-            if ( this.waitingCount > 0 ) {
-                this.averageWaitingTime = this.waitingTime / this.waitingCount;
-            }
-            this.processingTime += other.processingTime;
-            this.processingCount += other.processingCount;
-            if ( this.processingCount > 0 ) {
-                this.averageProcessingTime = this.processingTime / this.processingCount;
-            }
-            this.finishedJobs += other.finishedJobs;
-            this.failedJobs += other.failedJobs;
-            this.cancelledJobs += other.cancelledJobs;
-            this.activeJobs += other.activeJobs;
+            super.add(other);
+            this.queuedJobs.addAndGet(other.queuedJobs.get());
+            this.activeJobs.addAndGet(other.activeJobs.get());
         }
     }
 
@@ -253,48 +144,16 @@ public class StatisticsImpl implements Statistics {
      * Create a new statistics object with exactly the same values.
      */
     public void copyFrom(final StatisticsImpl other) {
+        super.copyFrom(other);
         final long localQueuedJobs;
-        final long localLastActivated;
-        final long localLastFinished;
-        final long localAverageWaitingTime;
-        final long localAverageProcessingTime;
-        final long localWaitingTime;
-        final long localProcessingTime;
-        final long localWaitingCount;
-        final long localProcessingCount;
-        final long localFinishedJobs;
-        final long localFailedJobs;
-        final long localCancelledJobs;
         final long localActiveJobs;
         synchronized ( other ) {
-            localQueuedJobs = other.queuedJobs;
-            localLastActivated = other.lastActivated;
-            localLastFinished = other.lastFinished;
-            localAverageWaitingTime = other.averageWaitingTime;
-            localAverageProcessingTime = other.averageProcessingTime;
-            localWaitingTime = other.waitingTime;
-            localProcessingTime = other.processingTime;
-            localWaitingCount = other.waitingCount;
-            localProcessingCount = other.processingCount;
-            localFinishedJobs = other.finishedJobs;
-            localFailedJobs = other.failedJobs;
-            localCancelledJobs = other.cancelledJobs;
-            localActiveJobs = other.activeJobs;
+            localQueuedJobs = other.queuedJobs.get();
+            localActiveJobs = other.activeJobs.get();
         }
         synchronized ( this ) {
-            this.queuedJobs = localQueuedJobs;
-            this.lastActivated = localLastActivated;
-            this.lastFinished = localLastFinished;
-            this.averageWaitingTime = localAverageWaitingTime;
-            this.averageProcessingTime = localAverageProcessingTime;
-            this.waitingTime = localWaitingTime;
-            this.processingTime = localProcessingTime;
-            this.waitingCount = localWaitingCount;
-            this.processingCount = localProcessingCount;
-            this.finishedJobs = localFinishedJobs;
-            this.failedJobs = localFailedJobs;
-            this.cancelledJobs = localCancelledJobs;
-            this.activeJobs = localActiveJobs;
+            this.queuedJobs.set(localQueuedJobs);
+            this.activeJobs.set(localActiveJobs);
         }
     }
 
@@ -303,17 +162,7 @@ public class StatisticsImpl implements Statistics {
      */
     @Override
     public synchronized void reset() {
-        this.startTime = System.currentTimeMillis();
-        this.lastActivated = -1;
-        this.lastFinished = -1;
-        this.averageWaitingTime = 0;
-        this.averageProcessingTime = 0;
-        this.waitingTime = 0;
-        this.processingTime = 0;
-        this.waitingCount = 0;
-        this.processingCount = 0;
-        this.finishedJobs = 0;
-        this.failedJobs = 0;
-        this.cancelledJobs = 0;
+        this.startTime.set(System.currentTimeMillis());
+        super.reset();
     }
 }
diff --git a/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsManager.java b/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsManager.java
index 8e037b3..cf6b0bd 100644
--- a/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsManager.java
+++ b/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsManager.java
@@ -141,21 +141,21 @@ public class StatisticsManager {
         }
 
         if ( state == InternalJobState.CANCELLED ) {
-            ts.addCancelled();
+            ts.cancelledJob();;
             this.globalStatistics.cancelledJob();
             if ( queueStats != null ) {
                 queueStats.cancelledJob();
             }
 
         } else if ( state == InternalJobState.FAILED ) {
-            ts.addFailed();
+            ts.failedJob();
             this.globalStatistics.failedJob();
             if ( queueStats != null ) {
                 queueStats.failedJob();
             }
 
         } else if ( state == InternalJobState.SUCCEEDED ) {
-            ts.addFinished(processingTime);
+            ts.finishedJob(processingTime);
             this.globalStatistics.finishedJob(processingTime);
             if ( queueStats != null ) {
                 queueStats.finishedJob(processingTime);
@@ -174,7 +174,7 @@ public class StatisticsManager {
             ts = (TopicStatisticsImpl)this.topicStatistics.get(topic);
         }
 
-        ts.addActivated(queueTime);
+        ts.addActive(queueTime);
         this.globalStatistics.addActive(queueTime);
         if ( queueStats != null ) {
             queueStats.addActive(queueTime);
diff --git a/src/main/java/org/apache/sling/event/impl/jobs/stats/TopicStatisticsImpl.java b/src/main/java/org/apache/sling/event/impl/jobs/stats/TopicStatisticsImpl.java
index 3b0a32e..8aed42f 100644
--- a/src/main/java/org/apache/sling/event/impl/jobs/stats/TopicStatisticsImpl.java
+++ b/src/main/java/org/apache/sling/event/impl/jobs/stats/TopicStatisticsImpl.java
@@ -23,32 +23,10 @@ import org.apache.sling.event.jobs.TopicStatistics;
 /**
  * Implementation of the statistics.
  */
-public class TopicStatisticsImpl implements TopicStatistics {
+public class TopicStatisticsImpl extends BaseStatisticsImpl implements TopicStatistics {
 
     private final String topic;
 
-    private volatile long lastActivated = -1;
-
-    private volatile long lastFinished = -1;
-
-    private volatile long averageWaitingTime;
-
-    private volatile long averageProcessingTime;
-
-    private volatile long waitingTime;
-
-    private volatile long processingTime;
-
-    private volatile long waitingCount;
-
-    private volatile long processingCount;
-
-    private volatile long finishedJobs;
-
-    private volatile long failedJobs;
-
-    private volatile long cancelledJobs;
-
     /** Constructor. */
     public TopicStatisticsImpl(final String topic) {
         this.topic = topic;
@@ -61,109 +39,4 @@ public class TopicStatisticsImpl implements TopicStatistics {
     public String getTopic() {
         return this.topic;
     }
-
-    /**
-     * @see org.apache.sling.event.jobs.TopicStatistics#getNumberOfProcessedJobs()
-     */
-    @Override
-    public synchronized long getNumberOfProcessedJobs() {
-        return getNumberOfCancelledJobs() + getNumberOfFailedJobs() + getNumberOfFinishedJobs();
-    }
-
-    /**
-     * @see org.apache.sling.event.jobs.TopicStatistics#getAverageWaitingTime()
-     */
-    @Override
-    public synchronized long getAverageWaitingTime() {
-        return averageWaitingTime;
-    }
-
-    /**
-     * @see org.apache.sling.event.jobs.TopicStatistics#getAverageProcessingTime()
-     */
-    @Override
-    public synchronized long getAverageProcessingTime() {
-        return averageProcessingTime;
-    }
-
-    /**
-     * @see org.apache.sling.event.jobs.TopicStatistics#getNumberOfFinishedJobs()
-     */
-    @Override
-    public synchronized long getNumberOfFinishedJobs() {
-        return finishedJobs;
-    }
-
-    /**
-     * @see org.apache.sling.event.jobs.TopicStatistics#getNumberOfCancelledJobs()
-     */
-    @Override
-    public synchronized long getNumberOfCancelledJobs() {
-        return cancelledJobs;
-    }
-
-    /**
-     * @see org.apache.sling.event.jobs.TopicStatistics#getNumberOfFailedJobs()
-     */
-    @Override
-    public synchronized long getNumberOfFailedJobs() {
-        return failedJobs;
-    }
-
-    /**
-     * @see org.apache.sling.event.jobs.TopicStatistics#getLastActivatedJobTime()
-     */
-    @Override
-    public synchronized long getLastActivatedJobTime() {
-        return this.lastActivated;
-    }
-
-    /**
-     * @see org.apache.sling.event.jobs.TopicStatistics#getLastFinishedJobTime()
-     */
-    @Override
-    public synchronized long getLastFinishedJobTime() {
-        return this.lastFinished;
-    }
-
-    /**
-     * Add a finished job.
-     * @param jobTime The time of the job processing.
-     */
-    public synchronized void addFinished(final long jobTime) {
-        this.finishedJobs++;
-        this.lastFinished = System.currentTimeMillis();
-        if ( jobTime > 0 ) {
-            this.processingTime += jobTime;
-            this.processingCount++;
-            this.averageProcessingTime = this.processingTime / this.processingCount;
-        }
-    }
-
-    /**
-     * Add a started job.
-     * @param queueTime The time of the job in the queue.
-     */
-    public synchronized void addActivated(final long queueTime) {
-        this.lastActivated = System.currentTimeMillis();
-        if ( queueTime > 0 ) {
-            this.waitingTime += queueTime;
-            this.waitingCount++;
-            this.averageWaitingTime = this.waitingTime / this.waitingCount;
-        }
-    }
-
-    /**
-     * Add a failed job.
-     */
-    public synchronized void addFailed() {
-        this.failedJobs++;
-    }
-
-    /**
-     * Add a cancelled job.
-     */
-    public synchronized void addCancelled() {
-        this.cancelledJobs++;
-    }
 }
diff --git a/src/test/java/org/apache/sling/event/impl/jobs/stats/StatisticsImplTest.java b/src/test/java/org/apache/sling/event/impl/jobs/stats/StatisticsImplTest.java
new file mode 100644
index 0000000..915ce28
--- /dev/null
+++ b/src/test/java/org/apache/sling/event/impl/jobs/stats/StatisticsImplTest.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.stats;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.junit.Test;
+
+public class StatisticsImplTest {
+
+    @Test public void testInitNoStartTime() {
+        final long now = System.currentTimeMillis();
+        final StatisticsImpl s = new StatisticsImpl();
+        assertTrue(s.getStartTime() >= now);
+        assertTrue(s.getStartTime() <= System.currentTimeMillis());
+        assertEquals(0, s.getNumberOfFinishedJobs());
+        assertEquals(0, s.getNumberOfCancelledJobs());
+        assertEquals(0, s.getNumberOfFailedJobs());
+        assertEquals(0, s.getNumberOfProcessedJobs());
+        assertEquals(0, s.getNumberOfActiveJobs());
+        assertEquals(0, s.getNumberOfQueuedJobs());
+        assertEquals(0, s.getNumberOfJobs());
+        assertEquals(-1, s.getLastActivatedJobTime());
+        assertEquals(-1, s.getLastFinishedJobTime());
+        assertEquals(0, s.getAverageWaitingTime());
+        assertEquals(0, s.getAverageProcessingTime());
+    }
+
+    @Test public void testInitStartTime() {
+        final StatisticsImpl s = new StatisticsImpl(7000);
+        assertEquals(7000L, s.getStartTime());
+        assertEquals(0, s.getNumberOfFinishedJobs());
+        assertEquals(0, s.getNumberOfCancelledJobs());
+        assertEquals(0, s.getNumberOfFailedJobs());
+        assertEquals(0, s.getNumberOfProcessedJobs());
+        assertEquals(0, s.getNumberOfActiveJobs());
+        assertEquals(0, s.getNumberOfQueuedJobs());
+        assertEquals(0, s.getNumberOfJobs());
+        assertEquals(-1, s.getLastActivatedJobTime());
+        assertEquals(-1, s.getLastFinishedJobTime());
+        assertEquals(0, s.getAverageWaitingTime());
+        assertEquals(0, s.getAverageProcessingTime());
+    }
+
+    @Test public void reset() {
+        final StatisticsImpl s = new StatisticsImpl(7000);
+        final long now = System.currentTimeMillis();
+        s.reset();
+        assertTrue(s.getStartTime() >= now);
+        assertTrue(s.getStartTime() <= System.currentTimeMillis());
+        assertEquals(0, s.getNumberOfFinishedJobs());
+        assertEquals(0, s.getNumberOfCancelledJobs());
+        assertEquals(0, s.getNumberOfFailedJobs());
+        assertEquals(0, s.getNumberOfProcessedJobs());
+        assertEquals(0, s.getNumberOfActiveJobs());
+        assertEquals(0, s.getNumberOfQueuedJobs());
+        assertEquals(0, s.getNumberOfJobs());
+        assertEquals(-1, s.getLastActivatedJobTime());
+        assertEquals(-1, s.getLastFinishedJobTime());
+        assertEquals(0, s.getAverageWaitingTime());
+        assertEquals(0, s.getAverageProcessingTime());
+    }
+
+    @Test public void testJobFinished() {
+        final StatisticsImpl s = new StatisticsImpl();
+
+        final long now = System.currentTimeMillis();
+
+        s.incQueued();
+        s.incQueued();
+        assertEquals(2L, s.getNumberOfQueuedJobs());
+        assertEquals(0L, s.getNumberOfActiveJobs());
+        assertEquals(2L, s.getNumberOfJobs());
+
+        s.addActive(500);
+        s.addActive(700);
+        assertEquals(0L, s.getNumberOfQueuedJobs());
+        assertEquals(2L, s.getNumberOfActiveJobs());
+        assertEquals(2L, s.getNumberOfJobs());
+
+        s.finishedJob(300);
+        s.finishedJob(500);
+
+        assertEquals(0L, s.getNumberOfActiveJobs());
+        assertEquals(0L, s.getNumberOfQueuedJobs());
+        assertEquals(2L, s.getNumberOfFinishedJobs());
+        assertEquals(2L, s.getNumberOfProcessedJobs());
+        assertEquals(0L, s.getNumberOfJobs());
+
+        assertEquals(400L, s.getAverageProcessingTime());
+        assertEquals(600L, s.getAverageWaitingTime());
+        assertTrue(s.getLastFinishedJobTime() >= now);
+        assertTrue(s.getLastFinishedJobTime() <= System.currentTimeMillis());
+        assertTrue(s.getLastActivatedJobTime() >= now);
+        assertTrue(s.getLastActivatedJobTime() <= System.currentTimeMillis());
+    }
+
+    @Test public void testJobFailed() {
+        final StatisticsImpl s = new StatisticsImpl();
+
+        final long now = System.currentTimeMillis();
+
+        s.incQueued();
+        s.incQueued();
+        assertEquals(2L, s.getNumberOfQueuedJobs());
+        assertEquals(0L, s.getNumberOfActiveJobs());
+        assertEquals(2L, s.getNumberOfJobs());
+
+        s.addActive(500);
+        s.addActive(700);
+        assertEquals(0L, s.getNumberOfQueuedJobs());
+        assertEquals(2L, s.getNumberOfActiveJobs());
+        assertEquals(2L, s.getNumberOfJobs());
+
+        s.failedJob();
+        s.failedJob();
+
+        assertEquals(0L, s.getNumberOfActiveJobs());
+        assertEquals(0L, s.getNumberOfQueuedJobs());
+        assertEquals(0L, s.getNumberOfFinishedJobs());
+        assertEquals(2L, s.getNumberOfProcessedJobs());
+        assertEquals(2L, s.getNumberOfFailedJobs());
+        assertEquals(0L, s.getNumberOfJobs());
+
+        assertEquals(0, s.getAverageProcessingTime());
+        assertEquals(600L, s.getAverageWaitingTime());
+        assertEquals(-1L, s.getLastFinishedJobTime());
+        assertTrue(s.getLastActivatedJobTime() >= now);
+        assertTrue(s.getLastActivatedJobTime() <= System.currentTimeMillis());
+    }
+
+    @Test public void testJobCancelled() {
+        final StatisticsImpl s = new StatisticsImpl();
+
+        final long now = System.currentTimeMillis();
+
+        s.incQueued();
+        s.incQueued();
+        assertEquals(2L, s.getNumberOfQueuedJobs());
+        assertEquals(0L, s.getNumberOfActiveJobs());
+        assertEquals(2L, s.getNumberOfJobs());
+
+        s.addActive(500);
+        s.addActive(700);
+        assertEquals(0L, s.getNumberOfQueuedJobs());
+        assertEquals(2L, s.getNumberOfActiveJobs());
+        assertEquals(2L, s.getNumberOfJobs());
+
+        s.cancelledJob();
+        s.cancelledJob();
+
+        assertEquals(0L, s.getNumberOfActiveJobs());
+        assertEquals(0L, s.getNumberOfQueuedJobs());
+        assertEquals(0L, s.getNumberOfFinishedJobs());
+        assertEquals(2L, s.getNumberOfProcessedJobs());
+        assertEquals(2L, s.getNumberOfCancelledJobs());
+        assertEquals(0L, s.getNumberOfJobs());
+
+        assertEquals(0, s.getAverageProcessingTime());
+        assertEquals(600L, s.getAverageWaitingTime());
+        assertEquals(-1L, s.getLastFinishedJobTime());
+        assertTrue(s.getLastActivatedJobTime() >= now);
+        assertTrue(s.getLastActivatedJobTime() <= System.currentTimeMillis());
+    }
+}
diff --git a/src/test/java/org/apache/sling/event/impl/jobs/stats/TopicStatisticsImplTest.java b/src/test/java/org/apache/sling/event/impl/jobs/stats/TopicStatisticsImplTest.java
new file mode 100644
index 0000000..bc98f24
--- /dev/null
+++ b/src/test/java/org/apache/sling/event/impl/jobs/stats/TopicStatisticsImplTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.stats;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.sling.event.jobs.TopicStatistics;
+import org.junit.Test;
+
+/**
+ * Unit tests for the counters, averages and timestamps reported by {@link TopicStatisticsImpl}.
+ */
+public class TopicStatisticsImplTest {
+    /** A new instance reports its topic, zero counters and averages, and -1 for both timestamps. */
+    @Test public void testInit() {
+        final TopicStatistics fresh = new TopicStatisticsImpl("topic");
+        assertEquals("topic", fresh.getTopic());
+        assertEquals(0, fresh.getNumberOfFinishedJobs());
+        assertEquals(0, fresh.getNumberOfCancelledJobs());
+        assertEquals(0, fresh.getNumberOfFailedJobs());
+        assertEquals(0, fresh.getNumberOfProcessedJobs());
+        assertEquals(-1, fresh.getLastActivatedJobTime());
+        assertEquals(-1, fresh.getLastFinishedJobTime());
+        assertEquals(0, fresh.getAverageWaitingTime());
+        assertEquals(0, fresh.getAverageProcessingTime());
+    }
+
+    /** Two finished jobs: waiting and processing times are averaged, both timestamps are set. */
+    @Test public void testJobFinished() {
+        final TopicStatisticsImpl stats = new TopicStatisticsImpl("topic");
+        final long startedAt = System.currentTimeMillis();
+        // activate two jobs; the arguments average to the waiting time asserted below
+        stats.addActive(500);
+        stats.addActive(700);
+        // finish both; the arguments average to the processing time asserted below
+        stats.finishedJob(300);
+        stats.finishedJob(500);
+        // counters, averages, and timestamps that must lie within this test run
+        assertEquals(2L, stats.getNumberOfFinishedJobs());
+        assertEquals(2L, stats.getNumberOfProcessedJobs());
+        assertEquals(400L, stats.getAverageProcessingTime());
+        assertEquals(600L, stats.getAverageWaitingTime());
+        assertTrue(startedAt <= stats.getLastFinishedJobTime());
+        assertTrue(stats.getLastFinishedJobTime() <= System.currentTimeMillis());
+        assertTrue(startedAt <= stats.getLastActivatedJobTime());
+        assertTrue(stats.getLastActivatedJobTime() <= System.currentTimeMillis());
+    }
+
+    /** Two failed jobs count as processed and failed, not finished; no finish time is set. */
+    @Test public void testJobFailed() {
+        final TopicStatisticsImpl stats = new TopicStatisticsImpl("topic");
+        final long startedAt = System.currentTimeMillis();
+        // activate two jobs; the arguments average to the waiting time asserted below
+        stats.addActive(500);
+        stats.addActive(700);
+        // both jobs fail
+        stats.failedJob();
+        stats.failedJob();
+        // failures are processed jobs but yield no processing time and no finish timestamp
+        assertEquals(0L, stats.getNumberOfFinishedJobs());
+        assertEquals(2L, stats.getNumberOfProcessedJobs());
+        assertEquals(2L, stats.getNumberOfFailedJobs());
+        assertEquals(0, stats.getAverageProcessingTime());
+        assertEquals(600L, stats.getAverageWaitingTime());
+        assertEquals(-1L, stats.getLastFinishedJobTime());
+        assertTrue(startedAt <= stats.getLastActivatedJobTime());
+        assertTrue(stats.getLastActivatedJobTime() <= System.currentTimeMillis());
+    }
+
+    /** Two cancelled jobs count as processed and cancelled, not finished; no finish time is set. */
+    @Test public void testJobCancelled() {
+        final TopicStatisticsImpl stats = new TopicStatisticsImpl("topic");
+        final long startedAt = System.currentTimeMillis();
+        // activate two jobs; the arguments average to the waiting time asserted below
+        stats.addActive(500);
+        stats.addActive(700);
+        // both jobs are cancelled
+        stats.cancelledJob();
+        stats.cancelledJob();
+        // cancellations are processed jobs but yield no processing time and no finish timestamp
+        assertEquals(0L, stats.getNumberOfFinishedJobs());
+        assertEquals(2L, stats.getNumberOfProcessedJobs());
+        assertEquals(2L, stats.getNumberOfCancelledJobs());
+        assertEquals(0, stats.getAverageProcessingTime());
+        assertEquals(600L, stats.getAverageWaitingTime());
+        assertEquals(-1L, stats.getLastFinishedJobTime());
+        assertTrue(startedAt <= stats.getLastActivatedJobTime());
+        assertTrue(stats.getLastActivatedJobTime() <= System.currentTimeMillis());
+    }
+}