You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2010/02/11 11:30:20 UTC

svn commit: r908911 - in /sling/trunk/bundles/extensions/event/src/main: java/org/apache/sling/event/impl/JobEventHandler.java resources/OSGI-INF/metatype/metatype.properties

Author: cziegeler
Date: Thu Feb 11 10:30:14 2010
New Revision: 908911

URL: http://svn.apache.org/viewvc?rev=908911&view=rev
Log:
SLING-1369 : Make the maximum number of job queues configurable

Modified:
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
    sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java?rev=908911&r1=908910&r2=908911&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java Thu Feb 11 10:30:14 2010
@@ -120,12 +120,18 @@
     /** Default number of parallel jobs. */
     private static final long DEFAULT_MAXIMUM_PARALLEL_JOBS = 15;
 
+    /** Default number of job queues. */
+    private static final int DEFAULT_MAXIMUM_JOB_QUEUES = 10;
+
     @Property(longValue=DEFAULT_MAXIMUM_PARALLEL_JOBS)
     private static final String CONFIG_PROPERTY_MAXIMUM_PARALLEL_JOBS = "max.parallel.jobs";
 
     @Property(longValue=DEFAULT_WAIT_FOR_ACK)
     private static final String CONFIG_PROPERTY_WAIT_FOR_ACK = "wait.for.ack";
 
+    @Property(intValue=DEFAULT_MAXIMUM_JOB_QUEUES)
+    private static final String CONFIG_PROPERTY_MAXIMUM_JOB_QUEUES = "max.job.queues";
+
     /** We check every 30 secs by default. */
     private long sleepTime;
 
@@ -180,6 +186,9 @@
     /** Number of jobs to load from the repository on startup in one go. */
     private long maxLoadJobs;
 
+    /** Maximum number of allowed job queues. */
+    private int maxJobQueues;
+
     /** Default maximum load jobs. */
     private static final long DEFAULT_MAXIMUM_LOAD_JOBS = 1000;
 
@@ -234,6 +243,7 @@
         this.loadThreshold = OsgiUtil.toLong(props.get(CONFIG_PROPERTY_LOAD_THREASHOLD), DEFAULT_LOAD_THRESHOLD);
         this.backgroundLoadDelay = OsgiUtil.toLong(props.get(CONFIG_PROPERTY_BACKGROUND_LOAD_DELAY), DEFAULT_BACKGROUND_LOAD_DELAY);
         this.backgroundCheckDelay = OsgiUtil.toLong(props.get(CONFIG_PROPERTY_BACKGROUND_CHECK_DELAY), DEFAULT_BACKGROUND_CHECK_DELAY);
+        this.maxJobQueues = OsgiUtil.toInteger(props.get(CONFIG_PROPERTY_MAXIMUM_JOB_QUEUES), DEFAULT_MAXIMUM_JOB_QUEUES);
         this.componentContext = context;
         super.activate(context);
         JOB_THREAD_POOL = this.threadPool;
