You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2010/02/11 09:52:36 UTC

svn commit: r908879 - in /sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl: JobEventHandler.java job/JobBlockingQueue.java

Author: cziegeler
Date: Thu Feb 11 08:52:29 2010
New Revision: 908879

URL: http://svn.apache.org/viewvc?rev=908879&view=rev
Log:
SLING-1365 : Limit the number of parallel jobs
Add more debug logging, fix shutdown order and fix job counter for job queues.

Modified:
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobBlockingQueue.java

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java?rev=908879&r1=908878&r2=908879&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java Thu Feb 11 08:52:29 2010
@@ -255,22 +255,28 @@
             final Iterator<JobBlockingQueue> i = this.jobQueues.values().iterator();
             while ( i.hasNext() ) {
                 final JobBlockingQueue jbq = i.next();
+                this.logger.debug("Shutting down job queue {}", jbq.getName());
+                this.logger.debug("Waking up sleeping queue {}", jbq.getName());
+                this.wakeUpJobQueue(jbq);
                 // wake up qeue
                 if ( jbq.isWaiting() ) {
+                    this.logger.debug("Waking up waiting queue {}", jbq.getName());
                     synchronized ( jbq.getLock()) {
                         jbq.notifyFinish(null);
                     }
                 }
-                // continue queue processing
+                // continue queue processing to stop the queue
                 try {
                     jbq.put(new EventInfo());
                 } catch (InterruptedException e) {
                     this.ignoreException(e);
                 }
+                this.logger.debug("Stopped job queue {}", jbq.getName());
             }
         }
         if ( this.backgroundSession != null ) {
             synchronized ( this.backgroundLock ) {
+                this.logger.debug("Shutting down background session.");
                 try {
                     this.backgroundSession.getWorkspace().getObservationManager().removeEventListener(this);
                 } catch (RepositoryException e) {
@@ -1678,20 +1684,28 @@
         if ( jobQueueName != null ) {
             synchronized ( this.jobQueues ) {
                 final JobBlockingQueue queue = this.jobQueues.get(jobQueueName);
-                if ( queue != null && queue.isSleeping() ) {
-                    final String schedulerJobName = queue.getSchedulerJobName();
-                    final Thread thread = queue.getSleepingThread();
-                    if ( schedulerJobName != null ) {
-                        this.scheduler.removeJob(schedulerJobName);
-                    }
-                    if ( thread != null ) {
-                        thread.interrupt();
-                    }
+                if ( queue != null ) {
+                    this.wakeUpJobQueue(queue);
                 }
             }
         }
     }
 
+    private void wakeUpJobQueue(final JobBlockingQueue jobQueue) {
+        if ( jobQueue.isSleeping() ) {
+            final String schedulerJobName = jobQueue.getSchedulerJobName();
+            final Thread thread = jobQueue.getSleepingThread();
+            if ( schedulerJobName != null ) {
+                final Scheduler localScheduler = this.scheduler;
+                if ( localScheduler != null ) {
+                    localScheduler.removeJob(schedulerJobName);
+                }
+            }
+            if ( thread != null ) {
+                thread.interrupt();
+            }
+        }
+    }
     /**
      * Helper method for sending the notification events.
      */

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobBlockingQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobBlockingQueue.java?rev=908879&r1=908878&r2=908879&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobBlockingQueue.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobBlockingQueue.java Thu Feb 11 08:52:29 2010
@@ -74,7 +74,9 @@
     public EventInfo waitForFinish() throws InterruptedException {
         this.isWaiting = true;
         this.markForCleanUp = false;
+        this.logger.debug("Job queue {} is waiting for finish.", this.queueName);
         this.lock.wait();
+        this.logger.debug("Job queue {} is continuing.", this.queueName);
         this.isWaiting = false;
         final EventInfo object = this.eventInfo;
         this.eventInfo = null;
@@ -82,6 +84,13 @@
     }
 
     /**
+     * Get the name of the job queue.
+     */
+    public String getName() {
+        return this.queueName;
+    }
+
+    /**
      * Mark this queue for cleanup.
      */
     public void markForCleanUp() {
@@ -99,16 +108,21 @@
         if ( jobCount >= maxJobs ) {
             this.isWaiting = true;
             this.markForCleanUp = false;
+            this.logger.debug("Job queue {} is processing {} job - waiting for a free slot.", this.queueName, jobCount);
             this.lock.wait();
+            this.logger.debug("Job queue {} is continuing.", this.queueName);
             this.isWaiting = false;
         }
+        jobCount++;
     }
 
     /**
      * Free a slot when a job processing is finished.
      */
     public void freeSlot() {
+        jobCount--;
         if ( this.isWaiting ) {
+            this.logger.debug("Notifying job queue {} to continue processing.", this.queueName);
             this.lock.notify();
         }
     }
@@ -125,6 +139,7 @@
      */
     public void notifyFinish(EventInfo i) {
         this.eventInfo = i;
+        this.logger.debug("Notifying job queue {} to continue processing.", this.queueName);
         this.lock.notify();
     }
 
@@ -155,14 +170,14 @@
         }
     }
 
-    public void setSleeping(boolean flag, String schedulerJobName) {
+    public void setSleeping(String schedulerJobName) {
         this.schedulerJobName = schedulerJobName;
-        this.setSleeping(flag);
+        this.setSleeping(true);
     }
 
-    public void setSleeping(boolean flag, Thread sleepingThread) {
+    public void setSleeping(Thread sleepingThread) {
         this.sleepingThread = sleepingThread;
-        this.setSleeping(flag);
+        this.setSleeping(true);
     }
 
     public String getSchedulerJobName() {
@@ -197,8 +212,9 @@
             // this job again
             if ( job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY) != null ) {
                 final long delay = (Long)job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY);
-                setSleeping(true, Thread.currentThread());
+                setSleeping(Thread.currentThread());
                 try {
+                    this.logger.debug("Job queue {} is sleeping for {}ms.", this.queueName, delay);
                     Thread.sleep(delay);
                 } catch (InterruptedException e) {
                     this.ignoreException(e);
@@ -216,7 +232,7 @@
             final String schedulerJobName = "Waiting:" + queueName;
             final Runnable t = new Runnable() {
                 public void run() {
-                    setSleeping(true, schedulerJobName);
+                    setSleeping(schedulerJobName);
                     try {
                         put(info);
                     } catch (InterruptedException e) {