You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2014/10/16 13:03:05 UTC

svn commit: r1632272 - in /sling/trunk/bundles/extensions/event/src: main/java/org/apache/sling/event/impl/jobs/ main/java/org/apache/sling/event/impl/jobs/stats/ main/java/org/apache/sling/event/impl/jobs/topics/ test/java/org/apache/sling/event/it/

Author: cziegeler
Date: Thu Oct 16 11:03:04 2014
New Revision: 1632272

URL: http://svn.apache.org/r1632272
Log:
SLING-4048 : Avoid keeping jobs in memory. Fix round robin queue and implement statistics reset (WiP)

Modified:
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsManager.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/QueueJobCache.java
    sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/RoundRobinQueueTest.java

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java?rev=1632272&r1=1632271&r2=1632272&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java Thu Oct 16 11:03:04 2014
@@ -269,7 +269,7 @@ public class JobManagerImpl
      */
     @Override
     public synchronized Statistics getStatistics() {
-        return this.statisticsManager.getOverallStatistics();
+        return this.statisticsManager.getGlobalStatistics();
     }
 
     /**

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsManager.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsManager.java?rev=1632272&r1=1632271&r2=1632272&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsManager.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsManager.java Thu Oct 16 11:03:04 2014
@@ -28,7 +28,6 @@ import org.apache.felix.scr.annotations.
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.Service;
 import org.apache.sling.event.impl.jobs.JobManagerConfiguration;
-import org.apache.sling.event.impl.jobs.TestLogger;
 import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager;
 import org.apache.sling.event.impl.jobs.notifications.NotificationUtility;
 import org.apache.sling.event.jobs.Job;
@@ -38,9 +37,12 @@ import org.apache.sling.event.jobs.Topic
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventConstants;
 import org.osgi.service.event.EventHandler;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
+/**
+ * The statistics manager keeps track of all statistics related tasks.
+ * The statistics are updated through OSGi events by using the special
+ * job notification events.
+ */
 @Component(immediate=true)
 @Service(value={EventHandler.class, StatisticsManager.class})
 @Properties({
@@ -54,17 +56,27 @@ import org.slf4j.LoggerFactory;
 })
 public class StatisticsManager implements EventHandler {
 
-    /** Logger. */
-    private final Logger logger = new TestLogger(LoggerFactory.getLogger(this.getClass()));
-
+    /** The job manager configuration. */
     @Reference
     private JobManagerConfiguration configuration;
 
+    /** The queue configuration manager. */
     @Reference
     private QueueConfigurationManager queueConfigurationManager;
 
-    /** Current statistics. */
-    private final StatisticsImpl baseStatistics = new StatisticsImpl();
+    /** Global statistics. */
+    private final StatisticsImpl globalStatistics = new StatisticsImpl() {
+
+        @Override
+        public synchronized void reset() {
+            super.reset();
+            topicStatistics.clear();
+            for(final Statistics s : queueStatistics.values()) {
+                s.reset();
+            }
+        }
+
+    };
 
     /** Statistics per topic. */
     private final ConcurrentMap<String, TopicStatistics> topicStatistics = new ConcurrentHashMap<String, TopicStatistics>();
@@ -72,14 +84,27 @@ public class StatisticsManager implement
     /** Statistics per queue. */
     private final ConcurrentMap<String, Statistics> queueStatistics = new ConcurrentHashMap<String, Statistics>();
 
-    public Statistics getOverallStatistics() {
-        return this.baseStatistics;
+    /**
+     * Get the global statistics.
+     * @return The global statistics.
+     */
+    public Statistics getGlobalStatistics() {
+        return this.globalStatistics;
     }
 
+    /**
+     * Get all topic statistics.
+     * @return The map of topic statistics by topic.
+     */
     public Map<String, TopicStatistics> getTopicStatistics() {
         return topicStatistics;
     }
 
+    /**
+     * Get the statistics of a single queue.
+     * @param queueName The queue name.
+     * @return The statistics for that queue.
+     */
     public Statistics getQueueStatistics(final String queueName) {
         Statistics queueStats = queueStatistics.get(queueName);
         if ( queueStats == null ) {
@@ -88,6 +113,11 @@ public class StatisticsManager implement
         return queueStats;
     }
 
+    /**
+     * Internal method to get the statistics of a queue.
+     * @param queueName The queue name.
+     * @return The statistics or {@code null} if queue name is {@code null}.
+     */
     private StatisticsImpl getStatisticsForQueue(final String queueName) {
         if ( queueName == null ) {
             return null;
@@ -100,6 +130,9 @@ public class StatisticsManager implement
         return queueStats;
     }
 
+    /**
+     * Handle all job notification events and update the statistics.
+     */
     @Override
     public void handleEvent(final Event event) {
         final String topic = (String)event.getProperty(NotificationConstants.NOTIFICATION_PROPERTY_JOB_TOPIC);
@@ -114,19 +147,19 @@ public class StatisticsManager implement
             }
 
             if ( event.getTopic().equals(NotificationConstants.TOPIC_JOB_ADDED) ) {
-                this.baseStatistics.incQueued();
+                this.globalStatistics.incQueued();
                 queueStats.incQueued();
 
             } else if ( event.getTopic().equals(NotificationConstants.TOPIC_JOB_CANCELLED) ) {
                 ts.addCancelled();
-                this.baseStatistics.cancelledJob();
+                this.globalStatistics.cancelledJob();
                 if ( queueStats != null ) {
                     queueStats.cancelledJob();
                 }
 
             } else if ( event.getTopic().equals(NotificationConstants.TOPIC_JOB_FAILED) ) {
                 ts.addFailed();
-                this.baseStatistics.failedJob();
+                this.globalStatistics.failedJob();
                 if ( queueStats != null ) {
                     queueStats.failedJob();
                 }
@@ -134,7 +167,7 @@ public class StatisticsManager implement
             } else if ( event.getTopic().equals(NotificationConstants.TOPIC_JOB_FINISHED) ) {
                 final Long time = (Long)event.getProperty(NotificationUtility.PROPERTY_TIME);
                 ts.addFinished(time == null ? -1 : time);
-                this.baseStatistics.finishedJob(time == null ? -1 : time);
+                this.globalStatistics.finishedJob(time == null ? -1 : time);
                 if ( queueStats != null ) {
                     queueStats.finishedJob(time == null ? -1 : time);
                 }
@@ -142,14 +175,14 @@ public class StatisticsManager implement
             } else if ( event.getTopic().equals(NotificationConstants.TOPIC_JOB_STARTED) ) {
                 final Long time = (Long)event.getProperty(NotificationUtility.PROPERTY_TIME);
                 ts.addActivated(time == null ? -1 : time);
-                this.baseStatistics.addActive(time == null ? -1 : time);
+                this.globalStatistics.addActive(time == null ? -1 : time);
                 if ( queueStats != null ) {
                     queueStats.addActive(time == null ? -1 : time);
                 }
 
             } else if ( event.getTopic().equals(NotificationConstants.TOPIC_JOB_REMOVED) ) {
-                this.baseStatistics.decQueued();
-                this.baseStatistics.cancelledJob();
+                this.globalStatistics.decQueued();
+                this.globalStatistics.cancelledJob();
                 if ( queueStats != null ) {
                     queueStats.decQueued();
                     queueStats.cancelledJob();

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/QueueJobCache.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/QueueJobCache.java?rev=1632272&r1=1632271&r2=1632272&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/QueueJobCache.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/QueueJobCache.java Thu Oct 16 11:03:04 2014
@@ -174,6 +174,7 @@ public class QueueJobCache {
             // topic round robin
             boolean done = true;
             do {
+                done = true;
                 for(final Map.Entry<String, List<JobImpl>> entry : topicCache.entrySet()) {
                     if ( !entry.getValue().isEmpty() ) {
                         this.cache.add(entry.getValue().remove(0));

Modified: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/RoundRobinQueueTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/RoundRobinQueueTest.java?rev=1632272&r1=1632271&r2=1632272&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/RoundRobinQueueTest.java (original)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/RoundRobinQueueTest.java Thu Oct 16 11:03:04 2014
@@ -31,7 +31,7 @@ import org.apache.sling.event.impl.Barri
 import org.apache.sling.event.impl.jobs.config.ConfigurationConstants;
 import org.apache.sling.event.jobs.Job;
 import org.apache.sling.event.jobs.JobManager;
-import org.apache.sling.event.jobs.JobUtil;
+import org.apache.sling.event.jobs.NotificationConstants;
 import org.apache.sling.event.jobs.Queue;
 import org.apache.sling.event.jobs.QueueConfiguration;
 import org.apache.sling.event.jobs.consumer.JobConsumer;
@@ -45,6 +45,7 @@ import org.ops4j.pax.exam.spi.reactors.P
 import org.osgi.framework.ServiceRegistration;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventHandler;
+import org.slf4j.LoggerFactory;
 
 @RunWith(PaxExam.class)
 @ExamReactorStrategy(PerMethod.class)
@@ -117,18 +118,19 @@ public class RoundRobinQueueTest extends
                         return JobResult.OK;
                     }
                 });
-        final ServiceRegistration ehReg = this.registerEventHandler(JobUtil.TOPIC_JOB_FINISHED,
+        final ServiceRegistration ehReg = this.registerEventHandler(NotificationConstants.TOPIC_JOB_FINISHED,
                 new EventHandler() {
 
                     @Override
                     public void handleEvent(final Event event) {
+                        LoggerFactory.getLogger("test").info("Received finished event {}", event.getProperty(NotificationConstants.NOTIFICATION_PROPERTY_JOB_ID));
                         count.incrementAndGet();
                     }
                 });
 
         try {
             // we first sent one event to get the queue started
-            jobManager.addJob(TOPIC + "/start", null, null);
+            jobManager.addJob(TOPIC + "/start", null);
             assertTrue("No event received in the given time.", cb.block(5));
             cb.reset();
 
@@ -143,7 +145,7 @@ public class RoundRobinQueueTest extends
             // first jobs without id
             for(int i = 0; i < NUM_JOBS; i++ ) {
                 final String subTopic = TOPIC + "/sub" + (i % 10);
-                jobManager.addJob(subTopic, null, null);
+                jobManager.addJob(subTopic, null);
             }
             // second jobs with id
             for(int i = 0; i < NUM_JOBS; i++ ) {