You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2010/02/10 14:25:44 UTC
svn commit: r908489 - in /sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl: JobEventHandler.java job/JobBlockingQueue.java
Author: cziegeler
Date: Wed Feb 10 13:25:42 2010
New Revision: 908489
URL: http://svn.apache.org/viewvc?rev=908489&view=rev
Log:
SLING-1365 : Limit the number of parallel jobs
Change behaviour of ordered queues: the first job arriving in a queue now determines whether that queue is ordered.
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobBlockingQueue.java
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java?rev=908489&r1=908488&r2=908489&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java Wed Feb 10 13:25:42 2010
@@ -587,7 +587,8 @@
logger.debug("Queuing job {} into queue {}.", info.event, queueName);
BlockingQueue<EventInfo> jobQueue = this.jobQueues.get(queueName);
if ( jobQueue == null ) {
- final JobBlockingQueue jq = new JobBlockingQueue(queueName, this.logger);
+ final boolean orderedQueue = info.event.getProperty(EventUtil.PROPERTY_JOB_QUEUE_ORDERED) != null;
+ final JobBlockingQueue jq = new JobBlockingQueue(queueName, orderedQueue, this.logger);
jobQueue = jq;
this.jobQueues.put(queueName, jq);
// Start background thread
@@ -654,16 +655,11 @@
info = null;
final Status status = this.executeJob(processInfo, jobQueue);
if ( status == Status.SUCCESS ) {
- EventInfo newInfo = null;
try {
- newInfo = jobQueue.waitForFinish();
+ info = jobQueue.waitForFinish();
} catch (InterruptedException e) {
this.ignoreException(e);
}
- // if we have an info, this is a reschedule
- if ( newInfo != null ) {
- info = jobQueue.reschedule(newInfo, this.scheduler);
- }
} else if ( status == Status.RESCHEDULE ) {
info = jobQueue.reschedule(processInfo, this.scheduler);
}
@@ -1432,7 +1428,11 @@
jobQueue = this.jobQueues.get(job.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME));
}
synchronized ( jobQueue.getLock()) {
- jobQueue.notifyFinish(info);
+ EventInfo reprocessInfo = null;
+ if ( info != null ) {
+ reprocessInfo = jobQueue.reschedule(info, this.scheduler);
+ }
+ jobQueue.notifyFinish(reprocessInfo);
}
}
}
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobBlockingQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobBlockingQueue.java?rev=908489&r1=908488&r2=908489&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobBlockingQueue.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobBlockingQueue.java Wed Feb 10 13:25:42 2010
@@ -54,8 +54,14 @@
/** The logger. */
private final Logger logger;
- public JobBlockingQueue(final String name, final Logger logger) {
+ /** Ordered Queue? */
+ private final boolean orderedQueue;
+
+ public JobBlockingQueue(final String name,
+ final boolean orderedQueue,
+ final Logger logger) {
this.queueName = name;
+ this.orderedQueue = orderedQueue;
this.logger = logger;
}
@@ -131,6 +137,13 @@
}
/**
+ * Is this an ordered queue?
+ */
+ public boolean isOrdered() {
+ return orderedQueue;
+ }
+
+ /**
* Reschedule a job.
* If this is an ordered queue, this method will return the event info
* which should be processed next. Otherwise null is returned.
@@ -138,8 +151,7 @@
public EventInfo reschedule(final EventInfo info, final Scheduler scheduler) {
final Event job = info.event;
// is this an ordered queue?
- final boolean orderedQueue = job.getProperty(EventUtil.PROPERTY_JOB_QUEUE_ORDERED) != null;
- if ( orderedQueue ) {
+ if ( this.orderedQueue ) {
// we just sleep for the delay time - if none, we continue and retry
// this job again
if ( job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY) != null ) {