You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2010/10/19 08:31:16 UTC

svn commit: r1024129 - in /sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event: impl/jobs/DefaultJobManager.java impl/jobs/jcr/PersistenceHandler.java jobs/QueueConfiguration.java

Author: cziegeler
Date: Tue Oct 19 06:31:15 2010
New Revision: 1024129

URL: http://svn.apache.org/viewvc?rev=1024129&view=rev
Log:
Lock on special lock object and add a dropping queue

Modified:
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/DefaultJobManager.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/QueueConfiguration.java

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/DefaultJobManager.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/DefaultJobManager.java?rev=1024129&r1=1024128&r2=1024129&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/DefaultJobManager.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/DefaultJobManager.java Tue Oct 19 06:31:15 2010
@@ -55,6 +55,7 @@ import org.apache.sling.event.jobs.JobMa
 import org.apache.sling.event.jobs.JobUtil;
 import org.apache.sling.event.jobs.JobsIterator;
 import org.apache.sling.event.jobs.Queue;
+import org.apache.sling.event.jobs.QueueConfiguration;
 import org.apache.sling.event.jobs.Statistics;
 import org.apache.sling.event.jobs.TopicStatistics;
 import org.osgi.service.event.Event;
@@ -116,6 +117,9 @@ public class DefaultJobManager
     @Reference
     private Scheduler scheduler;
 
+    /** Lock object for the queues map - we don't want to sync directly on the concurrent map. */
+    private final Object queuesLock = new Object();
+
     /** All active queues. */
     private final Map<String, AbstractJobQueue> queues = new ConcurrentHashMap<String, AbstractJobQueue>();
 
@@ -195,7 +199,7 @@ public class DefaultJobManager
     public void cleanup() {
         // check for idle queue
         // we synchronize to avoid creating a queue which is about to be removed during cleanup
-        synchronized ( this ) {
+        synchronized ( queuesLock ) {
             final Iterator<Map.Entry<String, AbstractJobQueue>> i = this.queues.entrySet().iterator();
             while ( i.hasNext() ) {
                 final Map.Entry<String, AbstractJobQueue> current = i.next();
@@ -229,7 +233,7 @@ public class DefaultJobManager
         if ( config == null ) {
             final String customQueueName = (String)event.event.getProperty(JobUtil.PROPERTY_JOB_QUEUE_NAME);
             if ( customQueueName != null ) {
-                synchronized ( this ) {
+                synchronized ( queuesLock ) {
                     final AbstractJobQueue queue = this.queues.get(customQueueName);
                     if ( queue != null ) {
                         config = queue.getConfiguration();
@@ -249,15 +253,25 @@ public class DefaultJobManager
 
         if ( config.isSkipped(event) ) {
             if ( logger.isDebugEnabled() ) {
-                logger.debug("Ignoring event due to configuration of queue {} : {}", queueName, EventUtil.toString(event.event));
+                logger.debug("Ignoring job due to configuration of queue {} : {}", queueName, EventUtil.toString(event.event));
+            }
+            return;
+        }
+
+        // drop?
+        if ( config.getType() == QueueConfiguration.Type.DROP ) {
+            if ( logger.isDebugEnabled() ) {
+                logger.debug("Dropping job due to configuration of queue {} : {}", queueName, EventUtil.toString(event.event));
             }
+            Utility.sendNotification(this.environment, JobUtil.TOPIC_JOB_CANCELLED, event.event, null);
+            event.finished();
             return;
         }
 
         // get or create queue
         AbstractJobQueue queue = null;
         // we synchronize to avoid creating a queue which is about to be removed during cleanup
-        synchronized ( this ) {
+        synchronized ( queuesLock ) {
             queue = this.queues.get(queueName);
             // check for reconfiguration, we really do an identity check here(!)
             if ( queue != null && queue.getConfiguration() != config ) {
@@ -280,11 +294,11 @@ public class DefaultJobManager
                 queue = null;
             }
             if ( queue == null ) {
-                if ( config.getType() == InternalQueueConfiguration.Type.ORDERED ) {
+                if ( config.getType() == QueueConfiguration.Type.ORDERED ) {
                     queue = new OrderedJobQueue(queueName, config, this.environment);
-                } else if ( config.getType() == InternalQueueConfiguration.Type.UNORDERED ) {
+                } else if ( config.getType() == QueueConfiguration.Type.UNORDERED ) {
                     queue = new ParallelJobQueue(queueName, config, this.environment, this.scheduler);
-                } else if ( config.getType() == InternalQueueConfiguration.Type.TOPIC_ROUND_ROBIN ) {
+                } else if ( config.getType() == QueueConfiguration.Type.TOPIC_ROUND_ROBIN ) {
                     queue = new TopicRoundRobinJobQueue(queueName, config, this.environment, this.scheduler);
                 }
                 if ( queue == null ) {
@@ -680,7 +694,7 @@ public class DefaultJobManager
      */
     public void restart() {
         // let's rename/close all queues first
-        synchronized ( this ) {
+        synchronized ( queuesLock ) {
             final List<AbstractJobQueue> queues = new ArrayList<AbstractJobQueue>(this.queues.values());
             for(final AbstractJobQueue queue : queues ) {
                 // remove the queue with the old name

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java?rev=1024129&r1=1024128&r2=1024129&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java Tue Oct 19 06:31:15 2010
@@ -975,7 +975,7 @@ public class PersistenceHandler implemen
                     }
                     this.backgroundSession.save();
                     // and unlock
-                    if ( jobId != null ) {
+                    if ( jobId != null && eventNode.isLocked() ) {
                         this.backgroundSession.getWorkspace().getLockManager().unlock(path);
                     }
                 }

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/QueueConfiguration.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/QueueConfiguration.java?rev=1024129&r1=1024128&r2=1024129&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/QueueConfiguration.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/QueueConfiguration.java Tue Oct 19 06:31:15 2010
@@ -27,10 +27,11 @@ public interface QueueConfiguration {
 
     /** The queue type. */
     static enum Type {
-        UNORDERED,
-        ORDERED,
-        TOPIC_ROUND_ROBIN,
-        IGNORE
+        UNORDERED,          // unordered, parallel prpcessing
+        ORDERED,            // ordered, fifo
+        TOPIC_ROUND_ROBIN,  // unordered, parallel processing, executed based on topic
+        IGNORE,             // ignore job, but do not remove
+        DROP                // drop job without processing!
     }
 
     /**