@@ -555,8 +565,9 @@
         // load unprocessed jobs from repository
         if ( this.running ) {
             logger.info("Apache Sling Job Event Handler started.");
-            logger.debug("Job Handler Configuration: (sleepTime={} secs, maxJobRetries={}, waitForAck={} ms, maximumParallelJobs={}, cleanupPeriod={} min)",
-                    new Object[] {sleepTime, maxJobRetries,waitForAckMs,maximumParallelJobs,cleanupPeriod});
+            logger.debug("Job Handler Configuration: (sleepTime={} secs, maxJobRetries={}," +
+                    " waitForAck={} ms, maximumParallelJobs={}, cleanupPeriod={} min, maxJobQueues={})",
+                    new Object[] {sleepTime, maxJobRetries,waitForAckMs,maximumParallelJobs,cleanupPeriod,maxJobQueues});
         } else {
             final ComponentContext ctx = this.componentContext;
             // deactivate
@@ -596,43 +607,51 @@
                 if ( info != null && info.event.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME) != null ) {
                     final String queueName = (String)info.event.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME);
                     synchronized ( this.jobQueues ) {
-                        if ( logger.isDebugEnabled() ) {
-                            logger.debug("Queuing job {} into queue {}.", EventUtil.toString(info.event), queueName);
-                        }
                         BlockingQueue<EventInfo> jobQueue = this.jobQueues.get(queueName);
                         if ( jobQueue == null ) {
-                            final boolean orderedQueue = info.event.getProperty(EventUtil.PROPERTY_JOB_QUEUE_ORDERED) != null;
-                            final JobBlockingQueue jq = new JobBlockingQueue(queueName, orderedQueue, this.logger);
-                            jobQueue = jq;
-                            this.jobQueues.put(queueName, jq);
-                            // Start background thread
-                            this.threadPool.execute(new Runnable() {
-
-                                /**
-                                 * @see java.lang.Runnable#run()
-                                 */
-                                public void run() {
-                                    while ( running && !jq.isFinished() ) {
-                                        logger.info("Starting {}job queue {}", (orderedQueue ? "ordered " : ""), queueName);
-                                        try {
-                                            runJobQueue(queueName, jq);
-                                        } catch (Throwable t) {
-                                            logger.error("Job queue stopped with exception: " + t.getMessage() + ". Restarting.", t);
+                            // check if we have exceeded the maximum number of job queues
+                            if ( this.jobQueues.size() >= this.maxJobQueues ) {
+                                this.logger.warn("Unable to create new job queue named {} as there are already {} job queues." +
+                                        " Try to increase the maximum number of job queues!", queueName, this.jobQueues.size());
+                            } else {
+                                final boolean orderedQueue = info.event.getProperty(EventUtil.PROPERTY_JOB_QUEUE_ORDERED) != null;
+                                final JobBlockingQueue jq = new JobBlockingQueue(queueName, orderedQueue, this.logger);
+                                jobQueue = jq;
+                                this.jobQueues.put(queueName, jq);
+                                // Start background thread
+                                this.threadPool.execute(new Runnable() {
+
+                                    /**
+                                     * @see java.lang.Runnable#run()
+                                     */
+                                    public void run() {
+                                        while ( running && !jq.isFinished() ) {
+                                            logger.info("Starting {}job queue {}", (orderedQueue ? "ordered " : ""), queueName);
+                                            try {
+                                                runJobQueue(queueName, jq);
+                                            } catch (Throwable t) {
+                                                logger.error("Job queue stopped with exception: " + t.getMessage() + ". Restarting.", t);
+                                            }
                                         }
                                     }
-                                }
 
-                            });
+                                });
+                            }
                         }
-                        try {
-                            jobQueue.put(info);
-                        } catch (InterruptedException e) {
-                            // this should never happen
-                            this.ignoreException(e);
+                        if ( jobQueue != null ) {
+                            if ( logger.isDebugEnabled() ) {
+                                logger.debug("Queuing job {} into queue {}.", EventUtil.toString(info.event), queueName);
+                            }
+                            try {
+                                jobQueue.put(info);
+                            } catch (InterruptedException e) {
+                                // this should never happen
+                                this.ignoreException(e);
+                            }
+                            // don't process this here
+                            info = null;
                         }
                     }
-                    // don't process this here
-                    info = null;
                 }
 
                 // if we still have a job, process it

Modified: sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties?rev=908911&r1=908910&r2=908911&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties (original)
+++ sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties Thu Feb 11 10:30:14 2010
@@ -90,6 +90,10 @@
 load.checkdelay.description = The background loader sleeps this time of seconds before \
  checking the repository for jobs. Default value is 240 seconds.
 
+max.job.queues.name = Max Job Queues
+max.job.queues.description = The maximum number of job queues (default is 10). \
+ Once this number is reached, no new job queue is created and all jobs for a new job queue are put into the main queue.
+
 #
 # Event Pool
 event.pool.name = Apache Sling Event Thread Pool