You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2010/10/14 10:45:12 UTC

svn commit: r1022421 - in /sling/branches/eventing-3.0/src: main/java/org/apache/sling/event/impl/jobs/ main/java/org/apache/sling/event/impl/jobs/console/ main/java/org/apache/sling/event/impl/jobs/jcr/ main/java/org/apache/sling/event/impl/jobs/queue...

Author: cziegeler
Date: Thu Oct 14 08:45:12 2010
New Revision: 1022421

URL: http://svn.apache.org/viewvc?rev=1022421&view=rev
Log:
Add statistics per topic

Added:
    sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/TopicStatisticsImpl.java   (with props)
    sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/TopicStatistics.java   (with props)
Modified:
    sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/DefaultJobManager.java
    sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/StatisticsImpl.java
    sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/Utility.java
    sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/console/WebConsolePlugin.java
    sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java
    sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
    sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/JobManager.java
    sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/JobUtil.java
    sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/jobs/StatisticsImplTest.java

Modified: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/DefaultJobManager.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/DefaultJobManager.java?rev=1022421&r1=1022420&r2=1022421&view=diff
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/DefaultJobManager.java (original)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/DefaultJobManager.java Thu Oct 14 08:45:12 2010
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
@@ -55,7 +56,9 @@ import org.apache.sling.event.jobs.JobUt
 import org.apache.sling.event.jobs.JobsIterator;
 import org.apache.sling.event.jobs.Queue;
 import org.apache.sling.event.jobs.Statistics;
+import org.apache.sling.event.jobs.TopicStatistics;
 import org.osgi.service.event.Event;
+import org.osgi.service.event.EventHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -72,7 +75,8 @@ import org.slf4j.LoggerFactory;
         metatype=true,immediate=true)
 @Services({
     @Service(value=Runnable.class),
-    @Service(value=JobManager.class)
+    @Service(value=JobManager.class),
+    @Service(value=EventHandler.class)
 })
 @Properties({
     @Property(name="scheduler.period", longValue=300,
@@ -89,11 +93,13 @@ import org.slf4j.LoggerFactory;
     @Property(name=ConfigurationConstants.PROP_RETRY_DELAY,
             longValue=ConfigurationConstants.DEFAULT_RETRY_DELAY),
     @Property(name=ConfigurationConstants.PROP_MAX_PARALLEL,
-            intValue=ConfigurationConstants.DEFAULT_MAX_PARALLEL)
+            intValue=ConfigurationConstants.DEFAULT_MAX_PARALLEL),
+    @Property(name="event.topics",propertyPrivate=true,
+            value={"org/apache/sling/event/notification/job/*"})
 })
 public class DefaultJobManager
     extends StatisticsImpl
