You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2013/10/20 14:59:41 UTC

svn commit: r1533882 - in /sling/trunk/bundles/extensions/event/src/main: java/org/apache/sling/event/impl/ java/org/apache/sling/event/impl/jobs/ java/org/apache/sling/event/impl/jobs/config/ java/org/apache/sling/event/impl/jobs/queues/ java/org/apac...

Author: cziegeler
Date: Sun Oct 20 12:59:41 2013
New Revision: 1533882

URL: http://svn.apache.org/r1533882
Log:
SLING-3188 : Thread pool configuration per queue

Modified:
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/EventingThreadPool.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/ConfigurationConstants.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractParallelJobQueue.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/QueueConfiguration.java
    sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/EventingThreadPool.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/EventingThreadPool.java?rev=1533882&r1=1533881&r2=1533882&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/EventingThreadPool.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/EventingThreadPool.java Sun Oct 20 12:59:41 2013
@@ -23,6 +23,7 @@ import java.util.Map;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Modified;
 import org.apache.felix.scr.annotations.Property;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.Service;
@@ -54,13 +55,41 @@ public class EventingThreadPool implemen
     @Property(intValue=DEFAULT_POOL_SIZE)
     private static final String PROPERTY_POOL_SIZE = "minPoolSize";
 
+    public EventingThreadPool() {
+        // default constructor
+    }
+
+    /**
+     * Create a thread pool directly, using the given manager and size.
+     * @param tpm The thread pool manager stored in this instance
+     * @param poolSize The pool size handed to {@link #configure(int)}
+     */
+    public EventingThreadPool(final ThreadPoolManager tpm, final int poolSize) {
+        this.threadPoolManager = tpm;
+        // instances created through this constructor are configured here, not via activate(Map)
+        this.configure(poolSize);
+    }
+
+    /**
+     * Release this thread pool by delegating to deactivate().
+     */
+    public void release() {
+        this.deactivate();
+    }
+
     /**
      * Activate this component.
      */
     @Activate
