You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2014/10/16 13:03:05 UTC
svn commit: r1632272 - in /sling/trunk/bundles/extensions/event/src:
main/java/org/apache/sling/event/impl/jobs/
main/java/org/apache/sling/event/impl/jobs/stats/
main/java/org/apache/sling/event/impl/jobs/topics/
test/java/org/apache/sling/event/it/
Author: cziegeler
Date: Thu Oct 16 11:03:04 2014
New Revision: 1632272
URL: http://svn.apache.org/r1632272
Log:
SLING-4048 : Avoid keeping jobs in memory. Fix round robin queue and implement statistics reset (WiP)
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsManager.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/QueueJobCache.java
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/RoundRobinQueueTest.java
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java?rev=1632272&r1=1632271&r2=1632272&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java Thu Oct 16 11:03:04 2014
@@ -269,7 +269,7 @@ public class JobManagerImpl
*/
@Override
public synchronized Statistics getStatistics() {
- return this.statisticsManager.getOverallStatistics();
+ return this.statisticsManager.getGlobalStatistics();
}
/**
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsManager.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsManager.java?rev=1632272&r1=1632271&r2=1632272&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsManager.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsManager.java Thu Oct 16 11:03:04 2014
@@ -28,7 +28,6 @@ import org.apache.felix.scr.annotations.
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.Service;
import org.apache.sling.event.impl.jobs.JobManagerConfiguration;
-import org.apache.sling.event.impl.jobs.TestLogger;
import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager;
import org.apache.sling.event.impl.jobs.notifications.NotificationUtility;
import org.apache.sling.event.jobs.Job;
@@ -38,9 +37,12 @@ import org.apache.sling.event.jobs.Topic
import org.osgi.service.event.Event;
import org.osgi.service.event.EventConstants;
import org.osgi.service.event.EventHandler;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+/**
+ * The statistics manager keeps track of all statistics-related tasks.
+ * The statistics are updated through OSGi events by using the special
+ * job notification events.
+ */
@Component(immediate=true)
@Service(value={EventHandler.class, StatisticsManager.class})
@Properties({
@@ -54,17 +56,27 @@ import org.slf4j.LoggerFactory;
})
public class StatisticsManager implements EventHandler {
- /** Logger. */
- private final Logger logger = new TestLogger(LoggerFactory.getLogger(this.getClass()));
-
+ /** The job manager configuration. */
@Reference
private JobManagerConfiguration configuration;
+ /** The queue configuration manager. */
@Reference
private QueueConfigurationManager queueConfigurationManager;
- /** Current statistics. */
- private final StatisticsImpl baseStatistics = new StatisticsImpl();
+ /** Global statistics. */
+ private final StatisticsImpl globalStatistics = new StatisticsImpl() {
+
+ @Override
+ public synchronized void reset() {
+ super.reset();
+ topicStatistics.clear();
+ for(final Statistics s : queueStatistics.values()) {
+ s.reset();
+ }
+ }
+
+ };
/** Statistics per topic. */
private final ConcurrentMap<String, TopicStatistics> topicStatistics = new ConcurrentHashMap<String, TopicStatistics>();
@@ -72,14 +84,27 @@ public class StatisticsManager implement
/** Statistics per queue. */
private final ConcurrentMap<String, Statistics> queueStatistics = new ConcurrentHashMap<String, Statistics>();
- public Statistics getOverallStatistics() {
- return this.baseStatistics;
+ /**
+ * Get the global statistics.
+ * @return The global statistics.
+ */
+ public Statistics getGlobalStatistics() {
+ return this.globalStatistics;
}
+ /**
+ * Get all topic statistics.
+ * @return The map of topic statistics by topic.
+ */
public Map<String, TopicStatistics> getTopicStatistics() {
return topicStatistics;
}
+ /**
+ * Get the statistics for a single queue.
+ * @param queueName The queue name.
+ * @return The statistics for that queue.
+ */
public Statistics getQueueStatistics(final String queueName) {
Statistics queueStats = queueStatistics.get(queueName);
if ( queueStats == null ) {
@@ -88,6 +113,11 @@ public class StatisticsManager implement
return queueStats;
}
+ /**
+ * Internal method to get the statistics of a queue.
+ * @param queueName The queue name.
+ * @return The statistics or {@code null} if queue name is {@code null}.
+ */
private StatisticsImpl getStatisticsForQueue(final String queueName) {
if ( queueName == null ) {
return null;
@@ -100,6 +130,9 @@ public class StatisticsManager implement
return queueStats;
}
+ /**
+ * Handle all job notification events and update the statistics.
+ */
@Override
public void handleEvent(final Event event) {
final String topic = (String)event.getProperty(NotificationConstants.NOTIFICATION_PROPERTY_JOB_TOPIC);
@@ -114,19 +147,19 @@ public class StatisticsManager implement
}
if ( event.getTopic().equals(NotificationConstants.TOPIC_JOB_ADDED) ) {
- this.baseStatistics.incQueued();
+ this.globalStatistics.incQueued();
queueStats.incQueued();
} else if ( event.getTopic().equals(NotificationConstants.TOPIC_JOB_CANCELLED) ) {
ts.addCancelled();
- this.baseStatistics.cancelledJob();
+ this.globalStatistics.cancelledJob();
if ( queueStats != null ) {
queueStats.cancelledJob();
}
} else if ( event.getTopic().equals(NotificationConstants.TOPIC_JOB_FAILED) ) {
ts.addFailed();
- this.baseStatistics.failedJob();
+ this.globalStatistics.failedJob();
if ( queueStats != null ) {
queueStats.failedJob();
}
@@ -134,7 +167,7 @@ public class StatisticsManager implement
} else if ( event.getTopic().equals(NotificationConstants.TOPIC_JOB_FINISHED) ) {
final Long time = (Long)event.getProperty(NotificationUtility.PROPERTY_TIME);
ts.addFinished(time == null ? -1 : time);
- this.baseStatistics.finishedJob(time == null ? -1 : time);
+ this.globalStatistics.finishedJob(time == null ? -1 : time);
if ( queueStats != null ) {
queueStats.finishedJob(time == null ? -1 : time);
}
@@ -142,14 +175,14 @@ public class StatisticsManager implement
} else if ( event.getTopic().equals(NotificationConstants.TOPIC_JOB_STARTED) ) {
final Long time = (Long)event.getProperty(NotificationUtility.PROPERTY_TIME);
ts.addActivated(time == null ? -1 : time);
- this.baseStatistics.addActive(time == null ? -1 : time);
+ this.globalStatistics.addActive(time == null ? -1 : time);
if ( queueStats != null ) {
queueStats.addActive(time == null ? -1 : time);
}
} else if ( event.getTopic().equals(NotificationConstants.TOPIC_JOB_REMOVED) ) {
- this.baseStatistics.decQueued();
- this.baseStatistics.cancelledJob();
+ this.globalStatistics.decQueued();
+ this.globalStatistics.cancelledJob();
if ( queueStats != null ) {
queueStats.decQueued();
queueStats.cancelledJob();
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/QueueJobCache.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/QueueJobCache.java?rev=1632272&r1=1632271&r2=1632272&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/QueueJobCache.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/QueueJobCache.java Thu Oct 16 11:03:04 2014
@@ -174,6 +174,7 @@ public class QueueJobCache {
// topic round robin
boolean done = true;
do {
+ done = true;
for(final Map.Entry<String, List<JobImpl>> entry : topicCache.entrySet()) {
if ( !entry.getValue().isEmpty() ) {
this.cache.add(entry.getValue().remove(0));
Modified: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/RoundRobinQueueTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/RoundRobinQueueTest.java?rev=1632272&r1=1632271&r2=1632272&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/RoundRobinQueueTest.java (original)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/RoundRobinQueueTest.java Thu Oct 16 11:03:04 2014
@@ -31,7 +31,7 @@ import org.apache.sling.event.impl.Barri
import org.apache.sling.event.impl.jobs.config.ConfigurationConstants;
import org.apache.sling.event.jobs.Job;
import org.apache.sling.event.jobs.JobManager;
-import org.apache.sling.event.jobs.JobUtil;
+import org.apache.sling.event.jobs.NotificationConstants;
import org.apache.sling.event.jobs.Queue;
import org.apache.sling.event.jobs.QueueConfiguration;
import org.apache.sling.event.jobs.consumer.JobConsumer;
@@ -45,6 +45,7 @@ import org.ops4j.pax.exam.spi.reactors.P
import org.osgi.framework.ServiceRegistration;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventHandler;
+import org.slf4j.LoggerFactory;
@RunWith(PaxExam.class)
@ExamReactorStrategy(PerMethod.class)
@@ -117,18 +118,19 @@ public class RoundRobinQueueTest extends
return JobResult.OK;
}
});
- final ServiceRegistration ehReg = this.registerEventHandler(JobUtil.TOPIC_JOB_FINISHED,
+ final ServiceRegistration ehReg = this.registerEventHandler(NotificationConstants.TOPIC_JOB_FINISHED,
new EventHandler() {
@Override
public void handleEvent(final Event event) {
+ LoggerFactory.getLogger("test").info("Received finished event {}", event.getProperty(NotificationConstants.NOTIFICATION_PROPERTY_JOB_ID));
count.incrementAndGet();
}
});
try {
// we first sent one event to get the queue started
- jobManager.addJob(TOPIC + "/start", null, null);
+ jobManager.addJob(TOPIC + "/start", null);
assertTrue("No event received in the given time.", cb.block(5));
cb.reset();
@@ -143,7 +145,7 @@ public class RoundRobinQueueTest extends
// first jobs without id
for(int i = 0; i < NUM_JOBS; i++ ) {
final String subTopic = TOPIC + "/sub" + (i % 10);
- jobManager.addJob(subTopic, null, null);
+ jobManager.addJob(subTopic, null);
}
// second jobs with id
for(int i = 0; i < NUM_JOBS; i++ ) {