-    implements Runnable, JobManager {
+    implements Runnable, JobManager, EventHandler {
 
     /** Default logger. */
     private final Logger logger = LoggerFactory.getLogger(this.getClass());
@@ -128,6 +134,9 @@ public class DefaultJobManager
     /** All existing events by topic. */
     private final Map<String, List<JobEvent>> allEventsByTopic = new HashMap<String, List<JobEvent>>();
 
+    /** Statistics per topic. */
+    private final ConcurrentMap<String, TopicStatistics> topicStatistics = new ConcurrentHashMap<String, TopicStatistics>();
+
     /**
      * Activate this component.
      * @param props Configuration properties
@@ -627,6 +636,42 @@ public class DefaultJobManager
         for(final AbstractJobQueue jq : this.queues.values() ) {
             jq.reset();
         }
+        this.topicStatistics.clear();
         this.lastUpdatedStatistics = 0;
     }
+
+    /**
+     * @see org.apache.sling.event.jobs.JobManager#getTopicStatistics()
+     */
+    public Iterable<TopicStatistics> getTopicStatistics() {
+        return topicStatistics.values(); // the values() view of the concurrent map itself, not a copy
+    }
+
+    /**
+     * Update the statistics of the job's topic from a job notification event.
+     * @see org.osgi.service.event.EventHandler#handleEvent(org.osgi.service.event.Event)
+     */
+    public void handleEvent(final Event event) {
+        final Event job = (Event)event.getProperty(JobUtil.PROPERTY_NOTIFICATION_JOB);
+        final String topic = (job == null ? null : (String)job.getProperty(JobUtil.PROPERTY_JOB_TOPIC));
+        if ( topic != null ) { // this is just a sanity check
+            TopicStatisticsImpl ts = (TopicStatisticsImpl)this.topicStatistics.get(topic);
+            if ( ts == null ) { // use putIfAbsent's result: a second get() may return null after a concurrent clear()
+                final TopicStatisticsImpl created = new TopicStatisticsImpl(topic);
+                final TopicStatistics existing = this.topicStatistics.putIfAbsent(topic, created);
+                ts = (existing == null ? created : (TopicStatisticsImpl)existing);
+            }
+            if ( event.getTopic().equals(JobUtil.TOPIC_JOB_CANCELLED) ) {
+                ts.addCancelled();
+            } else if ( event.getTopic().equals(JobUtil.TOPIC_JOB_FAILED) ) {
+                ts.addFailed();
+            } else if ( event.getTopic().equals(JobUtil.TOPIC_JOB_FINISHED) ) {
+                final Long time = (Long)event.getProperty(Utility.PROPERTY_TIME); // processing time, may be missing
+                ts.addFinished(time == null ? -1 : time);
+            } else if ( event.getTopic().equals(JobUtil.TOPIC_JOB_STARTED) ) {
+                final Long time = (Long)event.getProperty(Utility.PROPERTY_TIME); // queue time, may be missing
+                ts.addActivated(time == null ? -1 : time);
+            }
+        }
+    }
 }

Modified: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/StatisticsImpl.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/StatisticsImpl.java?rev=1022421&r1=1022420&r2=1022421&view=diff
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/StatisticsImpl.java (original)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/StatisticsImpl.java Thu Oct 14 08:45:12 2010
@@ -117,10 +117,16 @@ public class StatisticsImpl implements S
         return finishedJobs;
     }
 
+    /**
+     * @see org.apache.sling.event.jobs.Statistics#getNumberOfCancelledJobs()
+     */
     public synchronized long getNumberOfCancelledJobs() {
         return cancelledJobs;
     }
 
+    /**
+     * @see org.apache.sling.event.jobs.Statistics#getNumberOfFailedJobs()
+     */
     public synchronized long getNumberOfFailedJobs() {
         return failedJobs;
     }
@@ -141,22 +147,29 @@ public class StatisticsImpl implements S
 
     /**
      * Add a finished job
+     * @param jobTime The processing time for this job.
      */
