You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2010/10/19 08:31:16 UTC
svn commit: r1024129 - in
/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event:
impl/jobs/DefaultJobManager.java impl/jobs/jcr/PersistenceHandler.java
jobs/QueueConfiguration.java
Author: cziegeler
Date: Tue Oct 19 06:31:15 2010
New Revision: 1024129
URL: http://svn.apache.org/viewvc?rev=1024129&view=rev
Log:
Lock on special lock object and add a dropping queue
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/DefaultJobManager.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/QueueConfiguration.java
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/DefaultJobManager.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/DefaultJobManager.java?rev=1024129&r1=1024128&r2=1024129&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/DefaultJobManager.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/DefaultJobManager.java Tue Oct 19 06:31:15 2010
@@ -55,6 +55,7 @@ import org.apache.sling.event.jobs.JobMa
import org.apache.sling.event.jobs.JobUtil;
import org.apache.sling.event.jobs.JobsIterator;
import org.apache.sling.event.jobs.Queue;
+import org.apache.sling.event.jobs.QueueConfiguration;
import org.apache.sling.event.jobs.Statistics;
import org.apache.sling.event.jobs.TopicStatistics;
import org.osgi.service.event.Event;
@@ -116,6 +117,9 @@ public class DefaultJobManager
@Reference
private Scheduler scheduler;
+ /** Lock object for the queues map - we don't want to sync directly on the concurrent map. */
+ private final Object queuesLock = new Object();
+
/** All active queues. */
private final Map<String, AbstractJobQueue> queues = new ConcurrentHashMap<String, AbstractJobQueue>();
@@ -195,7 +199,7 @@ public class DefaultJobManager
public void cleanup() {
// check for idle queue
// we synchronize to avoid creating a queue which is about to be removed during cleanup
- synchronized ( this ) {
+ synchronized ( queuesLock ) {
final Iterator<Map.Entry<String, AbstractJobQueue>> i = this.queues.entrySet().iterator();
while ( i.hasNext() ) {
final Map.Entry<String, AbstractJobQueue> current = i.next();
@@ -229,7 +233,7 @@ public class DefaultJobManager
if ( config == null ) {
final String customQueueName = (String)event.event.getProperty(JobUtil.PROPERTY_JOB_QUEUE_NAME);
if ( customQueueName != null ) {
- synchronized ( this ) {
+ synchronized ( queuesLock ) {
final AbstractJobQueue queue = this.queues.get(customQueueName);
if ( queue != null ) {
config = queue.getConfiguration();
@@ -249,15 +253,25 @@ public class DefaultJobManager
if ( config.isSkipped(event) ) {
if ( logger.isDebugEnabled() ) {
- logger.debug("Ignoring event due to configuration of queue {} : {}", queueName, EventUtil.toString(event.event));
+ logger.debug("Ignoring job due to configuration of queue {} : {}", queueName, EventUtil.toString(event.event));
+ }
+ return;
+ }
+
+ // drop?
+ if ( config.getType() == QueueConfiguration.Type.DROP ) {
+ if ( logger.isDebugEnabled() ) {
+ logger.debug("Dropping job due to configuration of queue {} : {}", queueName, EventUtil.toString(event.event));
}
+ Utility.sendNotification(this.environment, JobUtil.TOPIC_JOB_CANCELLED, event.event, null);
+ event.finished();
return;
}
// get or create queue
AbstractJobQueue queue = null;
// we synchronize to avoid creating a queue which is about to be removed during cleanup
- synchronized ( this ) {
+ synchronized ( queuesLock ) {
queue = this.queues.get(queueName);
// check for reconfiguration, we really do an identity check here(!)
if ( queue != null && queue.getConfiguration() != config ) {
@@ -280,11 +294,11 @@ public class DefaultJobManager
queue = null;
}
if ( queue == null ) {
- if ( config.getType() == InternalQueueConfiguration.Type.ORDERED ) {
+ if ( config.getType() == QueueConfiguration.Type.ORDERED ) {
queue = new OrderedJobQueue(queueName, config, this.environment);
- } else if ( config.getType() == InternalQueueConfiguration.Type.UNORDERED ) {
+ } else if ( config.getType() == QueueConfiguration.Type.UNORDERED ) {
queue = new ParallelJobQueue(queueName, config, this.environment, this.scheduler);
- } else if ( config.getType() == InternalQueueConfiguration.Type.TOPIC_ROUND_ROBIN ) {
+ } else if ( config.getType() == QueueConfiguration.Type.TOPIC_ROUND_ROBIN ) {
queue = new TopicRoundRobinJobQueue(queueName, config, this.environment, this.scheduler);
}
if ( queue == null ) {
@@ -680,7 +694,7 @@ public class DefaultJobManager
*/
public void restart() {
// let's rename/close all queues first
- synchronized ( this ) {
+ synchronized ( queuesLock ) {
final List<AbstractJobQueue> queues = new ArrayList<AbstractJobQueue>(this.queues.values());
for(final AbstractJobQueue queue : queues ) {
// remove the queue with the old name
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java?rev=1024129&r1=1024128&r2=1024129&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java Tue Oct 19 06:31:15 2010
@@ -975,7 +975,7 @@ public class PersistenceHandler implemen
}
this.backgroundSession.save();
// and unlock
- if ( jobId != null ) {
+ if ( jobId != null && eventNode.isLocked() ) {
this.backgroundSession.getWorkspace().getLockManager().unlock(path);
}
}
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/QueueConfiguration.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/QueueConfiguration.java?rev=1024129&r1=1024128&r2=1024129&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/QueueConfiguration.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/QueueConfiguration.java Tue Oct 19 06:31:15 2010
@@ -27,10 +27,11 @@ public interface QueueConfiguration {
/** The queue type. */
static enum Type {
- UNORDERED,
- ORDERED,
- TOPIC_ROUND_ROBIN,
- IGNORE
+ UNORDERED, // unordered, parallel processing
+ ORDERED, // ordered, fifo
+ TOPIC_ROUND_ROBIN, // unordered, parallel processing, executed based on topic
+ IGNORE, // ignore job, but do not remove
+ DROP // drop job without processing!
}
/**