You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2015/04/22 11:32:09 UTC
svn commit: r1675292 - in
/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs:
InternalJobState.java queues/JobQueueImpl.java stats/StatisticsManager.java
Author: cziegeler
Date: Wed Apr 22 09:32:08 2015
New Revision: 1675292
URL: http://svn.apache.org/r1675292
Log:
SLING-4642 : Revisit statistics implementation
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/InternalJobState.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsManager.java
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/InternalJobState.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/InternalJobState.java?rev=1675292&r1=1675291&r2=1675292&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/InternalJobState.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/InternalJobState.java Wed Apr 22 09:32:08 2015
@@ -18,6 +18,7 @@
*/
package org.apache.sling.event.impl.jobs;
+import org.apache.sling.event.jobs.NotificationConstants;
import org.apache.sling.event.jobs.consumer.JobExecutor;
/**
@@ -25,7 +26,17 @@ import org.apache.sling.event.jobs.consu
*/
public enum InternalJobState {
- SUCCEEDED, // processing finished successfully
- FAILED, // processing failed, can be retried
- CANCELLED // processing failed permanently
+ SUCCEEDED(NotificationConstants.TOPIC_JOB_FINISHED), // processing finished successfully
+ FAILED(NotificationConstants.TOPIC_JOB_FAILED), // processing failed, can be retried
+ CANCELLED(NotificationConstants.TOPIC_JOB_CANCELLED); // processing failed permanently
+
+ private final String topic;
+
+ InternalJobState(final String topic) {
+ this.topic = topic;
+ }
+
+ public String getTopic() {
+ return this.topic;
+ }
}
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java?rev=1675292&r1=1675291&r2=1675292&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java Wed Apr 22 09:32:08 2015
@@ -37,6 +37,7 @@ import org.apache.sling.api.resource.Res
import org.apache.sling.commons.threads.ThreadPool;
import org.apache.sling.event.EventUtil;
import org.apache.sling.event.impl.EventingThreadPool;
+import org.apache.sling.event.impl.jobs.InternalJobState;
import org.apache.sling.event.impl.jobs.JobHandler;
import org.apache.sling.event.impl.jobs.JobImpl;
import org.apache.sling.event.impl.jobs.JobTopicTraverser;
@@ -507,7 +508,7 @@ public class JobQueueImpl
// processing time is only set of state is SUCCEEDED
public long processingTime;
public Job.JobState state;
- public String notificationTopic;
+ public InternalJobState finalState;
}
private RescheduleInfo handleReschedule(final JobHandler handler, final Job.JobState resultState) {
@@ -519,7 +520,7 @@ public class JobQueueImpl
this.logger.debug("Finished job {}", Utility.toString(handler.getJob()));
}
info.processingTime = System.currentTimeMillis() - handler.started;
- info.notificationTopic = NotificationConstants.TOPIC_JOB_FINISHED;
+ info.finalState = InternalJobState.SUCCEEDED;
break;
case QUEUED : // check if we exceeded the number of retries
final int retries = (Integer) handler.getJob().getProperty(Job.PROPERTY_JOB_RETRIES);
@@ -530,21 +531,21 @@ public class JobQueueImpl
if ( this.logger.isDebugEnabled() ) {
this.logger.debug("Cancelled job {}", Utility.toString(handler.getJob()));
}
- info.notificationTopic = NotificationConstants.TOPIC_JOB_CANCELLED;
+ info.finalState = InternalJobState.CANCELLED;
} else {
info.reschedule = true;
handler.getJob().retry();
if ( this.logger.isDebugEnabled() ) {
this.logger.debug("Failed job {}", Utility.toString(handler.getJob()));
}
- info.notificationTopic = NotificationConstants.TOPIC_JOB_FAILED;
+ info.finalState = InternalJobState.FAILED;
}
break;
default : // consumer cancelled the job (STOPPED, GIVEN_UP, ERROR)
if ( this.logger.isDebugEnabled() ) {
this.logger.debug("Cancelled job {}", Utility.toString(handler.getJob()));
}
- info.notificationTopic = NotificationConstants.TOPIC_JOB_CANCELLED;
+ info.finalState = InternalJobState.CANCELLED;
break;
}
@@ -580,7 +581,7 @@ public class JobQueueImpl
return false;
}
- // handle the reschedule, a new job might be returned with updated reschedule info!
+ // handle the rescheduling of the job
final RescheduleInfo rescheduleInfo = this.handleReschedule(handler, resultState);
if ( !rescheduleInfo.reschedule ) {
@@ -590,7 +591,15 @@ public class JobQueueImpl
} else {
this.reschedule(handler);
}
- NotificationUtility.sendNotification(this.services.eventAdmin, rescheduleInfo.notificationTopic, handler.getJob(), rescheduleInfo.processingTime);
+ // update statistics
+ this.services.statisticsManager.jobEnded(this.queueName,
+ handler.getJob().getTopic(),
+ rescheduleInfo.finalState,
+ rescheduleInfo.processingTime);
+ // send notification
+ NotificationUtility.sendNotification(this.services.eventAdmin,
+ rescheduleInfo.finalState.getTopic(),
+ handler.getJob(), rescheduleInfo.processingTime);
return rescheduleInfo.reschedule;
}
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsManager.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsManager.java?rev=1675292&r1=1675291&r2=1675292&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsManager.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsManager.java Wed Apr 22 09:32:08 2015
@@ -27,6 +27,7 @@ import org.apache.felix.scr.annotations.
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.Service;
+import org.apache.sling.event.impl.jobs.InternalJobState;
import org.apache.sling.event.impl.jobs.config.JobManagerConfiguration;
import org.apache.sling.event.impl.jobs.notifications.NotificationUtility;
import org.apache.sling.event.jobs.Job;
@@ -48,9 +49,6 @@ import org.osgi.service.event.EventHandl
@Property(name=EventConstants.EVENT_TOPIC,
value={NotificationConstants.TOPIC_JOB_ADDED,
NotificationConstants.TOPIC_JOB_STARTED,
- NotificationConstants.TOPIC_JOB_CANCELLED,
- NotificationConstants.TOPIC_JOB_FAILED,
- NotificationConstants.TOPIC_JOB_FINISHED,
NotificationConstants.TOPIC_JOB_REMOVED})
})
public class StatisticsManager implements EventHandler {
@@ -147,28 +145,6 @@ public class StatisticsManager implement
queueStats.incQueued();
}
- } else if ( event.getTopic().equals(NotificationConstants.TOPIC_JOB_CANCELLED) ) {
- ts.addCancelled();
- this.globalStatistics.cancelledJob();
- if ( queueStats != null ) {
- queueStats.cancelledJob();
- }
-
- } else if ( event.getTopic().equals(NotificationConstants.TOPIC_JOB_FAILED) ) {
- ts.addFailed();
- this.globalStatistics.failedJob();
- if ( queueStats != null ) {
- queueStats.failedJob();
- }
-
- } else if ( event.getTopic().equals(NotificationConstants.TOPIC_JOB_FINISHED) ) {
- final Long time = (Long)event.getProperty(NotificationUtility.PROPERTY_TIME);
- ts.addFinished(time == null ? -1 : time);
- this.globalStatistics.finishedJob(time == null ? -1 : time);
- if ( queueStats != null ) {
- queueStats.finishedJob(time == null ? -1 : time);
- }
-
} else if ( event.getTopic().equals(NotificationConstants.TOPIC_JOB_STARTED) ) {
final Long time = (Long)event.getProperty(NotificationUtility.PROPERTY_TIME);
ts.addActivated(time == null ? -1 : time);
@@ -187,4 +163,41 @@ public class StatisticsManager implement
}
}
}
+
+ public void jobEnded(final String queueName,
+ final String topic,
+ final InternalJobState state,
+ final long processingTime) {
+ final StatisticsImpl queueStats = getStatisticsForQueue(queueName);
+
+ TopicStatisticsImpl ts = (TopicStatisticsImpl)this.topicStatistics.get(topic);
+ if ( ts == null ) {
+ this.topicStatistics.putIfAbsent(topic, new TopicStatisticsImpl(topic));
+ ts = (TopicStatisticsImpl)this.topicStatistics.get(topic);
+ }
+
+ if ( state == InternalJobState.CANCELLED ) {
+ ts.addCancelled();
+ this.globalStatistics.cancelledJob();
+ if ( queueStats != null ) {
+ queueStats.cancelledJob();
+ }
+
+ } else if ( state == InternalJobState.FAILED ) {
+ ts.addFailed();
+ this.globalStatistics.failedJob();
+ if ( queueStats != null ) {
+ queueStats.failedJob();
+ }
+
+ } else if ( state == InternalJobState.SUCCEEDED ) {
+ ts.addFinished(processingTime);
+ this.globalStatistics.finishedJob(processingTime);
+ if ( queueStats != null ) {
+ queueStats.finishedJob(processingTime);
+ }
+
+ }
+
+ }
}