+    @Modified
     protected void activate(final Map<String, Object> props) {
+        final int maxPoolSize = PropertiesUtil.toInteger(props.get(PROPERTY_POOL_SIZE), DEFAULT_POOL_SIZE);
+        this.configure(maxPoolSize);
+    }
+
+    private void configure(final int maxPoolSize) {
         final ModifiableThreadPoolConfig config = new ModifiableThreadPoolConfig();
-        config.setMinPoolSize(PropertiesUtil.toInteger(props.get(PROPERTY_POOL_SIZE), DEFAULT_POOL_SIZE));
+        config.setMinPoolSize(maxPoolSize);
         config.setMaxPoolSize(config.getMinPoolSize());
         config.setQueueSize(-1); // unlimited
         config.setShutdownGraceful(true);

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java?rev=1533882&r1=1533881&r2=1533882&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java Sun Oct 20 12:59:41 2013
@@ -50,6 +50,7 @@ import org.apache.sling.api.resource.Res
 import org.apache.sling.api.resource.ResourceResolverFactory;
 import org.apache.sling.api.resource.ValueMap;
 import org.apache.sling.commons.scheduler.Scheduler;
+import org.apache.sling.commons.threads.ThreadPoolManager;
 import org.apache.sling.discovery.TopologyEvent;
 import org.apache.sling.discovery.TopologyEvent.Type;
 import org.apache.sling.discovery.TopologyEventListener;
@@ -140,6 +141,9 @@ public class JobManagerImpl
     @Reference
     private QueuesMBean queuesMBean;
 
+    @Reference
+    private ThreadPoolManager threadPoolManager;
+
     /** The job manager configuration. */
     private JobManagerConfiguration configuration;
 
@@ -334,11 +338,11 @@ public class JobManagerImpl
                     }
                     if ( queue == null ) {
                         if ( config.getType() == QueueConfiguration.Type.ORDERED ) {
-                            queue = new OrderedJobQueue(queueInfo.queueName, config, this.jobConsumerManager, this.eventAdmin);
+                            queue = new OrderedJobQueue(queueInfo.queueName, config, this.jobConsumerManager, this.threadPoolManager, this.eventAdmin);
                         } else if ( config.getType() == QueueConfiguration.Type.UNORDERED ) {
-                            queue = new ParallelJobQueue(queueInfo.queueName, config, this.jobConsumerManager, this.eventAdmin, this.scheduler);
+                            queue = new ParallelJobQueue(queueInfo.queueName, config, this.jobConsumerManager, this.threadPoolManager, this.eventAdmin, this.scheduler);
                         } else if ( config.getType() == QueueConfiguration.Type.TOPIC_ROUND_ROBIN ) {
-                            queue = new TopicRoundRobinJobQueue(queueInfo.queueName, config, this.jobConsumerManager, this.eventAdmin, this.scheduler);
+                            queue = new TopicRoundRobinJobQueue(queueInfo.queueName, config, this.jobConsumerManager, this.threadPoolManager, this.eventAdmin, this.scheduler);
                         }
                         if ( queue == null ) {
                             // this is just a sanity check, actually we can never get here

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/ConfigurationConstants.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/ConfigurationConstants.java?rev=1533882&r1=1533881&r2=1533882&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/ConfigurationConstants.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/ConfigurationConstants.java Sun Oct 20 12:59:41 2013
@@ -31,6 +31,7 @@ public abstract class ConfigurationConst
     public static final long DEFAULT_RETRY_DELAY = 2000;
     public static final int DEFAULT_MAX_PARALLEL = 15;
     public static final boolean DEFAULT_KEEP_JOBS = false;
+    public static final int DEFAULT_THREAD_POOL_SIZE = 0;
 
     public static final String PROP_NAME = "queue.name";
     public static final String PROP_TYPE = "queue.type";
@@ -40,4 +41,6 @@ public abstract class ConfigurationConst
     public static final String PROP_RETRY_DELAY = "queue.retrydelay";
     public static final String PROP_PRIORITY = "queue.priority";
     public static final String PROP_KEEP_JOBS = "queue.keepJobs";
+    public static final String PROP_THREAD_POOL_SIZE = "queue.threadPoolSize";
+
 }

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java?rev=1533882&r1=1533881&r2=1533882&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java Sun Oct 20 12:59:41 2013
@@ -64,6 +64,8 @@ import org.osgi.framework.Constants;
                      @PropertyOption(name="MAX",value="Max")}),
     @Property(name=ConfigurationConstants.PROP_KEEP_JOBS,
               boolValue=ConfigurationConstants.DEFAULT_KEEP_JOBS),
+    @Property(name=ConfigurationConstants.PROP_THREAD_POOL_SIZE,
+              intValue=ConfigurationConstants.DEFAULT_THREAD_POOL_SIZE),
     @Property(name=Constants.SERVICE_RANKING, intValue=0, propertyPrivate=false,
               label="%queue.ranking.name", description="%queue.ranking.description")
 })
@@ -103,6 +105,9 @@ public class InternalQueueConfiguration
     /** Valid flag. */
     private boolean valid = false;
 
