You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2008/08/26 18:07:51 UTC
svn commit: r689124 - in
/incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event:
EventUtil.java impl/JobEventHandler.java
Author: cziegeler
Date: Tue Aug 26 09:07:50 2008
New Revision: 689124
URL: http://svn.apache.org/viewvc?rev=689124&view=rev
Log:
SLING-628 : Provide separate queues if property queue name is provided.
Modified:
incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java
incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
Modified: incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java
URL: http://svn.apache.org/viewvc/incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java?rev=689124&r1=689123&r2=689124&view=diff
==============================================================================
--- incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java (original)
+++ incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java Tue Aug 26 09:07:50 2008
@@ -44,6 +44,7 @@
import org.apache.jackrabbit.util.ISO9075;
import org.apache.sling.event.EventUtil.JobStatusNotifier.NotifierContext;
+import org.apache.sling.event.impl.JobEventHandler;
import org.osgi.service.event.Event;
import org.slf4j.LoggerFactory;
@@ -245,12 +246,13 @@
}
};
- final JobStatusNotifier.NotifierContext ctx = (NotifierContext) job.getProperty(JobStatusNotifier.CONTEXT_PROPERTY_NAME);
- if ( ctx != null ) {
- ctx.notifier.execute(task);
+ // check if the job handler thread pool is available
+ if ( JobEventHandler.JOB_THREAD_POOL != null ) {
+ JobEventHandler.JOB_THREAD_POOL.execute(task);
} else {
- // if we don't have a job status notifier, we create the thread directly
- // (this should never happen but is a safe fallback)
+ // if we don't have a thread pool, we create the thread directly
+ // (this should never happen for jobs, but is a safe fallback and
+ // allows this method to be called for other background processing).
new Thread(task).start();
}
}
@@ -284,11 +286,6 @@
* @return <code>true</code> if everything went fine, <code>false</code> otherwise.
*/
boolean finishedJob(Event job, String eventNodePath, boolean reschedule);
-
- /**
- * Execute the job in the background
- */
- void execute(Runnable job);
}
/**
Modified: incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
URL: http://svn.apache.org/viewvc/incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java?rev=689124&r1=689123&r2=689124&view=diff
==============================================================================
--- incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java (original)
+++ incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java Tue Aug 26 09:07:50 2008
@@ -31,6 +31,8 @@
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
import javax.jcr.ItemExistsException;
import javax.jcr.Node;
@@ -45,9 +47,9 @@
import org.apache.jackrabbit.util.ISO8601;
import org.apache.sling.commons.osgi.OsgiUtil;
import org.apache.sling.commons.scheduler.Scheduler;
+import org.apache.sling.commons.threads.ThreadPool;
import org.apache.sling.event.EventUtil;
import org.apache.sling.event.JobStatusProvider;
-import org.osgi.framework.BundleEvent;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
@@ -73,50 +75,52 @@
extends AbstractRepositoryEventHandler
implements EventUtil.JobStatusNotifier, JobStatusProvider, Runnable {
- /** The topic prefix for bundle events. */
- protected static final String BUNDLE_EVENT_PREFIX = BundleEvent.class.getName().replace('.', '/') + '/';
-
/** A map for keeping track of currently processed job topics. */
- protected final Map<String, Boolean> processingMap = new HashMap<String, Boolean>();
+ private final Map<String, Boolean> processingMap = new HashMap<String, Boolean>();
+
+ /** A map for the different job queues. */
+ private final Map<String, BlockingQueue<EventInfo>> jobQueues = new HashMap<String, BlockingQueue<EventInfo>>();
/** Default sleep time. */
- protected static final long DEFAULT_SLEEP_TIME = 30;
+ private static final long DEFAULT_SLEEP_TIME = 30;
/** @scr.property valueRef="DEFAULT_SLEEP_TIME" */
- protected static final String CONFIG_PROPERTY_SLEEP_TIME = "sleep.time";
+ private static final String CONFIG_PROPERTY_SLEEP_TIME = "sleep.time";
/** Default number of job retries. */
- protected static final int DEFAULT_MAX_JOB_RETRIES = 10;
+ private static final int DEFAULT_MAX_JOB_RETRIES = 10;
/** @scr.property valueRef="DEFAULT_MAX_JOB_RETRIES" */
- protected static final String CONFIG_PROPERTY_MAX_JOB_RETRIES = "max.job.retries";
+ private static final String CONFIG_PROPERTY_MAX_JOB_RETRIES = "max.job.retries";
/** We check every 30 secs by default. */
- protected long sleepTime;
+ private long sleepTime;
/** How often should a job be retried by default. */
- protected int maxJobRetries;
+ private int maxJobRetries;
/** Background session. */
- protected Session backgroundSession;
+ private Session backgroundSession;
/** Unloaded jobs. */
- protected Set<String>unloadedJobs = new HashSet<String>();
+ private Set<String>unloadedJobs = new HashSet<String>();
/** List of deleted jobs. */
- protected Set<String>deletedJobs = new HashSet<String>();
+ private Set<String>deletedJobs = new HashSet<String>();
/** Default clean up time is 10 minutes. */
- protected static final int DEFAULT_CLEANUP_PERIOD = 10;
+ private static final int DEFAULT_CLEANUP_PERIOD = 10;
/** @scr.property valueRef="DEFAULT_CLEANUP_PERIOD" type="Integer" */
- protected static final String CONFIG_PROPERTY_CLEANUP_PERIOD = "cleanup.period";
+ private static final String CONFIG_PROPERTY_CLEANUP_PERIOD = "cleanup.period";
/** We remove everything which is older than 10 min by default. */
- protected int cleanupPeriod = DEFAULT_CLEANUP_PERIOD;
+ private int cleanupPeriod = DEFAULT_CLEANUP_PERIOD;
/** The scheduler for rescheduling jobs. @scr.reference */
- protected Scheduler scheduler;
+ private Scheduler scheduler;
+
+ public static ThreadPool JOB_THREAD_POOL;
/**
* Activate this component.
@@ -131,6 +135,7 @@
this.sleepTime = OsgiUtil.toLong(props.get(CONFIG_PROPERTY_SLEEP_TIME), DEFAULT_SLEEP_TIME);
this.maxJobRetries = OsgiUtil.toInteger(props.get(CONFIG_PROPERTY_MAX_JOB_RETRIES), DEFAULT_MAX_JOB_RETRIES);
super.activate(context);
+ JOB_THREAD_POOL = this.threadPool;
}
/**
@@ -138,6 +143,16 @@
*/
protected void deactivate(final ComponentContext context) {
super.deactivate(context);
+ synchronized ( this.jobQueues ) {
+ final Iterator<BlockingQueue<EventInfo>> i = this.jobQueues.values().iterator();
+ while ( i.hasNext() ) {
+ try {
+ i.next().put(new EventInfo());
+ } catch (InterruptedException e) {
+ this.ignoreException(e);
+ }
+ }
+ }
if ( this.backgroundSession != null ) {
try {
this.backgroundSession.getWorkspace().getObservationManager().removeEventListener(this);
@@ -148,6 +163,7 @@
this.backgroundSession.logout();
this.backgroundSession = null;
}
+ JOB_THREAD_POOL = null;
}
/**
@@ -314,123 +330,192 @@
EventInfo info = null;
try {
info = this.queue.take();
+ } catch (InterruptedException e) {
+ // we ignore this
+ this.ignoreException(e);
+ }
+
+ if ( info != null && this.running ) {
// check for local only jobs and remove them from the queue if they're meant
// for another application node
if ( info.event.getProperty(EventUtil.PROPERTY_JOB_RUN_LOCAL) != null
&& !this.applicationId.equals(EventUtil.PROPERTY_APPLICATION) ) {
info = null;
}
+
+ // check if we should put this into a separate queue
+ if ( info != null && info.event.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME) != null ) {
+ final String queueName = EventUtil.PROPERTY_JOB_QUEUE_NAME;
+ synchronized ( this.jobQueues ) {
+ BlockingQueue<EventInfo> jobQueue = this.jobQueues.get(queueName);
+ if ( jobQueue == null ) {
+ jobQueue = new LinkedBlockingQueue<EventInfo>();
+ final BlockingQueue<EventInfo> jq = jobQueue;
+ this.jobQueues.put(queueName, jobQueue);
+ // Start background thread
+ this.threadPool.execute(new Runnable() {
+
+ /**
+ * @see java.lang.Runnable#run()
+ */
+ public void run() {
+ runJobQueue(queueName, jq);
+ }
+
+ });
+ }
+ try {
+ jobQueue.put(info);
+ } catch (InterruptedException e) {
+ // this should never happen
+ this.ignoreException(e);
+ }
+ }
+ // don't process this here
+ info = null;
+ }
+
+ // if we still have a job, process it
+ if ( info != null ) {
+ this.executeJob(info, null);
+ }
+ }
+ }
+ }
+
+ /**
+ * Execute a job queue
+ * @param queueName The name of the job queue
+ * @param jobQueue The job queue
+ */
+ private void runJobQueue(final String queueName, final BlockingQueue<EventInfo> jobQueue) {
+ while ( this.running ) {
+ // so let's wait/get the next job from the queue
+ EventInfo info = null;
+ try {
+ info = jobQueue.take();
} catch (InterruptedException e) {
// we ignore this
this.ignoreException(e);
}
if ( info != null && this.running ) {
+ this.executeJob(info, jobQueue);
+ }
+ }
+ }
- // check if the node still exists
- synchronized (this.backgroundSession) {
- try {
- this.backgroundSession.refresh(false);
- if ( this.backgroundSession.itemExists(info.nodePath)
- && !this.backgroundSession.itemExists(info.nodePath + "/" + EventHelper.NODE_PROPERTY_FINISHED)) {
- final Event event = info.event;
- final String jobTopic = (String)event.getProperty(EventUtil.PROPERTY_JOB_TOPIC);
- final boolean parallelProcessing = event.getProperty(EventUtil.PROPERTY_JOB_PARALLEL) != null;
-
- // check how we can process this job
- // if parallel processing is allowed, we can just process
- // if not we should check if any other job with the same topic is currently running
- boolean process = parallelProcessing;
- if ( !process ) {
+ /**
+ * Process a job
+ */
+ private void executeJob(final EventInfo info, final BlockingQueue<EventInfo> jobQueue) {
+ // check if the node still exists
+ synchronized (this.backgroundSession) {
+ try {
+ this.backgroundSession.refresh(false);
+ if ( this.backgroundSession.itemExists(info.nodePath)
+ && !this.backgroundSession.itemExists(info.nodePath + "/" + EventHelper.NODE_PROPERTY_FINISHED)) {
+ final Event event = info.event;
+ final String jobTopic = (String)event.getProperty(EventUtil.PROPERTY_JOB_TOPIC);
+ final boolean parallelProcessing = event.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME) != null
+ || event.getProperty(EventUtil.PROPERTY_JOB_PARALLEL) != null;
+
+ // check how we can process this job
+ // if parallel processing is allowed, we can just process
+ // if not we should check if any other job with the same topic is currently running
+ boolean process = parallelProcessing;
+ if ( !process ) {
+ synchronized ( this.processingMap ) {
+ final Boolean value = this.processingMap.get(jobTopic);
+ if ( value == null || !value.booleanValue() ) {
+ this.processingMap.put(jobTopic, Boolean.TRUE);
+ process = true;
+ }
+ }
+
+ }
+ if ( process ) {
+ boolean unlock = true;
+ try {
+ final Node eventNode = (Node) this.backgroundSession.getItem(info.nodePath);
+ if ( !eventNode.isLocked() ) {
+ // lock node
+ try {
+ eventNode.lock(false, true);
+ } catch (RepositoryException re) {
+ // lock failed which means that the node is locked by someone else, so we don't have to requeue
+ process = false;
+ }
+ if ( process ) {
+ unlock = false;
+ this.processJob(info.event, eventNode);
+ }
+ }
+ } catch (RepositoryException e) {
+ // ignore
+ this.ignoreException(e);
+ } finally {
+ if ( unlock && !parallelProcessing ) {
synchronized ( this.processingMap ) {
- final Boolean value = this.processingMap.get(jobTopic);
- if ( value == null || !value.booleanValue() ) {
- this.processingMap.put(jobTopic, Boolean.TRUE);
- process = true;
- }
+ this.processingMap.put(jobTopic, Boolean.FALSE);
}
-
}
- if ( process ) {
- boolean unlock = true;
- try {
- final Node eventNode = (Node) this.backgroundSession.getItem(info.nodePath);
- if ( !eventNode.isLocked() ) {
- // lock node
+ }
+ } else {
+ try {
+ // check if the node is in processing or already finished
+ final Node eventNode = (Node) this.backgroundSession.getItem(info.nodePath);
+ if ( !eventNode.isLocked() && !eventNode.hasProperty(EventHelper.NODE_PROPERTY_FINISHED)) {
+ final EventInfo eInfo = info;
+ final Date fireDate = new Date();
+ fireDate.setTime(System.currentTimeMillis() + this.sleepTime * 1000);
+
+ // we put it back into the queue after a specific time
+ final Runnable r = new Runnable() {
+
+ /**
+ * @see java.lang.Runnable#run()
+ */
+ public void run() {
try {
- eventNode.lock(false, true);
- } catch (RepositoryException re) {
- // lock failed which means that the node is locked by someone else, so we don't have to requeue
- process = false;
- }
- if ( process ) {
- unlock = false;
- this.processJob(info.event, eventNode);
- }
- }
- } catch (RepositoryException e) {
- // ignore
- this.ignoreException(e);
- } finally {
- if ( unlock && !parallelProcessing ) {
- synchronized ( this.processingMap ) {
- this.processingMap.put(jobTopic, Boolean.FALSE);
- }
- }
- }
- } else {
- try {
- // check if the node is in processing or already finished
- final Node eventNode = (Node) this.backgroundSession.getItem(info.nodePath);
- if ( !eventNode.isLocked() && !eventNode.hasProperty(EventHelper.NODE_PROPERTY_FINISHED)) {
- final EventInfo eInfo = info;
- final Date fireDate = new Date();
- fireDate.setTime(System.currentTimeMillis() + this.sleepTime * 1000);
-
- // we put it back into the queue after a specific time
- final Runnable r = new Runnable() {
-
- /**
- * @see java.lang.Runnable#run()
- */
- public void run() {
- try {
- queue.put(eInfo);
- } catch (InterruptedException e) {
- // ignore
- ignoreException(e);
- }
+ if ( jobQueue != null ) {
+ jobQueue.put(eInfo);
+ } else {
+ queue.put(eInfo);
}
-
- };
- try {
- this.scheduler.fireJobAt(null, r, null, fireDate);
- } catch (Exception e) {
- // we ignore the exception
+ } catch (InterruptedException e) {
+ // ignore
ignoreException(e);
- // then wait for the time and readd the job
- try {
- Thread.sleep(sleepTime * 1000);
- } catch (InterruptedException ie) {
- // ignore
- ignoreException(ie);
- }
- r.run();
}
+ }
+ };
+ try {
+ this.scheduler.fireJobAt(null, r, null, fireDate);
+ } catch (Exception e) {
+ // we ignore the exception
+ ignoreException(e);
+ // then wait for the time and readd the job
+ try {
+ Thread.sleep(sleepTime * 1000);
+ } catch (InterruptedException ie) {
+ // ignore
+ ignoreException(ie);
}
- } catch (RepositoryException e) {
- // ignore
- this.ignoreException(e);
+ r.run();
}
+
}
+ } catch (RepositoryException e) {
+ // ignore
+ this.ignoreException(e);
}
- } catch (RepositoryException re) {
- this.ignoreException(re);
}
-
}
+ } catch (RepositoryException re) {
+ this.ignoreException(re);
}
+
}
}
@@ -583,7 +668,8 @@
* @param eventNode The node in the repository where the job is stored.
*/
private void processJob(Event event, Node eventNode) {
- final boolean parallelProcessing = event.getProperty(EventUtil.PROPERTY_JOB_PARALLEL) != null;
+ final boolean parallelProcessing = event.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME) != null
+ || event.getProperty(EventUtil.PROPERTY_JOB_PARALLEL) != null;
final String jobTopic = (String)event.getProperty(EventUtil.PROPERTY_JOB_TOPIC);
boolean unlock = true;
try {
@@ -808,7 +894,8 @@
job = new Event(job.getTopic(), newProperties);
}
}
- final boolean parallelProcessing = job.getProperty(EventUtil.PROPERTY_JOB_PARALLEL) != null;
+ final boolean parallelProcessing = job.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME) != null
+ || job.getProperty(EventUtil.PROPERTY_JOB_PARALLEL) != null;
// we have to use the same session for unlocking that we used for locking!
synchronized ( this.backgroundSession ) {
try {
@@ -925,13 +1012,6 @@
}
/**
- * @see org.apache.sling.event.EventUtil.JobStatusNotifier#execute(java.lang.Runnable)
- */
- public void execute(Runnable job) {
- this.threadPool.execute(job);
- }
-
- /**
* Search for job nodes
* @param topic The job topic
* @param filterProps optional filter props