You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2015/04/22 14:42:43 UTC
svn commit: r1675348 - in /sling/trunk/bundles/extensions/event/src:
main/java/org/apache/sling/event/impl/jobs/queues/
main/java/org/apache/sling/event/impl/jobs/stats/
test/java/org/apache/sling/event/it/
Author: cziegeler
Date: Wed Apr 22 12:42:43 2015
New Revision: 1675348
URL: http://svn.apache.org/r1675348
Log:
SLING-4642 : Revisit statistics implementation
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsManager.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/TopicStatisticsImpl.java
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ClassloadingTest.java
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java?rev=1675348&r1=1675347&r2=1675348&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java Wed Apr 22 12:42:43 2015
@@ -135,7 +135,7 @@ public class JobQueueImpl
final InternalQueueConfiguration config,
final QueueServices services,
final Set<String> topics) {
- final QueueJobCache cache = new QueueJobCache(services.configuration, config.getType(), topics);
+ final QueueJobCache cache = new QueueJobCache(services.configuration, name, services.statisticsManager, config.getType(), topics);
if ( cache.isEmpty() ) {
return null;
}
@@ -213,7 +213,8 @@ public class JobQueueImpl
boolean started = false;
this.lock.writeLock().lock();
try {
- final JobHandler handler = this.cache.getNextJob(this.services.jobConsumerManager, this, this.doFullCacheSearch.getAndSet(false));
+ final JobHandler handler = this.cache.getNextJob(this.services.jobConsumerManager,
+ this.services.statisticsManager, this, this.doFullCacheSearch.getAndSet(false));
if ( handler != null ) {
started = true;
this.threadPool.execute(new Runnable() {
@@ -503,7 +504,7 @@ public class JobQueueImpl
* @param handler The job handler
*/
private void requeue(final JobHandler handler) {
- this.cache.reschedule(handler);
+ this.cache.reschedule(this.queueName, handler, this.services.statisticsManager);
this.startJobs();
}
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java?rev=1675348&r1=1675347&r2=1675348&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java Wed Apr 22 12:42:43 2015
@@ -36,6 +36,7 @@ import org.apache.sling.event.impl.jobs.
import org.apache.sling.event.impl.jobs.JobTopicTraverser;
import org.apache.sling.event.impl.jobs.Utility;
import org.apache.sling.event.impl.jobs.config.JobManagerConfiguration;
+import org.apache.sling.event.impl.jobs.stats.StatisticsManager;
import org.apache.sling.event.jobs.Queue;
import org.apache.sling.event.jobs.QueueConfiguration;
import org.apache.sling.event.jobs.QueueConfiguration.Type;
@@ -80,13 +81,15 @@ public class QueueJobCache {
* @param topics The topics handled by this queue.
*/
public QueueJobCache(final JobManagerConfiguration configuration,
+ final String queueName,
+ final StatisticsManager statisticsManager,
final QueueConfiguration.Type queueType,
final Set<String> topics) {
this.configuration = configuration;
this.queueType = queueType;
this.topics = new ConcurrentSkipListSet<String>(topics);
this.topicsWithNewJobs.addAll(topics);
- this.fillCache();
+ this.fillCache(queueName, statisticsManager);
}
/**
@@ -122,11 +125,11 @@ public class QueueJobCache {
* Fill the cache.
* No need to sync as this is called from the constructor.
*/
- private void fillCache() {
+ private void fillCache(final String queueName, final StatisticsManager statisticsManager) {
final Set<String> checkingTopics = new HashSet<String>();
checkingTopics.addAll(this.topics);
if ( !checkingTopics.isEmpty() ) {
- this.loadJobs(checkingTopics);
+ this.loadJobs(queueName, checkingTopics, statisticsManager);
}
}
@@ -137,6 +140,7 @@ public class QueueJobCache {
* can be called concurrently.
*/
public JobHandler getNextJob(final JobConsumerManager jobConsumerManager,
+ final StatisticsManager statisticsManager,
final Queue queue,
final boolean doFull) {
JobHandler handler = null;
@@ -156,7 +160,7 @@ public class QueueJobCache {
checkingTopics.addAll(this.topics);
}
if ( !checkingTopics.isEmpty() ) {
- this.loadJobs(checkingTopics);
+ this.loadJobs(queue.getName(), checkingTopics, statisticsManager);
}
}
@@ -192,7 +196,8 @@ public class QueueJobCache {
* Load the next N x numberOf(topics) jobs
* @param checkingTopics The set of topics to check.
*/
- private void loadJobs( final Set<String> checkingTopics) {
+ private void loadJobs( final String queueName, final Set<String> checkingTopics,
+ final StatisticsManager statisticsManager) {
logger.debug("Starting jobs loading from {}...", checkingTopics);
final Map<String, List<JobImpl>> topicCache = new HashMap<String, List<JobImpl>>();
@@ -206,7 +211,7 @@ public class QueueJobCache {
final Resource topicResource = baseResource.getChild(topic.replace('/', '.'));
if ( topicResource != null ) {
- topicCache.put(topic, loadJobs(topic, topicResource));
+ topicCache.put(topic, loadJobs(queueName, topic, topicResource, statisticsManager));
}
}
}
@@ -252,7 +257,9 @@ public class QueueJobCache {
* @param topicResource The parent resource of the jobs
* @return The cache which will be filled with the jobs.
*/
- private List<JobImpl> loadJobs(final String topic, final Resource topicResource) {
+ private List<JobImpl> loadJobs(final String queueName, final String topic,
+ final Resource topicResource,
+ final StatisticsManager statisticsManager) {
logger.debug("Loading jobs from topic {}", topic);
final List<JobImpl> list = new ArrayList<JobImpl>();
@@ -264,6 +271,7 @@ public class QueueJobCache {
public boolean handle(final JobImpl job) {
if ( job.getProcessingStarted() == null && !job.hasReadErrors() ) {
list.add(job);
+ statisticsManager.jobQueued(queueName, topic);
if ( list.size() == maxPreloadLimit ) {
scanTopic.set(true);
}
@@ -303,7 +311,7 @@ public class QueueJobCache {
* Reschedule the job and add it back into the cache.
* @param handler The job handler
*/
- public void reschedule(final JobHandler handler) {
+ public void reschedule(final String queueName, final JobHandler handler, final StatisticsManager statisticsManager) {
synchronized ( this.cache ) {
if ( handler.reschedule() ) {
if ( this.queueType == Type.ORDERED ) {
@@ -311,6 +319,7 @@ public class QueueJobCache {
} else {
this.cache.add(handler.getJob());
}
+ statisticsManager.jobQueued(queueName, handler.getJob().getTopic());
}
}
}
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsManager.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsManager.java?rev=1675348&r1=1675347&r2=1675348&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsManager.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsManager.java Wed Apr 22 12:42:43 2015
@@ -46,8 +46,7 @@ import org.osgi.service.event.EventHandl
@Service(value={EventHandler.class, StatisticsManager.class})
@Properties({
@Property(name=EventConstants.EVENT_TOPIC,
- value={NotificationConstants.TOPIC_JOB_ADDED,
- NotificationConstants.TOPIC_JOB_REMOVED})
+ value={NotificationConstants.TOPIC_JOB_REMOVED})
})
public class StatisticsManager implements EventHandler {
@@ -131,19 +130,7 @@ public class StatisticsManager implement
final String queueName = (String)event.getProperty(Job.PROPERTY_JOB_QUEUE_NAME);
final StatisticsImpl queueStats = getStatisticsForQueue(queueName);
- TopicStatisticsImpl ts = (TopicStatisticsImpl)this.topicStatistics.get(topic);
- if ( ts == null ) {
- this.topicStatistics.putIfAbsent(topic, new TopicStatisticsImpl(topic));
- ts = (TopicStatisticsImpl)this.topicStatistics.get(topic);
- }
-
- if ( event.getTopic().equals(NotificationConstants.TOPIC_JOB_ADDED) ) {
- this.globalStatistics.incQueued();
- if ( queueStats != null ) {
- queueStats.incQueued();
- }
-
- } else if ( event.getTopic().equals(NotificationConstants.TOPIC_JOB_REMOVED) ) {
+ if ( event.getTopic().equals(NotificationConstants.TOPIC_JOB_REMOVED) ) {
this.globalStatistics.decQueued();
this.globalStatistics.cancelledJob();
if ( queueStats != null ) {
@@ -207,4 +194,14 @@ public class StatisticsManager implement
queueStats.addActive(queueTime);
}
}
+
+ public void jobQueued(final String queueName,
+ final String topic) {
+ final StatisticsImpl queueStats = getStatisticsForQueue(queueName);
+
+ this.globalStatistics.incQueued();
+ if ( queueStats != null ) {
+ queueStats.incQueued();
+ }
+ }
}
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/TopicStatisticsImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/TopicStatisticsImpl.java?rev=1675348&r1=1675347&r2=1675348&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/TopicStatisticsImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/TopicStatisticsImpl.java Wed Apr 22 12:42:43 2015
@@ -27,27 +27,27 @@ public class TopicStatisticsImpl impleme
private final String topic;
- private long lastActivated = -1;
+ private volatile long lastActivated = -1;
- private long lastFinished = -1;
+ private volatile long lastFinished = -1;
- private long averageWaitingTime;
+ private volatile long averageWaitingTime;
- private long averageProcessingTime;
+ private volatile long averageProcessingTime;
- private long waitingTime;
+ private volatile long waitingTime;
- private long processingTime;
+ private volatile long processingTime;
- private long waitingCount;
+ private volatile long waitingCount;
- private long processingCount;
+ private volatile long processingCount;
- private long finishedJobs;
+ private volatile long finishedJobs;
- private long failedJobs;
+ private volatile long failedJobs;
- private long cancelledJobs;
+ private volatile long cancelledJobs;
/** Constructor. */
public TopicStatisticsImpl(final String topic) {
Modified: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ClassloadingTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ClassloadingTest.java?rev=1675348&r1=1675347&r2=1675348&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ClassloadingTest.java (original)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ClassloadingTest.java Wed Apr 22 12:42:43 2015
@@ -209,7 +209,7 @@ public class ClassloadingTest extends Ab
&& finishedEvents.size() == 0
&& jobManager.findJobs(JobManager.QueryType.ALL, TOPIC + "/failed", -1,
(Map<String, Object>[]) null).size() == 1
- && jobManager.getStatistics().getNumberOfQueuedJobs() == 1
+ && jobManager.getStatistics().getNumberOfQueuedJobs() == 0
&& jobManager.getStatistics().getNumberOfActiveJobs() == 0;
}