+    /** Optional thread pool size. */
+    private int ownThreadPoolSize;
+
     private String pid;
 
     /**
@@ -139,6 +144,7 @@ public class InternalQueueConfiguration
         }
         this.keepJobs = PropertiesUtil.toBoolean(params.get(ConfigurationConstants.PROP_KEEP_JOBS), ConfigurationConstants.DEFAULT_KEEP_JOBS);
         this.serviceRanking = PropertiesUtil.toInteger(params.get(Constants.SERVICE_RANKING), 0);
+        this.ownThreadPoolSize = PropertiesUtil.toInteger(params.get(ConfigurationConstants.PROP_THREAD_POOL_SIZE), ConfigurationConstants.DEFAULT_THREAD_POOL_SIZE);
         this.pid = (String)params.get(Constants.SERVICE_PID);
         this.valid = this.checkIsValid();
     }
@@ -282,6 +288,11 @@ public class InternalQueueConfiguration
     }
 
     @Override
+    public int getOwnThreadPoolSize() {
+        return this.ownThreadPoolSize;
+    }
+
+    @Override
     public String toString() {
         return "Queue-Configuration(" + this.hashCode() + ") : {" +
             "name=" + this.name +
@@ -291,6 +302,7 @@ public class InternalQueueConfiguration
             ", retries=" + this.retries +
             ", retryDelayInMs=" + this.retryDelay +
             ", keepJobs=" + this.keepJobs +
+            ", ownThreadPoolSize=" + this.ownThreadPoolSize +
             ", serviceRanking=" + this.serviceRanking +
             ", pid=" + this.pid +
             ", isValid=" + this.isValid() + "}";

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java?rev=1533882&r1=1533881&r2=1533882&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java Sun Oct 20 12:59:41 2013
@@ -30,7 +30,9 @@ import java.util.concurrent.atomic.Atomi
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.sling.commons.threads.ThreadPool;
+import org.apache.sling.commons.threads.ThreadPoolManager;
 import org.apache.sling.event.EventUtil;
+import org.apache.sling.event.impl.EventingThreadPool;
 import org.apache.sling.event.impl.jobs.InternalJobState;
 import org.apache.sling.event.impl.jobs.JobConsumerManager;
 import org.apache.sling.event.impl.jobs.JobExecutionResultImpl;
@@ -110,8 +112,11 @@ public abstract class AbstractJobQueue
     /** Marker flag if the queue is waiting for another element (= empty) */
     protected boolean isWaitingForNext = false;
 
+    /** A marker for closing the queue. */
     private final AtomicBoolean closeMarker = new AtomicBoolean(false);
 
