You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2008/08/27 15:52:11 UTC
svn commit: r689479 -
/incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
Author: cziegeler
Date: Wed Aug 27 06:52:11 2008
New Revision: 689479
URL: http://svn.apache.org/viewvc?rev=689479&view=rev
Log:
SLING-628 - Implement separate queues, requeuing and ordered queues.
Modified:
incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
Modified: incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
URL: http://svn.apache.org/viewvc/incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java?rev=689479&r1=689478&r2=689479&view=diff
==============================================================================
--- incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java (original)
+++ incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java Wed Aug 27 06:52:11 2008
@@ -79,7 +79,7 @@
private final Map<String, Boolean> processingMap = new HashMap<String, Boolean>();
/** A map for the different job queues. */
- private final Map<String, BlockingQueue<EventInfo>> jobQueues = new HashMap<String, BlockingQueue<EventInfo>>();
+ private final Map<String, JobBlockingQueue> jobQueues = new HashMap<String, JobBlockingQueue>();
/** Default sleep time. */
private static final long DEFAULT_SLEEP_TIME = 30;
@@ -144,7 +144,7 @@
protected void deactivate(final ComponentContext context) {
super.deactivate(context);
synchronized ( this.jobQueues ) {
- final Iterator<BlockingQueue<EventInfo>> i = this.jobQueues.values().iterator();
+ final Iterator<JobBlockingQueue> i = this.jobQueues.values().iterator();
while ( i.hasNext() ) {
try {
i.next().put(new EventInfo());
@@ -349,9 +349,9 @@
synchronized ( this.jobQueues ) {
BlockingQueue<EventInfo> jobQueue = this.jobQueues.get(queueName);
if ( jobQueue == null ) {
- jobQueue = new LinkedBlockingQueue<EventInfo>();
- final BlockingQueue<EventInfo> jq = jobQueue;
- this.jobQueues.put(queueName, jobQueue);
+ final JobBlockingQueue jq = new JobBlockingQueue();
+ jobQueue = jq;
+ this.jobQueues.put(queueName, jq);
// Start background thread
this.threadPool.execute(new Runnable() {
@@ -388,19 +388,84 @@
* @param queueName The name of the job queue
* @param jobQueue The job queue
*/
- private void runJobQueue(final String queueName, final BlockingQueue<EventInfo> jobQueue) {
+ private void runJobQueue(final String queueName, final JobBlockingQueue jobQueue) {
+ EventInfo info = null;
while ( this.running ) {
- // so let's wait/get the next job from the queue
- EventInfo info = null;
- try {
- info = jobQueue.take();
- } catch (InterruptedException e) {
- // we ignore this
- this.ignoreException(e);
+ if ( info == null ) {
+ // so let's wait/get the next job from the queue
+ try {
+ info = jobQueue.take();
+ } catch (InterruptedException e) {
+ // we ignore this
+ this.ignoreException(e);
+ }
}
if ( info != null && this.running ) {
- this.executeJob(info, jobQueue);
+ final EventInfo processInfo = info;
+ info = null;
+ if ( this.executeJob(processInfo, jobQueue) ) {
+ try {
+ jobQueue.wait();
+ } catch (InterruptedException e) {
+ this.ignoreException(e);
+ }
+ // if we have an info, this is a reschedule
+ final EventInfo newInfo = jobQueue.get();
+ if ( newInfo != null ) {
+ final Event job = newInfo.event;
+
+ // is this an ordered queue?
+ final boolean orderedQueue = job.getProperty(EventUtil.PROPERTY_JOB_QUEUE_ORDERED) != null;
+
+ if ( orderedQueue ) {
+ // we just sleep for the delay time - if none, we continue and retry
+ // this job again
+ if ( job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY) != null ) {
+ final long delay = (Long)job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY);
+ try {
+ Thread.sleep(delay);
+ } catch (InterruptedException e) {
+ this.ignoreException(e);
+ }
+ }
+ info = newInfo;
+ } else {
+ // delay rescheduling?
+ if ( job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY) != null ) {
+ final long delay = (Long)job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY);
+ final Date fireDate = new Date();
+ fireDate.setTime(System.currentTimeMillis() + delay);
+
+ final Runnable t = new Runnable() {
+ public void run() {
+ try {
+ jobQueue.put(newInfo);
+ } catch (InterruptedException e) {
+ // this should never happen
+ ignoreException(e);
+ }
+ }
+ };
+ try {
+ this.scheduler.fireJobAt(null, t, null, fireDate);
+ } catch (Exception e) {
+ // we ignore the exception and just put back the job in the queue
+ ignoreException(e);
+ t.run();
+ }
+ } else {
+ // put directly into queue
+ try {
+ jobQueue.put(newInfo);
+ } catch (InterruptedException e) {
+ // this should never happen
+ this.ignoreException(e);
+ }
+ }
+ }
+ }
+ }
}
}
}
@@ -408,7 +473,7 @@
/**
* Process a job
*/
- private void executeJob(final EventInfo info, final BlockingQueue<EventInfo> jobQueue) {
+ private boolean executeJob(final EventInfo info, final BlockingQueue<EventInfo> jobQueue) {
// check if the node still exists
synchronized (this.backgroundSession) {
try {
@@ -424,7 +489,7 @@
// if parallel processing is allowed, we can just process
// if not we should check if any other job with the same topic is currently running
boolean process = parallelProcessing;
- if ( !process ) {
+ if ( !parallelProcessing ) {
synchronized ( this.processingMap ) {
final Boolean value = this.processingMap.get(jobTopic);
if ( value == null || !value.booleanValue() ) {
@@ -449,6 +514,7 @@
if ( process ) {
unlock = false;
this.processJob(info.event, eventNode);
+ return true;
}
}
} catch (RepositoryException e) {
@@ -478,11 +544,7 @@
*/
public void run() {
try {
- if ( jobQueue != null ) {
- jobQueue.put(eInfo);
- } else {
- queue.put(eInfo);
- }
+ queue.put(eInfo);
} catch (InterruptedException e) {
// ignore
ignoreException(e);
@@ -517,6 +579,7 @@
}
}
+ return false;
}
/**
@@ -678,7 +741,10 @@
eventNode.save();
final EventAdmin localEA = this.eventAdmin;
if ( localEA != null ) {
- localEA.sendEvent(jobEvent);
+ // we need async delivery, otherwise we might create a deadlock,
+ // as both this method and the finishedJob method run inside a
+ // synchronized block!
+ localEA.postEvent(jobEvent);
// do not unlock if sending was successful
unlock = false;
} else {
@@ -862,6 +928,8 @@
}
/**
+ * This is a notification from the component which processed the job.
+ *
* @see org.apache.sling.event.EventUtil.JobStatusNotifier#finishedJob(org.osgi.service.event.Event, String, boolean)
*/
public boolean finishedJob(Event job, String eventNodePath, boolean shouldReschedule) {
@@ -880,7 +948,7 @@
if ( retries != -1 && retryCount > retries ) {
reschedule = false;
}
- if ( !reschedule ) {
+ if ( reschedule ) {
// update event with retry count and retries
final Dictionary<String, Object> newProperties;
// create a new dictionary
@@ -917,7 +985,7 @@
unlock = false;
final String jobId = (String)job.getProperty(EventUtil.PROPERTY_JOB_ID);
if ( jobId == null ) {
- // remove node from repository if no job id set
+ // remove node from repository if no job id is set
final Node parentNode = eventNode.getParent();
eventNode.remove();
parentNode.save();
@@ -967,37 +1035,62 @@
// this should never happen
this.ignoreException(e);
}
- // delay rescheduling?
- if ( job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY) != null ) {
- final long delay = (Long)job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY);
- final Date fireDate = new Date();
- fireDate.setTime(System.currentTimeMillis() + delay);
+ // if the job has its own job queue, we simply signal the queue to continue;
+ // it will pick up the event and either reschedule or wait
+ if ( job.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME) != null ) {
+ // we know the queue exists
+ final JobBlockingQueue jobQueue;
+ synchronized ( this.jobQueues ) {
+ jobQueue = this.jobQueues.get(job.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME));
+ }
+ jobQueue.set(info);
+ jobQueue.notify();
+ } else {
- final Runnable t = new Runnable() {
- public void run() {
- try {
- queue.put(info);
- } catch (InterruptedException e) {
- // this should never happen
- ignoreException(e);
+ // delay rescheduling?
+ if ( job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY) != null ) {
+ final long delay = (Long)job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY);
+ final Date fireDate = new Date();
+ fireDate.setTime(System.currentTimeMillis() + delay);
+
+ final Runnable t = new Runnable() {
+ public void run() {
+ try {
+ queue.put(info);
+ } catch (InterruptedException e) {
+ // this should never happen
+ ignoreException(e);
+ }
}
+ };
+ try {
+ this.scheduler.fireJobAt(null, t, null, fireDate);
+ } catch (Exception e) {
+ // we ignore the exception and just put back the job in the queue
+ ignoreException(e);
+ t.run();
+ }
+ } else {
+ // put directly into queue
+ try {
+ queue.put(info);
+ } catch (InterruptedException e) {
+ // this should never happen
+ this.ignoreException(e);
}
- };
- try {
- this.scheduler.fireJobAt(null, t, null, fireDate);
- } catch (Exception e) {
- // we ignore the exception and just put back the job in the queue
- ignoreException(e);
- t.run();
}
- } else {
- // put directly into queue
- try {
- this.queue.put(info);
- } catch (InterruptedException e) {
- // this should never happen
- this.ignoreException(e);
+ }
+ } else {
+ // if the job has its own job queue, we simply signal the queue to continue;
+ // it will pick up the event and continue with the next event
+ if ( job.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME) != null ) {
+ // we know the queue exists
+ final JobBlockingQueue jobQueue;
+ synchronized ( this.jobQueues ) {
+ jobQueue = this.jobQueues.get(job.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME));
}
+ jobQueue.set(null);
+ jobQueue.notify();
}
}
if ( !shouldReschedule ) {
@@ -1036,15 +1129,16 @@
buffer.append(EventHelper.NODE_PROPERTY_FINISHED);
buffer.append(")");
if ( topic != null ) {
- buffer.append(" @");
+ buffer.append(" and @");
buffer.append(EventHelper.NODE_PROPERTY_TOPIC);
buffer.append(" = '");
buffer.append(topic);
buffer.append("'");
}
if ( locked ) {
- buffer.append(" and ");
- buffer.append("@jcr:lockOwner");
+ buffer.append(" and @jcr:lockOwner");
+ } else {
+ buffer.append(" and not(@jcr:lockOwner)");
}
if ( filterProps != null ) {
final Iterator<Map.Entry<String, Object>> i = filterProps.entrySet().iterator();
@@ -1126,4 +1220,19 @@
public Collection<Event> getScheduledJobs(String topic, Map<String, Object> filterProps) {
return this.queryCurrentJobs(topic, null, false);
}
+
+ private static final class JobBlockingQueue extends LinkedBlockingQueue<EventInfo> {
+
+ private EventInfo eventInfo;
+
+ public void set(EventInfo i) {
+ this.eventInfo = i;
+ }
+
+ public EventInfo get() {
+ final EventInfo object = this.eventInfo;
+ this.eventInfo = null;
+ return object;
+ }
+ }
}