You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2015/04/22 11:32:09 UTC

svn commit: r1675292 - in /sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs: InternalJobState.java queues/JobQueueImpl.java stats/StatisticsManager.java

Author: cziegeler
Date: Wed Apr 22 09:32:08 2015
New Revision: 1675292

URL: http://svn.apache.org/r1675292
Log:
SLING-4642 : Revisit statistics implementation

Modified:
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/InternalJobState.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsManager.java

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/InternalJobState.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/InternalJobState.java?rev=1675292&r1=1675291&r2=1675292&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/InternalJobState.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/InternalJobState.java Wed Apr 22 09:32:08 2015
@@ -18,6 +18,7 @@
  */
 package org.apache.sling.event.impl.jobs;
 
+import org.apache.sling.event.jobs.NotificationConstants;
 import org.apache.sling.event.jobs.consumer.JobExecutor;
 
 /**
@@ -25,7 +26,17 @@ import org.apache.sling.event.jobs.consu
  */
 public enum InternalJobState {
 
-    SUCCEEDED,  // processing finished successfully
-    FAILED,     // processing failed, can be retried
-    CANCELLED   // processing failed permanently
+    SUCCEEDED(NotificationConstants.TOPIC_JOB_FINISHED),    // processing finished successfully
+    FAILED(NotificationConstants.TOPIC_JOB_FAILED),         // processing failed, can be retried
+    CANCELLED(NotificationConstants.TOPIC_JOB_CANCELLED);   // processing failed permanently
+
+    private final String topic;
+
+    InternalJobState(final String topic) {
+        this.topic = topic;
+    }
+
+    public String getTopic() {
+        return this.topic;
+    }
 }

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java?rev=1675292&r1=1675291&r2=1675292&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java Wed Apr 22 09:32:08 2015
@@ -37,6 +37,7 @@ import org.apache.sling.api.resource.Res
 import org.apache.sling.commons.threads.ThreadPool;
 import org.apache.sling.event.EventUtil;
 import org.apache.sling.event.impl.EventingThreadPool;
+import org.apache.sling.event.impl.jobs.InternalJobState;
 import org.apache.sling.event.impl.jobs.JobHandler;
 import org.apache.sling.event.impl.jobs.JobImpl;
 import org.apache.sling.event.impl.jobs.JobTopicTraverser;
@@ -507,7 +508,7 @@ public class JobQueueImpl
         // processing time is only set of state is SUCCEEDED
         public long         processingTime;
         public Job.JobState state;
-        public String       notificationTopic;
+        public InternalJobState       finalState;
     }
 
     private RescheduleInfo handleReschedule(final JobHandler handler, final Job.JobState resultState) {
@@ -519,7 +520,7 @@ public class JobQueueImpl
                     this.logger.debug("Finished job {}", Utility.toString(handler.getJob()));
                 }
                 info.processingTime = System.currentTimeMillis() - handler.started;
-                info.notificationTopic = NotificationConstants.TOPIC_JOB_FINISHED;
+                info.finalState = InternalJobState.SUCCEEDED;
                 break;
             case QUEUED : // check if we exceeded the number of retries
                 final int retries = (Integer) handler.getJob().getProperty(Job.PROPERTY_JOB_RETRIES);
@@ -530,21 +531,21 @@ public class JobQueueImpl
                     if ( this.logger.isDebugEnabled() ) {
                         this.logger.debug("Cancelled job {}", Utility.toString(handler.getJob()));
                     }
-                    info.notificationTopic = NotificationConstants.TOPIC_JOB_CANCELLED;
+                    info.finalState = InternalJobState.CANCELLED;
                 } else {
                     info.reschedule = true;
                     handler.getJob().retry();
                     if ( this.logger.isDebugEnabled() ) {
                         this.logger.debug("Failed job {}", Utility.toString(handler.getJob()));
                     }
-                    info.notificationTopic = NotificationConstants.TOPIC_JOB_FAILED;
+                    info.finalState = InternalJobState.FAILED;
                 }
                 break;
             default : // consumer cancelled the job (STOPPED, GIVEN_UP, ERROR)
                 if ( this.logger.isDebugEnabled() ) {
                     this.logger.debug("Cancelled job {}", Utility.toString(handler.getJob()));
                 }
-                info.notificationTopic = NotificationConstants.TOPIC_JOB_CANCELLED;
+                info.finalState = InternalJobState.CANCELLED;
                 break;
         }
 
@@ -580,7 +581,7 @@ public class JobQueueImpl
             return false;
         }
 
-        // handle the reschedule, a new job might be returned with updated reschedule info!
+        // handle the rescheduling of the job
         final RescheduleInfo rescheduleInfo = this.handleReschedule(handler, resultState);
 
         if ( !rescheduleInfo.reschedule ) {
@@ -590,7 +591,15 @@ public class JobQueueImpl
         } else {
             this.reschedule(handler);
         }