-    public synchronized void finishedJob(final long time) {
+    public synchronized void finishedJob(final long jobTime) {
         this.lastFinished = System.currentTimeMillis();
-        this.processingTime += time;
+        this.processingTime += jobTime;
         this.processingCount++;
         this.averageProcessingTime = this.processingTime / this.processingCount;
         this.finishedJobs++;
         this.activeJobs--;
     }
 
+    /**
+     * Add a failed job.
+     */
     public synchronized void failedJob() {
         this.failedJobs++;
         this.activeJobs--;
         this.queuedJobs++;
     }
 
+    /**
+     * Add a cancelled job.
+     */
     public synchronized void cancelledJob() {
         this.cancelledJobs++;
         this.activeJobs--;
@@ -185,13 +198,13 @@ public class StatisticsImpl implements S
 
     /**
      * Add a job from the queue to status active
-     * @param time The time the job stayed in the queue.
+     * @param queueTime The time the job stayed in the queue.
      */
-    public synchronized void addActive(final long time) {
+    public synchronized void addActive(final long queueTime) {
         this.queuedJobs--;
         this.activeJobs++;
         this.waitingCount++;
-        this.waitingTime += time;
+        this.waitingTime += queueTime;
         this.averageWaitingTime = this.waitingTime / this.waitingCount;
         this.lastActivated = System.currentTimeMillis();
     }

Added: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/TopicStatisticsImpl.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/TopicStatisticsImpl.java?rev=1022421&view=auto
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/TopicStatisticsImpl.java (added)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/TopicStatisticsImpl.java Thu Oct 14 08:45:12 2010
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs;
+
+import org.apache.sling.event.jobs.TopicStatistics;
+
+/**
+ * Implementation of the per topic job statistics; all counters are read and updated in synchronized methods.
+ */
+public class TopicStatisticsImpl implements TopicStatistics {
+
+    private final String topic; // the job topic these statistics are collected for
+
+    private volatile long lastActivated = -1; // System.currentTimeMillis() of the last addActivated call, -1 before
+
+    private volatile long lastFinished = -1; // System.currentTimeMillis() of the last addFinished call, -1 before
+
+    private volatile long averageWaitingTime; // waitingTime / waitingCount, recomputed in addActivated
+
+    private volatile long averageProcessingTime; // processingTime / processingCount, recomputed in addFinished
+
+    private volatile long waitingTime; // sum of all queue times passed to addActivated (-1 is skipped)
+
+    private volatile long processingTime; // sum of all job times passed to addFinished (-1 is skipped)
+
+    private volatile long waitingCount; // number of queue times added to waitingTime
+
+    private volatile long processingCount; // number of job times added to processingTime
+
+    private volatile long finishedJobs; // incremented by addFinished
+
+    private volatile long failedJobs; // incremented by addFailed
+
+    private volatile long cancelledJobs; // incremented by addCancelled
+
+    /** Create empty statistics for the given job topic. */
+    public TopicStatisticsImpl(final String topic) {
+        this.topic = topic;
+    }
+
+    /**
+     * @see org.apache.sling.event.jobs.TopicStatistics#getTopic()
+     */
+    public String getTopic() {
+        return this.topic;
+    }
+
+    /**
+     * @see org.apache.sling.event.jobs.TopicStatistics#getNumberOfProcessedJobs()
+     */
+    public synchronized long getNumberOfProcessedJobs() {
+        return getNumberOfCancelledJobs() + getNumberOfFailedJobs() + getNumberOfFinishedJobs();
+    }
+
+    /**
+     * @see org.apache.sling.event.jobs.TopicStatistics#getAverageWaitingTime()
+     */
+    public synchronized long getAverageWaitingTime() {
+        return averageWaitingTime;
+    }
+
+    /**
+     * @see org.apache.sling.event.jobs.TopicStatistics#getAverageProcessingTime()
+     */
+    public synchronized long getAverageProcessingTime() {
+        return averageProcessingTime;
+    }
+
+    /**
+     * @see org.apache.sling.event.jobs.TopicStatistics#getNumberOfFinishedJobs()
+     */
+    public synchronized long getNumberOfFinishedJobs() {
+        return finishedJobs;
+    }
+
+    /**
+     * @see org.apache.sling.event.jobs.TopicStatistics#getNumberOfCancelledJobs()
+     */
+    public synchronized long getNumberOfCancelledJobs() {
+        return cancelledJobs;
+    }
+
+    /**
+     * @see org.apache.sling.event.jobs.TopicStatistics#getNumberOfFailedJobs()
+     */
+    public synchronized long getNumberOfFailedJobs() {
+        return failedJobs;
+    }
+
+    /**
+     * @see org.apache.sling.event.jobs.TopicStatistics#getLastActivatedJobTime()
+     */
+    public synchronized long getLastActivatedJobTime() {
+        return this.lastActivated;
+    }
+
+    /**
+     * @see org.apache.sling.event.jobs.TopicStatistics#getLastFinishedJobTime()
+     */
+    public synchronized long getLastFinishedJobTime() {
+        return this.lastFinished;
+    }
+
+    /**
+     * Add a finished job and set the last finished time to the current time.
+     * @param jobTime The time of the job processing, or -1 to leave the average processing time unchanged.
+     */
+    public synchronized void addFinished(final long jobTime) {
+        this.finishedJobs++;
+        this.lastFinished = System.currentTimeMillis();
+        if ( jobTime != -1 ) {
+            this.processingTime += jobTime;
+            this.processingCount++;
+            this.averageProcessingTime = this.processingTime / this.processingCount;
+        }
+    }
+
+    /**
+     * Add a started job and set the last activated time to the current time.
+     * @param queueTime The time of the job in the queue, or -1 to leave the average waiting time unchanged.
+     */
+    public synchronized void addActivated(final long queueTime) {
+        this.lastActivated = System.currentTimeMillis();
+        if ( queueTime != -1 ) {
+            this.waitingTime += queueTime;
+            this.waitingCount++;
+            this.averageWaitingTime = this.waitingTime / this.waitingCount;
+        }
+    }
+
+    /**
+     * Add a failed job. Only the failed counter changes, the last finished time is not touched.
+     */
+    public synchronized void addFailed() {
+        this.failedJobs++;
+    }
+
+    /**
+     * Add a cancelled job. Only the cancelled counter changes, the last finished time is not touched.
+     */
+    public synchronized void addCancelled() {
+        this.cancelledJobs++;
+    }
+}

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/TopicStatisticsImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/TopicStatisticsImpl.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/TopicStatisticsImpl.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/Utility.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/Utility.java?rev=1022421&r1=1022420&r2=1022421&view=diff
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/Utility.java (original)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/Utility.java Thu Oct 14 08:45:12 2010
@@ -150,16 +150,23 @@ public class Utility {
         return sb.toString();
     }
 
+    /** Event property containing the time for job start and job finished events. */
+    public static final String PROPERTY_TIME = "time";
+
     /**
      * Helper method for sending the notification events.
      */
     public static void sendNotification(final EnvironmentComponent environment,
             final String topic,
-            final Event job) {
+            final Event job,
+            final Long time) {
         final EventAdmin localEA = environment.getEventAdmin();
         final Dictionary<String, Object> props = new Hashtable<String, Object>();
         props.put(JobUtil.PROPERTY_NOTIFICATION_JOB, job);
         props.put(EventConstants.TIMESTAMP, System.currentTimeMillis());
+        if ( time != null ) {
+            props.put(PROPERTY_TIME, time);
+        }
         localEA.postEvent(new Event(topic, props));
     }
 }

Modified: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/console/WebConsolePlugin.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/console/WebConsolePlugin.java?rev=1022421&r1=1022420&r2=1022421&view=diff
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/console/WebConsolePlugin.java (original)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/console/WebConsolePlugin.java Thu Oct 14 08:45:12 2010
@@ -42,6 +42,7 @@ import org.apache.sling.event.jobs.JobMa
 import org.apache.sling.event.jobs.Queue;
 import org.apache.sling.event.jobs.QueueConfiguration;
 import org.apache.sling.event.jobs.Statistics;
+import org.apache.sling.event.jobs.TopicStatistics;
 
 /**
  * This is a webconsole plugin displaying the active queues, some statistics
@@ -185,6 +186,22 @@ public class WebConsolePlugin extends Ht
             pw.println("<br/>");
         }
 
+        for(final TopicStatistics ts : this.jobManager.getTopicStatistics()) {
+            pw.println("<table class='nicetable'><tbody>");
+            pw.printf("<tr><th colspan='2'>Topic Statistics: %s</th></tr>", escape(ts.getTopic()));
+
+            pw.printf("<tr><td>Last Activated</td><td>%s</td></tr>", formatDate(ts.getLastActivatedJobTime()));
+            pw.printf("<tr><td>Last Finished</td><td>%s</td></tr>", formatDate(ts.getLastFinishedJobTime()));
+            pw.printf("<tr><td>Finished Jobs</td><td>%s</td></tr>", ts.getNumberOfFinishedJobs());
+            pw.printf("<tr><td>Failed Jobs</td><td>%s</td></tr>", ts.getNumberOfFailedJobs());
+            pw.printf("<tr><td>Cancelled Jobs</td><td>%s</td></tr>", ts.getNumberOfCancelledJobs());
+            pw.printf("<tr><td>Processed Jobs</td><td>%s</td></tr>", ts.getNumberOfProcessedJobs());
+            pw.printf("<tr><td>Average Processing Time</td><td>%s</td></tr>", formatTime(ts.getAverageProcessingTime()));
+            pw.printf("<tr><td>Average Waiting Time</td><td>%s</td></tr>", formatTime(ts.getAverageWaitingTime()));
+            pw.println("</tbody></table>");
+            pw.println("<br/>");
+        }
+
         pw.println("<p class='statline'>Apache Sling Eventing - Job Queue Configurations</p>");
         this.printQueueConfiguration(req, pw, ((DefaultJobManager)this.jobManager).getMainQueueConfiguration());
         final InternalQueueConfiguration[] configs = this.queueConfigManager.getConfigurations();
@@ -348,6 +365,19 @@ public class WebConsolePlugin extends Ht
             pw.println();
         }
 
+        for(final TopicStatistics ts : this.jobManager.getTopicStatistics()) {
+            pw.printf("Topic Statistics - %s%n", ts.getTopic());
+            pw.printf("Last Activated : %s%n", formatDate(ts.getLastActivatedJobTime()));
+            pw.printf("Last Finished : %s%n", formatDate(ts.getLastFinishedJobTime()));
+            pw.printf("Finished Jobs : %s%n", ts.getNumberOfFinishedJobs());
+            pw.printf("Failed Jobs : %s%n", ts.getNumberOfFailedJobs());
+            pw.printf("Cancelled Jobs : %s%n", ts.getNumberOfCancelledJobs());
+            pw.printf("Processed Jobs : %s%n", ts.getNumberOfProcessedJobs());
+            pw.printf("Average Processing Time : %s%n", formatTime(ts.getAverageProcessingTime()));
+            pw.printf("Average Waiting Time : %s%n", formatTime(ts.getAverageWaitingTime()));
+            pw.println();
+        }
+
         pw.println("Apache Sling Eventing - Job Queue Configurations");
         pw.println("------------------------------------------------");
         this.printQueueConfiguration(pw, ((DefaultJobManager)this.jobManager).getMainQueueConfiguration());

Modified: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java?rev=1022421&r1=1022420&r2=1022421&view=diff
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java (original)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java Thu Oct 14 08:45:12 2010
@@ -979,7 +979,7 @@ public class PersistenceHandler implemen
                         // try to load job to send notification
                         try {
                             final Event job = this.forceReadEvent(eventNode);
-                            Utility.sendNotification(this.environment, JobUtil.TOPIC_JOB_CANCELLED, job);
+                            Utility.sendNotification(this.environment, JobUtil.TOPIC_JOB_CANCELLED, job, null);
                         } catch (RepositoryException ignore) {
                             this.ignoreException(ignore);
                         }

Modified: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java?rev=1022421&r1=1022420&r2=1022421&view=diff
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java (original)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java Thu Oct 14 08:45:12 2010
@@ -229,8 +229,9 @@ public abstract class AbstractJobQueue
             if ( logger.isDebugEnabled() ) {
                 logger.debug("Received ack for job {}", EventUtil.toString(job));
             }
-            this.addActive(ack.started - ack.queued);
-            Utility.sendNotification(this.environment, JobUtil.TOPIC_JOB_STARTED, job);
+            final long queueTime = ack.started - ack.queued;
+            this.addActive(queueTime);
+            Utility.sendNotification(this.environment, JobUtil.TOPIC_JOB_STARTED, job, queueTime);
             synchronized ( this.processsingJobsLists ) {
                 this.processsingJobsLists.put(location, ack);
             }
@@ -267,20 +268,21 @@ public abstract class AbstractJobQueue
                 }
                 this.failedJob();
                 jobEvent.queued = System.currentTimeMillis();
-                Utility.sendNotification(this.environment, JobUtil.TOPIC_JOB_FAILED, jobEvent.event);
+                Utility.sendNotification(this.environment, JobUtil.TOPIC_JOB_FAILED, jobEvent.event, null);
             } else {
                 if ( this.logger.isDebugEnabled() ) {
                     this.logger.debug("Cancelled job {}", EventUtil.toString(jobEvent.event));
                 }
                 this.cancelledJob();
-                Utility.sendNotification(this.environment, JobUtil.TOPIC_JOB_CANCELLED, jobEvent.event);
+                Utility.sendNotification(this.environment, JobUtil.TOPIC_JOB_CANCELLED, jobEvent.event, null);
             }
         } else {
             if ( this.logger.isDebugEnabled() ) {
                 this.logger.debug("Finished job {}", EventUtil.toString(jobEvent.event));
             }
-            this.finishedJob(System.currentTimeMillis() - jobEvent.started);
-            Utility.sendNotification(this.environment, JobUtil.TOPIC_JOB_FINISHED, jobEvent.event);
+            final long processingTime = System.currentTimeMillis() - jobEvent.started;
+            this.finishedJob(processingTime);
+            Utility.sendNotification(this.environment, JobUtil.TOPIC_JOB_FINISHED, jobEvent.event, processingTime);
         }
 
         return reschedule;

Modified: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/JobManager.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/JobManager.java?rev=1022421&r1=1022420&r2=1022421&view=diff
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/JobManager.java (original)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/JobManager.java Thu Oct 14 08:45:12 2010
@@ -36,6 +36,11 @@ public interface JobManager {
     Statistics getStatistics();
 
     /**
+     * Return statistics information about job topics.
+     */
+    Iterable<TopicStatistics> getTopicStatistics();
+
+    /**
      * Return a queue with a specific name (if running)
      * @param name The queue name
      * @return The queue or <code>null</code>

Modified: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/JobUtil.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/JobUtil.java?rev=1022421&r1=1022420&r2=1022421&view=diff
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/JobUtil.java (original)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/JobUtil.java Thu Oct 14 08:45:12 2010
@@ -140,7 +140,7 @@ public abstract class JobUtil {
      */
     public static final String TOPIC_JOB_CANCELLED = "org/apache/sling/event/notification/job/CANCELLED";
 
-    /** Property containing the job event. */
+    /** Property containing the job event. The value is of type org.osgi.service.event.Event. */
     public static final String PROPERTY_NOTIFICATION_JOB = "event.notification.job";
 
     /**

Added: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/TopicStatistics.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/TopicStatistics.java?rev=1022421&view=auto
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/TopicStatistics.java (added)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/TopicStatistics.java Thu Oct 14 08:45:12 2010
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.jobs;
+
+/**
+ * Statistic information about a single job topic.
+ * This information is not preserved between restarts of the service.
+ * Once a service is restarted, the counters start at zero!
+ * @since 3.0
+ */
+public interface TopicStatistics {
+
+    /**
+     * The topic these statistics are about.
+     */
+    String getTopic();
+
+    /**
+     * Number of successfully finished jobs.
+     */
+    long getNumberOfFinishedJobs();
+
+    /**
+     * Number of permanently failing or cancelled jobs.
+     */
+    long getNumberOfCancelledJobs();
+
+    /**
+     * Number of failing jobs.
+     */
+    long getNumberOfFailedJobs();
+
+    /**
+     * Number of already processed jobs. This is the sum of
+     * {@link #getNumberOfFinishedJobs()}, {@link #getNumberOfCancelledJobs()}
+     * and {@link #getNumberOfFailedJobs()}.
+     */
+    long getNumberOfProcessedJobs();
+
+    /**
+     * The time the last job for this topic has been started.
+     */
+    long getLastActivatedJobTime();
+
+    /**
+     * The time a job has been finished/failed/cancelled last. NOTE(review): TopicStatisticsImpl updates this only on finish - confirm.
+     */
+    long getLastFinishedJobTime();
+
+    /**
+     * The average waiting time of a job in the queue.
+     */
+    long getAverageWaitingTime();
+
+    /**
+     * The average processing time of a job - this only counts finished jobs.
+     */
+    long getAverageProcessingTime();
+}

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/TopicStatistics.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/TopicStatistics.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/TopicStatistics.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/jobs/StatisticsImplTest.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/jobs/StatisticsImplTest.java?rev=1022421&r1=1022420&r2=1022421&view=diff
==============================================================================
--- sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/jobs/StatisticsImplTest.java (original)
+++ sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/jobs/StatisticsImplTest.java Thu Oct 14 08:45:12 2010
@@ -202,7 +202,7 @@ public class StatisticsImplTest {
         assertEquals(0, copy.getNumberOfJobs());
         assertEquals(5, copy.getNumberOfProcessedJobs());
         assertEquals(0, copy.getNumberOfQueuedJobs());
-        assertTrue(copy.getLastActivatedJobTime() >= now);
+        assertTrue(copy.getLastActivatedJobTime() <= now);
         assertTrue(copy.getLastFinishedJobTime() <= now);
 
         now = System.currentTimeMillis();