You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2015/04/22 14:42:43 UTC

svn commit: r1675348 - in /sling/trunk/bundles/extensions/event/src: main/java/org/apache/sling/event/impl/jobs/queues/ main/java/org/apache/sling/event/impl/jobs/stats/ test/java/org/apache/sling/event/it/

Author: cziegeler
Date: Wed Apr 22 12:42:43 2015
New Revision: 1675348

URL: http://svn.apache.org/r1675348
Log:
SLING-4642 : Revisit statistics implementation

Modified:
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsManager.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/TopicStatisticsImpl.java
    sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ClassloadingTest.java

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java?rev=1675348&r1=1675347&r2=1675348&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java Wed Apr 22 12:42:43 2015
@@ -135,7 +135,7 @@ public class JobQueueImpl
                         final InternalQueueConfiguration config,
                         final QueueServices services,
                         final Set<String> topics) {
-        final QueueJobCache cache = new QueueJobCache(services.configuration, config.getType(), topics);
+        final QueueJobCache cache = new QueueJobCache(services.configuration, name, services.statisticsManager, config.getType(), topics);
         if ( cache.isEmpty() ) {
             return null;
         }
@@ -213,7 +213,8 @@ public class JobQueueImpl
                 boolean started = false;
                 this.lock.writeLock().lock();
                 try {
-                    final JobHandler handler = this.cache.getNextJob(this.services.jobConsumerManager, this, this.doFullCacheSearch.getAndSet(false));
+                    final JobHandler handler = this.cache.getNextJob(this.services.jobConsumerManager,
+                            this.services.statisticsManager, this, this.doFullCacheSearch.getAndSet(false));
                     if ( handler != null ) {
                         started = true;
                         this.threadPool.execute(new Runnable() {
@@ -503,7 +504,7 @@ public class JobQueueImpl
      * @param handler The job handler
      */
     private void requeue(final JobHandler handler) {
-        this.cache.reschedule(handler);
+        this.cache.reschedule(this.queueName, handler, this.services.statisticsManager);
         this.startJobs();
     }
 

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java?rev=1675348&r1=1675347&r2=1675348&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java Wed Apr 22 12:42:43 2015
@@ -36,6 +36,7 @@ import org.apache.sling.event.impl.jobs.
 import org.apache.sling.event.impl.jobs.JobTopicTraverser;
 import org.apache.sling.event.impl.jobs.Utility;
 import org.apache.sling.event.impl.jobs.config.JobManagerConfiguration;
+import org.apache.sling.event.impl.jobs.stats.StatisticsManager;
 import org.apache.sling.event.jobs.Queue;
 import org.apache.sling.event.jobs.QueueConfiguration;
 import org.apache.sling.event.jobs.QueueConfiguration.Type;
@@ -80,13 +81,15 @@ public class QueueJobCache {
      * @param topics The topics handled by this queue.
      */
     public QueueJobCache(final JobManagerConfiguration configuration,
+            final String queueName,
+            final StatisticsManager statisticsManager,
             final QueueConfiguration.Type queueType,
             final Set<String> topics) {
         this.configuration = configuration;
         this.queueType = queueType;
         this.topics = new ConcurrentSkipListSet<String>(topics);
         this.topicsWithNewJobs.addAll(topics);
-        this.fillCache();
+        this.fillCache(queueName, statisticsManager);
     }
 
     /**
@@ -122,11 +125,11 @@ public class QueueJobCache {
      * Fill the cache.
      * No need to sync as this is called from the constructor.
      */
-    private void fillCache() {
+    private void fillCache(final String queueName, final StatisticsManager statisticsManager) {
         final Set<String> checkingTopics = new HashSet<String>();
         checkingTopics.addAll(this.topics);
         if ( !checkingTopics.isEmpty() ) {
-            this.loadJobs(checkingTopics);
+            this.loadJobs(queueName, checkingTopics, statisticsManager);
         }
     }
 
@@ -137,6 +140,7 @@ public class QueueJobCache {
      * can be called concurrently.
      */
     public JobHandler getNextJob(final JobConsumerManager jobConsumerManager,
+            final StatisticsManager statisticsManager,
             final Queue queue,
             final boolean doFull) {
         JobHandler handler = null;
@@ -156,7 +160,7 @@ public class QueueJobCache {
                             checkingTopics.addAll(this.topics);
                         }
                         if ( !checkingTopics.isEmpty() ) {
-                            this.loadJobs(checkingTopics);
+                            this.loadJobs(queue.getName(), checkingTopics, statisticsManager);
                         }
                     }
 
@@ -192,7 +196,8 @@ public class QueueJobCache {
      * Load the next N x numberOf(topics) jobs
      * @param checkingTopics The set of topics to check.
      */
-    private void loadJobs( final Set<String> checkingTopics) {
+    private void loadJobs( final String queueName, final Set<String> checkingTopics,
+            final StatisticsManager statisticsManager) {
         logger.debug("Starting jobs loading from {}...", checkingTopics);
 
         final Map<String, List<JobImpl>> topicCache = new HashMap<String, List<JobImpl>>();
@@ -206,7 +211,7 @@ public class QueueJobCache {
 
                     final Resource topicResource = baseResource.getChild(topic.replace('/', '.'));
                     if ( topicResource != null ) {
-                        topicCache.put(topic, loadJobs(topic, topicResource));
+                        topicCache.put(topic, loadJobs(queueName, topic, topicResource, statisticsManager));
                     }
                 }
             }
@@ -252,7 +257,9 @@ public class QueueJobCache {
      * @param topicResource The parent resource of the jobs
      * @return The cache which will be filled with the jobs.
      */
-    private List<JobImpl> loadJobs(final String topic, final Resource topicResource) {
+    private List<JobImpl> loadJobs(final String queueName, final String topic,
+            final Resource topicResource,
+            final StatisticsManager statisticsManager) {
         logger.debug("Loading jobs from topic {}", topic);
         final List<JobImpl> list = new ArrayList<JobImpl>();
 
@@ -264,6 +271,7 @@ public class QueueJobCache {
             public boolean handle(final JobImpl job) {
                 if ( job.getProcessingStarted() == null && !job.hasReadErrors() ) {
                     list.add(job);
+                    statisticsManager.jobQueued(queueName, topic);
                     if ( list.size() == maxPreloadLimit ) {
                         scanTopic.set(true);
                     }
@@ -303,7 +311,7 @@ public class QueueJobCache {
      * Reschedule the job and add it back into the cache.
      * @param handler The job handler
      */
-    public void reschedule(final JobHandler handler) {
+    public void reschedule(final String queueName, final JobHandler handler, final StatisticsManager statisticsManager) {
         synchronized ( this.cache ) {
             if ( handler.reschedule() ) {
                 if ( this.queueType == Type.ORDERED ) {
@@ -311,6 +319,7 @@ public class QueueJobCache {
                 } else {
                     this.cache.add(handler.getJob());
                 }
+                statisticsManager.jobQueued(queueName, handler.getJob().getTopic());
             }
         }
     }

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsManager.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsManager.java?rev=1675348&r1=1675347&r2=1675348&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsManager.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsManager.java Wed Apr 22 12:42:43 2015
@@ -46,8 +46,7 @@ import org.osgi.service.event.EventHandl
 @Service(value={EventHandler.class, StatisticsManager.class})
 @Properties({
     @Property(name=EventConstants.EVENT_TOPIC,
-          value={NotificationConstants.TOPIC_JOB_ADDED,
-                 NotificationConstants.TOPIC_JOB_REMOVED})
+          value={NotificationConstants.TOPIC_JOB_REMOVED})
 })
 public class StatisticsManager implements EventHandler {
 
@@ -131,19 +130,7 @@ public class StatisticsManager implement
             final String queueName = (String)event.getProperty(Job.PROPERTY_JOB_QUEUE_NAME);
             final StatisticsImpl queueStats = getStatisticsForQueue(queueName);
 
-            TopicStatisticsImpl ts = (TopicStatisticsImpl)this.topicStatistics.get(topic);
-            if ( ts == null ) {
-                this.topicStatistics.putIfAbsent(topic, new TopicStatisticsImpl(topic));
-                ts = (TopicStatisticsImpl)this.topicStatistics.get(topic);
-            }
-
-            if ( event.getTopic().equals(NotificationConstants.TOPIC_JOB_ADDED) ) {
-                this.globalStatistics.incQueued();
-                if ( queueStats != null ) {
-                    queueStats.incQueued();
-                }
-
-            } else if ( event.getTopic().equals(NotificationConstants.TOPIC_JOB_REMOVED) ) {
+            if ( event.getTopic().equals(NotificationConstants.TOPIC_JOB_REMOVED) ) {
                 this.globalStatistics.decQueued();
                 this.globalStatistics.cancelledJob();
                 if ( queueStats != null ) {
@@ -207,4 +194,14 @@ public class StatisticsManager implement
             queueStats.addActive(queueTime);
         }
     }
+
+    public void jobQueued(final String queueName,
+            final String topic) {
+        final StatisticsImpl queueStats = getStatisticsForQueue(queueName);
+
+        this.globalStatistics.incQueued();
+        if ( queueStats != null ) {
+            queueStats.incQueued();
+        }
+    }
 }

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/TopicStatisticsImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/TopicStatisticsImpl.java?rev=1675348&r1=1675347&r2=1675348&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/TopicStatisticsImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/TopicStatisticsImpl.java Wed Apr 22 12:42:43 2015
@@ -27,27 +27,27 @@ public class TopicStatisticsImpl impleme
 
     private final String topic;
 
-    private long lastActivated = -1;
+    private volatile long lastActivated = -1;
 
-    private long lastFinished = -1;
+    private volatile long lastFinished = -1;
 
-    private long averageWaitingTime;
+    private volatile long averageWaitingTime;
 
-    private long averageProcessingTime;
+    private volatile long averageProcessingTime;
 
-    private long waitingTime;
+    private volatile long waitingTime;
 
-    private long processingTime;
+    private volatile long processingTime;
 
-    private long waitingCount;
+    private volatile long waitingCount;
 
-    private long processingCount;
+    private volatile long processingCount;
 
-    private long finishedJobs;
+    private volatile long finishedJobs;
 
-    private long failedJobs;
+    private volatile long failedJobs;
 
-    private long cancelledJobs;
+    private volatile long cancelledJobs;
 
     /** Constructor. */
     public TopicStatisticsImpl(final String topic) {

Modified: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ClassloadingTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ClassloadingTest.java?rev=1675348&r1=1675347&r2=1675348&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ClassloadingTest.java (original)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ClassloadingTest.java Wed Apr 22 12:42:43 2015
@@ -209,7 +209,7 @@ public class ClassloadingTest extends Ab
                             && finishedEvents.size() == 0
                             && jobManager.findJobs(JobManager.QueryType.ALL, TOPIC + "/failed", -1,
                                     (Map<String, Object>[]) null).size() == 1
-                            && jobManager.getStatistics().getNumberOfQueuedJobs() == 1
+                            && jobManager.getStatistics().getNumberOfQueuedJobs() == 0
                             && jobManager.getStatistics().getNumberOfActiveJobs() == 0;
                 }