You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2010/02/11 11:30:20 UTC
svn commit: r908911 - in /sling/trunk/bundles/extensions/event/src/main:
java/org/apache/sling/event/impl/JobEventHandler.java
resources/OSGI-INF/metatype/metatype.properties
Author: cziegeler
Date: Thu Feb 11 10:30:14 2010
New Revision: 908911
URL: http://svn.apache.org/viewvc?rev=908911&view=rev
Log:
SLING-1369 : Make the maximum number of job queues configurable
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java?rev=908911&r1=908910&r2=908911&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java Thu Feb 11 10:30:14 2010
@@ -120,12 +120,18 @@
/** Default nubmer of parallel jobs. */
private static final long DEFAULT_MAXIMUM_PARALLEL_JOBS = 15;
+ /** Default nubmer of job queues. */
+ private static final int DEFAULT_MAXIMUM_JOB_QUEUES = 10;
+
@Property(longValue=DEFAULT_MAXIMUM_PARALLEL_JOBS)
private static final String CONFIG_PROPERTY_MAXIMUM_PARALLEL_JOBS = "max.parallel.jobs";
@Property(longValue=DEFAULT_WAIT_FOR_ACK)
private static final String CONFIG_PROPERTY_WAIT_FOR_ACK = "wait.for.ack";
+ @Property(intValue=DEFAULT_MAXIMUM_JOB_QUEUES)
+ private static final String CONFIG_PROPERTY_MAXIMUM_JOB_QUEUES = "max.job.queues";
+
/** We check every 30 secs by default. */
private long sleepTime;
@@ -180,6 +186,9 @@
/** Number of jobs to load from the repository on startup in one go. */
private long maxLoadJobs;
+ /** Number of allowed job queues */
+ private int maxJobQueues;
+
/** Default maximum load jobs. */
private static final long DEFAULT_MAXIMUM_LOAD_JOBS = 1000;
@@ -234,6 +243,7 @@
this.loadThreshold = OsgiUtil.toLong(props.get(CONFIG_PROPERTY_LOAD_THREASHOLD), DEFAULT_LOAD_THRESHOLD);
this.backgroundLoadDelay = OsgiUtil.toLong(props.get(CONFIG_PROPERTY_BACKGROUND_LOAD_DELAY), DEFAULT_BACKGROUND_LOAD_DELAY);
this.backgroundCheckDelay = OsgiUtil.toLong(props.get(CONFIG_PROPERTY_BACKGROUND_CHECK_DELAY), DEFAULT_BACKGROUND_CHECK_DELAY);
+ this.maxJobQueues = OsgiUtil.toInteger(props.get(CONFIG_PROPERTY_MAXIMUM_JOB_QUEUES), DEFAULT_MAXIMUM_JOB_QUEUES);
this.componentContext = context;
super.activate(context);
JOB_THREAD_POOL = this.threadPool;
@@ -555,8 +565,9 @@
// load unprocessed jobs from repository
if ( this.running ) {
logger.info("Apache Sling Job Event Handler started.");
- logger.debug("Job Handler Configuration: (sleepTime={} secs, maxJobRetries={}, waitForAck={} ms, maximumParallelJobs={}, cleanupPeriod={} min)",
- new Object[] {sleepTime, maxJobRetries,waitForAckMs,maximumParallelJobs,cleanupPeriod});
+ logger.debug("Job Handler Configuration: (sleepTime={} secs, maxJobRetries={}," +
+ " waitForAck={} ms, maximumParallelJobs={}, cleanupPeriod={} min, maxJobQueues={})",
+ new Object[] {sleepTime, maxJobRetries,waitForAckMs,maximumParallelJobs,cleanupPeriod,maxJobQueues});
} else {
final ComponentContext ctx = this.componentContext;
// deactivate
@@ -596,43 +607,51 @@
if ( info != null && info.event.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME) != null ) {
final String queueName = (String)info.event.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME);
synchronized ( this.jobQueues ) {
- if ( logger.isDebugEnabled() ) {
- logger.debug("Queuing job {} into queue {}.", EventUtil.toString(info.event), queueName);
- }
BlockingQueue<EventInfo> jobQueue = this.jobQueues.get(queueName);
if ( jobQueue == null ) {
- final boolean orderedQueue = info.event.getProperty(EventUtil.PROPERTY_JOB_QUEUE_ORDERED) != null;
- final JobBlockingQueue jq = new JobBlockingQueue(queueName, orderedQueue, this.logger);
- jobQueue = jq;
- this.jobQueues.put(queueName, jq);
- // Start background thread
- this.threadPool.execute(new Runnable() {
-
- /**
- * @see java.lang.Runnable#run()
- */
- public void run() {
- while ( running && !jq.isFinished() ) {
- logger.info("Starting {}job queue {}", (orderedQueue ? "ordered " : ""), queueName);
- try {
- runJobQueue(queueName, jq);
- } catch (Throwable t) {
- logger.error("Job queue stopped with exception: " + t.getMessage() + ". Restarting.", t);
+ // check if we have exceeded the maximum number of job queues
+ if ( this.jobQueues.size() >= this.maxJobQueues ) {
+ this.logger.warn("Unable to create new job queue named {} as there are already {} job queues." +
+ " Try to increase the maximum number of job queues!", queueName, this.jobQueues.size());
+ } else {
+ final boolean orderedQueue = info.event.getProperty(EventUtil.PROPERTY_JOB_QUEUE_ORDERED) != null;
+ final JobBlockingQueue jq = new JobBlockingQueue(queueName, orderedQueue, this.logger);
+ jobQueue = jq;
+ this.jobQueues.put(queueName, jq);
+ // Start background thread
+ this.threadPool.execute(new Runnable() {
+
+ /**
+ * @see java.lang.Runnable#run()
+ */
+ public void run() {
+ while ( running && !jq.isFinished() ) {
+ logger.info("Starting {}job queue {}", (orderedQueue ? "ordered " : ""), queueName);
+ try {
+ runJobQueue(queueName, jq);
+ } catch (Throwable t) {
+ logger.error("Job queue stopped with exception: " + t.getMessage() + ". Restarting.", t);
+ }
}
}
- }
- });
+ });
+ }
}
- try {
- jobQueue.put(info);
- } catch (InterruptedException e) {
- // this should never happen
- this.ignoreException(e);
+ if ( jobQueue != null ) {
+ if ( logger.isDebugEnabled() ) {
+ logger.debug("Queuing job {} into queue {}.", EventUtil.toString(info.event), queueName);
+ }
+ try {
+ jobQueue.put(info);
+ } catch (InterruptedException e) {
+ // this should never happen
+ this.ignoreException(e);
+ }
+ // don't process this here
+ info = null;
}
}
- // don't process this here
- info = null;
}
// if we still have a job, process it
Modified: sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties?rev=908911&r1=908910&r2=908911&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties (original)
+++ sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties Thu Feb 11 10:30:14 2010
@@ -90,6 +90,10 @@
load.checkdelay.description = The background loader sleeps this time of seconds before \
checking the repository for jobs. Default value is 240 seconds.
+max.job.queues.name = Max Job Queues
+max.job.queues.description = The maximum number of job queues (default is 10). \
+ If this number is exceeded all jobs for a new job queue are put into the main queue.
+
#
# Event Pool
event.pool.name = Apache Sling Event Thread Pool