+    private final ThreadPool threadPool;
+
     /**
      * Start this queue
      * @param name The queue name
@@ -121,7 +126,13 @@ public abstract class AbstractJobQueue
     public AbstractJobQueue(final String name,
                     final InternalQueueConfiguration config,
                     final JobConsumerManager jobConsumerManager,
+                    final ThreadPoolManager threadPoolManager,
                     final EventAdmin eventAdmin) {
+        if ( config.getOwnThreadPoolSize() > 0 ) {
+            this.threadPool = new EventingThreadPool(threadPoolManager, config.getOwnThreadPoolSize());
+        } else {
+            this.threadPool = Environment.THREAD_POOL;
+        }
         this.queueName = name;
         this.configuration = config;
         this.logger = LoggerFactory.getLogger(this.getClass().getName() + '.' + name);
@@ -196,6 +207,9 @@ public abstract class AbstractJobQueue
         synchronized ( this.startedJobsLists ) {
             this.startedJobsLists.clear();
         }
+        if ( this.configuration.getOwnThreadPoolSize() > 0 ) {
+            ((EventingThreadPool)this.threadPool).release();
+        }
         this.logger.info("Stopped job queue {}", this.queueName);
     }
 
@@ -690,7 +704,7 @@ public abstract class AbstractJobQueue
 
                         };
                         // check if the thread pool is available
-                        final ThreadPool pool = Environment.THREAD_POOL;
+                        final ThreadPool pool = this.threadPool;
                         if ( pool != null ) {
                             pool.execute(task);
                         } else {

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractParallelJobQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractParallelJobQueue.java?rev=1533882&r1=1533881&r2=1533882&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractParallelJobQueue.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractParallelJobQueue.java Sun Oct 20 12:59:41 2013
@@ -21,6 +21,7 @@ package org.apache.sling.event.impl.jobs
 import java.util.Date;
 
 import org.apache.sling.commons.scheduler.Scheduler;
+import org.apache.sling.commons.threads.ThreadPoolManager;
 import org.apache.sling.event.impl.jobs.JobConsumerManager;
 import org.apache.sling.event.impl.jobs.JobHandler;
 import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
@@ -41,9 +42,10 @@ public abstract class AbstractParallelJo
     public AbstractParallelJobQueue(final String name,
                            final InternalQueueConfiguration config,
                            final JobConsumerManager jobConsumerManager,
+                           final ThreadPoolManager threadPoolManager,
                            final EventAdmin eventAdmin,
                            final Scheduler scheduler) {
-        super(name, config, jobConsumerManager, eventAdmin);
+        super(name, config, jobConsumerManager, threadPoolManager, eventAdmin);
         this.scheduler = scheduler;
     }
 

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java?rev=1533882&r1=1533881&r2=1533882&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java Sun Oct 20 12:59:41 2013
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.TreeSet;
 
+import org.apache.sling.commons.threads.ThreadPoolManager;
 import org.apache.sling.event.impl.jobs.JobConsumerManager;
 import org.apache.sling.event.impl.jobs.JobHandler;
 import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
@@ -68,8 +69,9 @@ public final class OrderedJobQueue exten
     public OrderedJobQueue(final String name,
                            final InternalQueueConfiguration config,
                            final JobConsumerManager jobConsumerManager,
+                           final ThreadPoolManager threadPoolManager,
                            final EventAdmin eventAdmin) {
-        super(name, config, jobConsumerManager, eventAdmin);
+        super(name, config, jobConsumerManager, threadPoolManager, eventAdmin);
     }
 
     @Override

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java?rev=1533882&r1=1533881&r2=1533882&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java Sun Oct 20 12:59:41 2013
@@ -25,6 +25,7 @@ import java.util.concurrent.BlockingQueu
 import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.sling.commons.scheduler.Scheduler;
+import org.apache.sling.commons.threads.ThreadPoolManager;
 import org.apache.sling.event.impl.jobs.JobConsumerManager;
 import org.apache.sling.event.impl.jobs.JobHandler;
 import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
@@ -42,9 +43,10 @@ public final class ParallelJobQueue exte
     public ParallelJobQueue(final String name,
                            final InternalQueueConfiguration config,
                            final JobConsumerManager jobConsumerManager,
+                           final ThreadPoolManager threadPoolManager,
                            final EventAdmin eventAdmin,
                            final Scheduler scheduler) {
-        super(name, config, jobConsumerManager, eventAdmin, scheduler);
+        super(name, config, jobConsumerManager, threadPoolManager, eventAdmin, scheduler);
     }
 
     @Override

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java?rev=1533882&r1=1533881&r2=1533882&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java Sun Oct 20 12:59:41 2013
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.sling.commons.scheduler.Scheduler;
+import org.apache.sling.commons.threads.ThreadPoolManager;
 import org.apache.sling.event.impl.jobs.JobConsumerManager;
 import org.apache.sling.event.impl.jobs.JobHandler;
 import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
@@ -53,9 +54,10 @@ public final class TopicRoundRobinJobQue
     public TopicRoundRobinJobQueue(final String name,
                            final InternalQueueConfiguration config,
                            final JobConsumerManager jobConsumerManager,
+                           final ThreadPoolManager threadPoolManager,
                            final EventAdmin eventAdmin,
                            final Scheduler scheduler) {
-        super(name, config, jobConsumerManager, eventAdmin, scheduler);
+        super(name, config, jobConsumerManager, threadPoolManager, eventAdmin, scheduler);
     }
 
     @Override

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/QueueConfiguration.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/QueueConfiguration.java?rev=1533882&r1=1533881&r2=1533882&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/QueueConfiguration.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/QueueConfiguration.java Sun Oct 20 12:59:41 2013
@@ -104,6 +104,14 @@ public interface QueueConfiguration {
     boolean isKeepJobs();
 
     /**
+     * Return the size for the optional thread pool for this queue.
+     * @return A positive number or <code>0</code> if the default thread pool
+     *         should be used.
+     * @since 1.3
+     */
+    int getOwnThreadPoolSize();
+
+    /**
      * Get the ranking of this configuration.
      */
     int getRanking();

Modified: sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties?rev=1533882&r1=1533881&r2=1533882&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties (original)
+++ sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties Sun Oct 20 12:59:41 2013
@@ -69,6 +69,11 @@ queue.ranking.name = Ranking
 queue.ranking.description = Integer value defining the ranking of this queue configuration. \
  If more than one queue matches a job topic, the one with the highest ranking is used.
 
+queue.threadPoolSize.name = Thread Pool Size
+queue.threadPoolSize.description = Optional configuration value for a thread pool to be used by \
+ this queue. If this value is a positive number, this queue uses \
+ its own thread pool with the configured number of threads.
+
 #
 # Job Event Handler
 job.events.name = Apache Sling Job Default Queue