You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2010/02/11 09:52:36 UTC
svn commit: r908879 - in
/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl:
JobEventHandler.java job/JobBlockingQueue.java
Author: cziegeler
Date: Thu Feb 11 08:52:29 2010
New Revision: 908879
URL: http://svn.apache.org/viewvc?rev=908879&view=rev
Log:
SLING-1365 : Limit the number of parallel jobs
Add more debug logging, fix shutdown order and fix job counter for job queues.
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobBlockingQueue.java
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java?rev=908879&r1=908878&r2=908879&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java Thu Feb 11 08:52:29 2010
@@ -255,22 +255,28 @@
final Iterator<JobBlockingQueue> i = this.jobQueues.values().iterator();
while ( i.hasNext() ) {
final JobBlockingQueue jbq = i.next();
+ this.logger.debug("Shutting down job queue {}", jbq.getName());
+ this.logger.debug("Waking up sleeping queue {}", jbq.getName());
+ this.wakeUpJobQueue(jbq);
// wake up qeue
if ( jbq.isWaiting() ) {
+ this.logger.debug("Waking up waiting queue {}", jbq.getName());
synchronized ( jbq.getLock()) {
jbq.notifyFinish(null);
}
}
- // continue queue processing
+ // continue queue processing to stop the queue
try {
jbq.put(new EventInfo());
} catch (InterruptedException e) {
this.ignoreException(e);
}
+ this.logger.debug("Stopped job queue {}", jbq.getName());
}
}
if ( this.backgroundSession != null ) {
synchronized ( this.backgroundLock ) {
+ this.logger.debug("Shutting down background session.");
try {
this.backgroundSession.getWorkspace().getObservationManager().removeEventListener(this);
} catch (RepositoryException e) {
@@ -1678,20 +1684,28 @@
if ( jobQueueName != null ) {
synchronized ( this.jobQueues ) {
final JobBlockingQueue queue = this.jobQueues.get(jobQueueName);
- if ( queue != null && queue.isSleeping() ) {
- final String schedulerJobName = queue.getSchedulerJobName();
- final Thread thread = queue.getSleepingThread();
- if ( schedulerJobName != null ) {
- this.scheduler.removeJob(schedulerJobName);
- }
- if ( thread != null ) {
- thread.interrupt();
- }
+ if ( queue != null ) {
+ this.wakeUpJobQueue(queue);
}
}
}
}
+ private void wakeUpJobQueue(final JobBlockingQueue jobQueue) {
+ if ( jobQueue.isSleeping() ) {
+ final String schedulerJobName = jobQueue.getSchedulerJobName();
+ final Thread thread = jobQueue.getSleepingThread();
+ if ( schedulerJobName != null ) {
+ final Scheduler localScheduler = this.scheduler;
+ if ( localScheduler != null ) {
+ localScheduler.removeJob(schedulerJobName);
+ }
+ }
+ if ( thread != null ) {
+ thread.interrupt();
+ }
+ }
+ }
/**
* Helper method for sending the notification events.
*/
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobBlockingQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobBlockingQueue.java?rev=908879&r1=908878&r2=908879&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobBlockingQueue.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobBlockingQueue.java Thu Feb 11 08:52:29 2010
@@ -74,7 +74,9 @@
public EventInfo waitForFinish() throws InterruptedException {
this.isWaiting = true;
this.markForCleanUp = false;
+ this.logger.debug("Job queue {} is waiting for finish.", this.queueName);
this.lock.wait();
+ this.logger.debug("Job queue {} is continuing.", this.queueName);
this.isWaiting = false;
final EventInfo object = this.eventInfo;
this.eventInfo = null;
@@ -82,6 +84,13 @@
}
/**
+ * Get the name of the job queue.
+ */
+ public String getName() {
+ return this.queueName;
+ }
+
+ /**
* Mark this queue for cleanup.
*/
public void markForCleanUp() {
@@ -99,16 +108,21 @@
if ( jobCount >= maxJobs ) {
this.isWaiting = true;
this.markForCleanUp = false;
+ this.logger.debug("Job queue {} is processing {} job - waiting for a free slot.", this.queueName, jobCount);
this.lock.wait();
+ this.logger.debug("Job queue {} is continuing.", this.queueName);
this.isWaiting = false;
}
+ jobCount++;
}
/**
* Free a slot when a job processing is finished.
*/
public void freeSlot() {
+ jobCount--;
if ( this.isWaiting ) {
+ this.logger.debug("Notifying job queue {} to continue processing.", this.queueName);
this.lock.notify();
}
}
@@ -125,6 +139,7 @@
*/
public void notifyFinish(EventInfo i) {
this.eventInfo = i;
+ this.logger.debug("Notifying job queue {} to continue processing.", this.queueName);
this.lock.notify();
}
@@ -155,14 +170,14 @@
}
}
- public void setSleeping(boolean flag, String schedulerJobName) {
+ public void setSleeping(String schedulerJobName) {
this.schedulerJobName = schedulerJobName;
- this.setSleeping(flag);
+ this.setSleeping(true);
}
- public void setSleeping(boolean flag, Thread sleepingThread) {
+ public void setSleeping(Thread sleepingThread) {
this.sleepingThread = sleepingThread;
- this.setSleeping(flag);
+ this.setSleeping(true);
}
public String getSchedulerJobName() {
@@ -197,8 +212,9 @@
// this job again
if ( job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY) != null ) {
final long delay = (Long)job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY);
- setSleeping(true, Thread.currentThread());
+ setSleeping(Thread.currentThread());
try {
+ this.logger.debug("Job queue {} is sleeping for {}ms.", this.queueName, delay);
Thread.sleep(delay);
} catch (InterruptedException e) {
this.ignoreException(e);
@@ -216,7 +232,7 @@
final String schedulerJobName = "Waiting:" + queueName;
final Runnable t = new Runnable() {
public void run() {
- setSleeping(true, schedulerJobName);
+ setSleeping(schedulerJobName);
try {
put(info);
} catch (InterruptedException e) {