-        NotificationUtility.sendNotification(this.services.eventAdmin, rescheduleInfo.notificationTopic, handler.getJob(), rescheduleInfo.processingTime);
+        // update statistics
+        this.services.statisticsManager.jobEnded(this.queueName,
+                handler.getJob().getTopic(),
+                rescheduleInfo.finalState,
+                rescheduleInfo.processingTime);
+        // send notification
+        NotificationUtility.sendNotification(this.services.eventAdmin,
+                rescheduleInfo.finalState.getTopic(),
+                handler.getJob(), rescheduleInfo.processingTime);
 
         return rescheduleInfo.reschedule;
     }

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsManager.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsManager.java?rev=1675292&r1=1675291&r2=1675292&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsManager.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsManager.java Wed Apr 22 09:32:08 2015
@@ -27,6 +27,7 @@ import org.apache.felix.scr.annotations.
 import org.apache.felix.scr.annotations.Property;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.Service;
+import org.apache.sling.event.impl.jobs.InternalJobState;
 import org.apache.sling.event.impl.jobs.config.JobManagerConfiguration;
 import org.apache.sling.event.impl.jobs.notifications.NotificationUtility;
 import org.apache.sling.event.jobs.Job;
@@ -48,9 +49,6 @@ import org.osgi.service.event.EventHandl
     @Property(name=EventConstants.EVENT_TOPIC,
           value={NotificationConstants.TOPIC_JOB_ADDED,
                  NotificationConstants.TOPIC_JOB_STARTED,
-                 NotificationConstants.TOPIC_JOB_CANCELLED,
-                 NotificationConstants.TOPIC_JOB_FAILED,
-                 NotificationConstants.TOPIC_JOB_FINISHED,
                  NotificationConstants.TOPIC_JOB_REMOVED})
 })
 public class StatisticsManager implements EventHandler {
@@ -147,28 +145,6 @@ public class StatisticsManager implement
                     queueStats.incQueued();
                 }
 
-            } else if ( event.getTopic().equals(NotificationConstants.TOPIC_JOB_CANCELLED) ) {
-                ts.addCancelled();
-                this.globalStatistics.cancelledJob();
-                if ( queueStats != null ) {
-                    queueStats.cancelledJob();
-                }
-
-            } else if ( event.getTopic().equals(NotificationConstants.TOPIC_JOB_FAILED) ) {
-                ts.addFailed();
-                this.globalStatistics.failedJob();
-                if ( queueStats != null ) {
-                    queueStats.failedJob();
-                }
-
-            } else if ( event.getTopic().equals(NotificationConstants.TOPIC_JOB_FINISHED) ) {
-                final Long time = (Long)event.getProperty(NotificationUtility.PROPERTY_TIME);
-                ts.addFinished(time == null ? -1 : time);
-                this.globalStatistics.finishedJob(time == null ? -1 : time);
-                if ( queueStats != null ) {
-                    queueStats.finishedJob(time == null ? -1 : time);
-                }
-
             } else if ( event.getTopic().equals(NotificationConstants.TOPIC_JOB_STARTED) ) {
                 final Long time = (Long)event.getProperty(NotificationUtility.PROPERTY_TIME);
                 ts.addActivated(time == null ? -1 : time);
@@ -187,4 +163,41 @@ public class StatisticsManager implement
             }
         }
     }
+
+    public void jobEnded(final String queueName,
+            final String topic,
+            final InternalJobState state,
+            final long processingTime) {
+        final StatisticsImpl queueStats = getStatisticsForQueue(queueName);
+
+        TopicStatisticsImpl ts = (TopicStatisticsImpl)this.topicStatistics.get(topic);
+        if ( ts == null ) {
+            this.topicStatistics.putIfAbsent(topic, new TopicStatisticsImpl(topic));
+            ts = (TopicStatisticsImpl)this.topicStatistics.get(topic);
+        }
+
+        if ( state == InternalJobState.CANCELLED ) {
+            ts.addCancelled();
+            this.globalStatistics.cancelledJob();
+            if ( queueStats != null ) {
+                queueStats.cancelledJob();
+            }
+
+        } else if ( state == InternalJobState.FAILED ) {
+            ts.addFailed();
+            this.globalStatistics.failedJob();
+            if ( queueStats != null ) {
+                queueStats.failedJob();
+            }
+
+        } else if ( state == InternalJobState.SUCCEEDED ) {
+            ts.addFinished(processingTime);
+            this.globalStatistics.finishedJob(processingTime);
+            if ( queueStats != null ) {
+                queueStats.finishedJob(processingTime);
+            }
+
+        }
+
+    }
 }