You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2010/02/10 14:25:44 UTC

svn commit: r908489 - in /sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl: JobEventHandler.java job/JobBlockingQueue.java

Author: cziegeler
Date: Wed Feb 10 13:25:42 2010
New Revision: 908489

URL: http://svn.apache.org/viewvc?rev=908489&view=rev
Log:
SLING-1365 : Limit the number of parallel jobs
Change behaviour of ordered queues: the first job arriving defines if the queue is ordered!

Modified:
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobBlockingQueue.java

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java?rev=908489&r1=908488&r2=908489&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java Wed Feb 10 13:25:42 2010
@@ -587,7 +587,8 @@
                         logger.debug("Queuing job {} into queue {}.", info.event, queueName);
                         BlockingQueue<EventInfo> jobQueue = this.jobQueues.get(queueName);
                         if ( jobQueue == null ) {
-                            final JobBlockingQueue jq = new JobBlockingQueue(queueName, this.logger);
+                            final boolean orderedQueue = info.event.getProperty(EventUtil.PROPERTY_JOB_QUEUE_ORDERED) != null;
+                            final JobBlockingQueue jq = new JobBlockingQueue(queueName, orderedQueue, this.logger);
                             jobQueue = jq;
                             this.jobQueues.put(queueName, jq);
                             // Start background thread
@@ -654,16 +655,11 @@
                     info = null;
                     final Status status = this.executeJob(processInfo, jobQueue);
                     if ( status == Status.SUCCESS ) {
-                        EventInfo newInfo = null;
                         try {
-                            newInfo = jobQueue.waitForFinish();
+                            info = jobQueue.waitForFinish();
                         } catch (InterruptedException e) {
                             this.ignoreException(e);
                         }
-                        // if we have an info, this is a reschedule
-                        if ( newInfo != null ) {
-                            info = jobQueue.reschedule(newInfo, this.scheduler);
-                        }
                     } else if ( status == Status.RESCHEDULE ) {
                         info = jobQueue.reschedule(processInfo, this.scheduler);
                     }
@@ -1432,7 +1428,11 @@
                 jobQueue = this.jobQueues.get(job.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME));
             }
             synchronized ( jobQueue.getLock()) {
-                jobQueue.notifyFinish(info);
+                EventInfo reprocessInfo = null;
+                if ( info != null ) {
+                    reprocessInfo = jobQueue.reschedule(info, this.scheduler);
+                }
+                jobQueue.notifyFinish(reprocessInfo);
             }
         }
     }

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobBlockingQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobBlockingQueue.java?rev=908489&r1=908488&r2=908489&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobBlockingQueue.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobBlockingQueue.java Wed Feb 10 13:25:42 2010
@@ -54,8 +54,14 @@
     /** The logger. */
     private final Logger logger;
 
-    public JobBlockingQueue(final String name, final Logger logger) {
+    /** Ordered Queue? */
+    private final boolean orderedQueue;
+
+    public JobBlockingQueue(final String name,
+                            final boolean orderedQueue,
+                            final Logger logger) {
         this.queueName = name;
+        this.orderedQueue = orderedQueue;
         this.logger = logger;
     }
 
@@ -131,6 +137,13 @@
     }
 
     /**
+     * Is this a ordered queue?
+     */
+    public boolean isOrdered() {
+        return orderedQueue;
+    }
+
+    /**
      * Reschedule a job.
      * If this is a ordered queue, this method will return the event info
      * which should be processed next. Otherwise null is returned.
@@ -138,8 +151,7 @@
     public EventInfo reschedule(final EventInfo info, final Scheduler scheduler) {
         final Event job = info.event;
         // is this an ordered queue?
-        final boolean orderedQueue = job.getProperty(EventUtil.PROPERTY_JOB_QUEUE_ORDERED) != null;
-        if ( orderedQueue ) {
+        if ( this.orderedQueue ) {
             // we just sleep for the delay time - if none, we continue and retry
             // this job again
             if ( job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY) != null ) {