You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2013/04/25 11:02:45 UTC
svn commit: r1475680 - in
/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs:
JobHandler.java queues/AbstractJobQueue.java queues/OrderedJobQueue.java
Author: cziegeler
Date: Thu Apr 25 09:02:44 2013
New Revision: 1475680
URL: http://svn.apache.org/r1475680
Log:
SLING-2829 : Add API for starting a job and service interface for executing a job - first shot at an async processing api
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java?rev=1475680&r1=1475679&r2=1475680&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java Thu Apr 25 09:02:44 2013
@@ -75,6 +75,6 @@ public class JobHandler {
@Override
public String toString() {
- return "JobEvent(" + this.job.getId() + ")";
+ return "JobHandler(" + this.job.getId() + ")";
}
}
\ No newline at end of file
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java?rev=1475680&r1=1475679&r2=1475680&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java Thu Apr 25 09:02:44 2013
@@ -239,11 +239,7 @@ public abstract class AbstractJobQueue
*/
@Override
public boolean sendAcknowledge(final Event job) {
- final String location = (String)job.getProperty(JobUtil.JOB_ID);
- return this.sendAcknowledge(location);
- }
-
- private boolean sendAcknowledge(final String jobId) {
+ final String jobId = (String)job.getProperty(JobUtil.JOB_ID);
final JobHandler ack;
synchronized ( this.startedJobsLists ) {
ack = this.startedJobsLists.remove(jobId);
@@ -397,6 +393,7 @@ public abstract class AbstractJobQueue
protected boolean canBeMarkedForRemoval() {
return this.isEmpty() && !this.isWaiting &&!this.isSuspended();
}
+
/**
* Mark this queue for removal.
*/
@@ -488,24 +485,15 @@ public abstract class AbstractJobQueue
}
try {
handler.started = System.currentTimeMillis();
- // let's add the event to our processing list
- synchronized ( this.startedJobsLists ) {
- this.startedJobsLists.put(job.getId(), handler);
- }
if ( consumer != null ) {
- // first check for a notifier context to send an acknowledge
- boolean notify = true;
- if ( !this.sendAcknowledge(job.getId()) ) {
- // if we don't get an ack, someone else is already processing this job.
- // we process but do not notify the job event handler.
- logger.info("Someone else is already processing job {}.", Utility.toString(job));
- notify = false;
+ final long queueTime = handler.started - handler.queued;
+ this.addActive(queueTime);
+ Utility.sendNotification(this.eventAdmin, JobUtil.TOPIC_JOB_STARTED, job, queueTime);
+ synchronized ( this.processsingJobsLists ) {
+ this.processsingJobsLists.put(job.getId(), handler);
}
- final JobUtil.JobPriority priority = job.getJobPriority();
- final boolean notifyResult = notify;
-
final Runnable task = new Runnable() {
/**
@@ -519,8 +507,8 @@ public abstract class AbstractJobQueue
final int oldPriority = currentThread.getPriority();
currentThread.setName(oldName + "-" + job.getQueueName() + "(" + job.getTopic() + ")");
- if ( priority != null ) {
- switch ( priority ) {
+ if ( job.getJobPriority() != null ) {
+ switch ( job.getJobPriority() ) {
case NORM : currentThread.setPriority(Thread.NORM_PRIORITY);
break;
case MIN : currentThread.setPriority(Thread.MIN_PRIORITY);
@@ -573,7 +561,7 @@ public abstract class AbstractJobQueue
job.setProperty(JobConsumer.PROPERTY_JOB_ASYNC_HANDLER, null);
currentThread.setPriority(oldPriority);
currentThread.setName(oldName);
- if ( notifyResult && result != JobConsumer.JobResult.ASYNC ) {
+ if ( result != JobConsumer.JobResult.ASYNC ) {
finishedJob(job.getId(), result, false);
}
}
@@ -607,6 +595,10 @@ public abstract class AbstractJobQueue
}
} else {
+ // let's add the event to our processing list
+ synchronized ( this.startedJobsLists ) {
+ this.startedJobsLists.put(job.getId(), handler);
+ }
final Event jobEvent = this.getJobEvent(handler);
// we need async delivery, otherwise we might create a deadlock
// as this method runs inside a synchronized block and the finishedJob
@@ -674,11 +666,6 @@ public abstract class AbstractJobQueue
}
/**
- * Reschedule a job.
- */
- protected abstract JobHandler reschedule(final JobHandler info);
-
- /**
* @see org.apache.sling.event.jobs.Queue#getStatistics()
*/
@Override
@@ -771,6 +758,11 @@ public abstract class AbstractJobQueue
}
/**
+ * Reschedule a job.
+ */
+ protected abstract JobHandler reschedule(final JobHandler info);
+
+ /**
* Put another job into the queue.
*/
protected abstract void put(final JobHandler event);
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java?rev=1475680&r1=1475679&r2=1475680&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java Thu Apr 25 09:02:44 2013
@@ -20,9 +20,11 @@ package org.apache.sling.event.impl.jobs
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.Set;
+import java.util.TreeSet;
import org.apache.sling.event.impl.jobs.JobConsumerManager;
import org.apache.sling.event.impl.jobs.JobHandler;
@@ -38,14 +40,20 @@ import org.osgi.service.event.EventAdmin
*/
public final class OrderedJobQueue extends AbstractJobQueue {
- /** The job event for rescheduling. */
- private volatile JobHandler jobEvent;
+ /** The job handler for rescheduling. */
+ private volatile JobHandler jobHandler;
/** Lock and status object for handling the sleep phase. */
private final SleepLock sleepLock = new SleepLock();
- /** The queue. */
- private final BlockingQueue<JobHandler> queue = new LinkedBlockingQueue<JobHandler>();
+ /** The queue - we use a set which is sorted by job creation date. */
+ private final Set<JobHandler> queue = new TreeSet<JobHandler>(new Comparator<JobHandler>() {
+
+ @Override
+ public int compare(final JobHandler o1, final JobHandler o2) {
+ return o1.getJob().getCreated().compareTo(o2.getJob().getCreated());
+ }
+ });
private final Object syncLock = new Object();
@@ -62,12 +70,12 @@ public final class OrderedJobQueue exten
}
@Override
- protected JobHandler start(final JobHandler processInfo) {
- JobHandler rescheduleInfo = null;
+ protected JobHandler start(final JobHandler handler) {
+ JobHandler rescheduleHandler = null;
// if we are ordered we simply wait for the finish
synchronized ( this.syncLock ) {
- if ( this.executeJob(processInfo) ) {
+ if ( this.executeJob(handler) ) {
this.isWaiting = true;
this.logger.debug("Job queue {} is waiting for finish.", this.queueName);
while ( this.isWaiting ) {
@@ -78,18 +86,18 @@ public final class OrderedJobQueue exten
}
}
this.logger.debug("Job queue {} is continuing.", this.queueName);
- rescheduleInfo = this.jobEvent;
- this.jobEvent = null;
+ rescheduleHandler = this.jobHandler;
+ this.jobHandler = null;
}
}
- return rescheduleInfo;
+ return rescheduleHandler;
}
private void wakeUp(final boolean discardJob) {
synchronized ( this.sleepLock ) {
if ( this.sleepLock.sleepingSince != -1 ) {
if ( discardJob ) {
- this.sleepLock.jobEvent = null;
+ this.sleepLock.jobHandler = null;
}
this.sleepLock.notify();
}
@@ -103,34 +111,41 @@ public final class OrderedJobQueue exten
}
@Override
- protected void put(final JobHandler event) {
- try {
- this.queue.put(event);
- } catch (final InterruptedException e) {
- // this should never happen
- this.ignoreException(e);
+ protected void put(final JobHandler handler) {
+ synchronized ( this.queue ) {
+ this.queue.add(handler);
+ this.queue.notify();
}
}
@Override
protected JobHandler take() {
- try {
- return this.queue.take();
- } catch (final InterruptedException e) {
- // this should never happen
- this.ignoreException(e);
+ synchronized ( this.queue ) {
+ while ( this.queue.isEmpty() ) {
+ try {
+ this.queue.wait();
+ } catch (final InterruptedException e) {
+ this.ignoreException(e);
+ }
+ }
+ // get the first element and remove it
+ final Iterator<JobHandler> i = this.queue.iterator();
+ final JobHandler result = i.next();
+ i.remove();
+ return result;
}
- return null;
}
@Override
protected boolean isEmpty() {
- return this.queue.isEmpty();
+ synchronized ( this.queue ) {
+ return this.queue.isEmpty();
+ }
}
@Override
- protected void notifyFinished(final JobHandler rescheduleInfo) {
- this.jobEvent = rescheduleInfo;
+ protected void notifyFinished(final JobHandler rescheduleHandler) {
+ this.jobHandler = rescheduleHandler;
this.logger.debug("Notifying job queue {} to continue processing.", this.queueName);
synchronized ( this.syncLock ) {
this.isWaiting = false;
@@ -139,17 +154,17 @@ public final class OrderedJobQueue exten
}
@Override
- protected JobHandler reschedule(final JobHandler info) {
+ protected JobHandler reschedule(final JobHandler handler) {
// we just sleep for the delay time - if none, we continue and retry
// this job again
long delay = this.configuration.getRetryDelayInMs();
- if ( info.getJob().getProperty(Job.PROPERTY_JOB_RETRY_DELAY) != null ) {
- delay = info.getJob().getProperty(Job.PROPERTY_JOB_RETRY_DELAY, Long.class);
+ if ( handler.getJob().getProperty(Job.PROPERTY_JOB_RETRY_DELAY) != null ) {
+ delay = handler.getJob().getProperty(Job.PROPERTY_JOB_RETRY_DELAY, Long.class);
}
if ( delay > 0 ) {
synchronized ( this.sleepLock ) {
this.sleepLock.sleepingSince = System.currentTimeMillis();
- this.sleepLock.jobEvent = info;
+ this.sleepLock.jobHandler = handler;
this.logger.debug("Job queue {} is sleeping for {}ms.", this.queueName, delay);
try {
this.sleepLock.wait(delay);
@@ -157,16 +172,16 @@ public final class OrderedJobQueue exten
this.ignoreException(e);
}
this.sleepLock.sleepingSince = -1;
- final JobHandler result = this.sleepLock.jobEvent;
- this.sleepLock.jobEvent = null;
+ final JobHandler result = this.sleepLock.jobHandler;
+ this.sleepLock.jobHandler = null;
if ( result == null ) {
- info.remove();
+ handler.remove();
}
return result;
}
}
- return info;
+ return handler;
}
/**
@@ -174,7 +189,9 @@ public final class OrderedJobQueue exten
*/
@Override
public void clear() {
- this.queue.clear();
+ synchronized ( this.queue ) {
+ this.queue.clear();
+ }
super.clear();
}
@@ -182,14 +199,17 @@ public final class OrderedJobQueue exten
public synchronized void removeAll() {
// remove all remaining jobs first
super.removeAll();
- this.jobEvent = null;
+ this.jobHandler = null;
this.wakeUp(true);
}
@Override
protected Collection<JobHandler> removeAllJobs() {
final List<JobHandler> events = new ArrayList<JobHandler>();
- this.queue.drainTo(events);
+ synchronized ( this.queue ) {
+ events.addAll(this.queue);
+ this.queue.clear();
+ }
return events;
}
@@ -207,7 +227,7 @@ public final class OrderedJobQueue exten
public volatile long sleepingSince = -1;
/** The job event to be returned after sleeping. */
- public volatile JobHandler jobEvent;
+ public volatile JobHandler jobHandler;
}
}