You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2013/10/20 14:59:41 UTC
svn commit: r1533882 - in /sling/trunk/bundles/extensions/event/src/main:
java/org/apache/sling/event/impl/ java/org/apache/sling/event/impl/jobs/
java/org/apache/sling/event/impl/jobs/config/
java/org/apache/sling/event/impl/jobs/queues/ java/org/apac...
Author: cziegeler
Date: Sun Oct 20 12:59:41 2013
New Revision: 1533882
URL: http://svn.apache.org/r1533882
Log:
SLING-3188 : Thread pool configuration per queue
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/EventingThreadPool.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/ConfigurationConstants.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractParallelJobQueue.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/QueueConfiguration.java
sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/EventingThreadPool.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/EventingThreadPool.java?rev=1533882&r1=1533881&r2=1533882&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/EventingThreadPool.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/EventingThreadPool.java Sun Oct 20 12:59:41 2013
@@ -23,6 +23,7 @@ import java.util.Map;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Modified;
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.Service;
@@ -54,13 +55,31 @@ public class EventingThreadPool implemen
@Property(intValue=DEFAULT_POOL_SIZE)
private static final String PROPERTY_POOL_SIZE = "minPoolSize";
+ public EventingThreadPool() {
+ // default constructor
+ }
+
+ /**
+ * Create a pool instance directly instead of through component activation,
+ * e.g. for a job queue that is configured with its own thread pool size.
+ * Callers are expected to invoke {@link #release()} once the pool is no longer needed.
+ * @param tpm The thread pool manager to use
+ * @param poolSize The number of threads for this pool
+ */
+ public EventingThreadPool(final ThreadPoolManager tpm, final int poolSize) {
+ this.threadPoolManager = tpm;
+ // apply the requested size - without this call the poolSize argument is ignored
+ this.configure(poolSize);
+ }
+
+ public void release() {
+ this.deactivate();
+ }
+
/**
* Activate this component.
*/
@Activate
+ @Modified
protected void activate(final Map<String, Object> props) {
+ final int maxPoolSize = PropertiesUtil.toInteger(props.get(PROPERTY_POOL_SIZE), DEFAULT_POOL_SIZE);
+ this.configure(maxPoolSize);
+ }
+
+ private void configure(final int maxPoolSize) {
final ModifiableThreadPoolConfig config = new ModifiableThreadPoolConfig();
- config.setMinPoolSize(PropertiesUtil.toInteger(props.get(PROPERTY_POOL_SIZE), DEFAULT_POOL_SIZE));
+ config.setMinPoolSize(maxPoolSize);
config.setMaxPoolSize(config.getMinPoolSize());
config.setQueueSize(-1); // unlimited
config.setShutdownGraceful(true);
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java?rev=1533882&r1=1533881&r2=1533882&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java Sun Oct 20 12:59:41 2013
@@ -50,6 +50,7 @@ import org.apache.sling.api.resource.Res
import org.apache.sling.api.resource.ResourceResolverFactory;
import org.apache.sling.api.resource.ValueMap;
import org.apache.sling.commons.scheduler.Scheduler;
+import org.apache.sling.commons.threads.ThreadPoolManager;
import org.apache.sling.discovery.TopologyEvent;
import org.apache.sling.discovery.TopologyEvent.Type;
import org.apache.sling.discovery.TopologyEventListener;
@@ -140,6 +141,9 @@ public class JobManagerImpl
@Reference
private QueuesMBean queuesMBean;
+ @Reference
+ private ThreadPoolManager threadPoolManager;
+
/** The job manager configuration. */
private JobManagerConfiguration configuration;
@@ -334,11 +338,11 @@ public class JobManagerImpl
}
if ( queue == null ) {
if ( config.getType() == QueueConfiguration.Type.ORDERED ) {
- queue = new OrderedJobQueue(queueInfo.queueName, config, this.jobConsumerManager, this.eventAdmin);
+ queue = new OrderedJobQueue(queueInfo.queueName, config, this.jobConsumerManager, this.threadPoolManager, this.eventAdmin);
} else if ( config.getType() == QueueConfiguration.Type.UNORDERED ) {
- queue = new ParallelJobQueue(queueInfo.queueName, config, this.jobConsumerManager, this.eventAdmin, this.scheduler);
+ queue = new ParallelJobQueue(queueInfo.queueName, config, this.jobConsumerManager, this.threadPoolManager, this.eventAdmin, this.scheduler);
} else if ( config.getType() == QueueConfiguration.Type.TOPIC_ROUND_ROBIN ) {
- queue = new TopicRoundRobinJobQueue(queueInfo.queueName, config, this.jobConsumerManager, this.eventAdmin, this.scheduler);
+ queue = new TopicRoundRobinJobQueue(queueInfo.queueName, config, this.jobConsumerManager, this.threadPoolManager, this.eventAdmin, this.scheduler);
}
if ( queue == null ) {
// this is just a sanity check, actually we can never get here
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/ConfigurationConstants.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/ConfigurationConstants.java?rev=1533882&r1=1533881&r2=1533882&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/ConfigurationConstants.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/ConfigurationConstants.java Sun Oct 20 12:59:41 2013
@@ -31,6 +31,7 @@ public abstract class ConfigurationConst
public static final long DEFAULT_RETRY_DELAY = 2000;
public static final int DEFAULT_MAX_PARALLEL = 15;
public static final boolean DEFAULT_KEEP_JOBS = false;
+ public static final int DEFAULT_THREAD_POOL_SIZE = 0;
public static final String PROP_NAME = "queue.name";
public static final String PROP_TYPE = "queue.type";
@@ -40,4 +41,6 @@ public abstract class ConfigurationConst
public static final String PROP_RETRY_DELAY = "queue.retrydelay";
public static final String PROP_PRIORITY = "queue.priority";
public static final String PROP_KEEP_JOBS = "queue.keepJobs";
+ public static final String PROP_THREAD_POOL_SIZE = "queue.threadPoolSize";
+
}
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java?rev=1533882&r1=1533881&r2=1533882&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java Sun Oct 20 12:59:41 2013
@@ -64,6 +64,8 @@ import org.osgi.framework.Constants;
@PropertyOption(name="MAX",value="Max")}),
@Property(name=ConfigurationConstants.PROP_KEEP_JOBS,
boolValue=ConfigurationConstants.DEFAULT_KEEP_JOBS),
+ @Property(name=ConfigurationConstants.PROP_THREAD_POOL_SIZE,
+ intValue=ConfigurationConstants.DEFAULT_THREAD_POOL_SIZE),
@Property(name=Constants.SERVICE_RANKING, intValue=0, propertyPrivate=false,
label="%queue.ranking.name", description="%queue.ranking.description")
})
@@ -103,6 +105,9 @@ public class InternalQueueConfiguration
/** Valid flag. */
private boolean valid = false;
+ /** Optional thread pool size. */
+ private int ownThreadPoolSize;
+
private String pid;
/**
@@ -139,6 +144,7 @@ public class InternalQueueConfiguration
}
this.keepJobs = PropertiesUtil.toBoolean(params.get(ConfigurationConstants.PROP_KEEP_JOBS), ConfigurationConstants.DEFAULT_KEEP_JOBS);
this.serviceRanking = PropertiesUtil.toInteger(params.get(Constants.SERVICE_RANKING), 0);
+ this.ownThreadPoolSize = PropertiesUtil.toInteger(params.get(ConfigurationConstants.PROP_THREAD_POOL_SIZE), ConfigurationConstants.DEFAULT_THREAD_POOL_SIZE);
this.pid = (String)params.get(Constants.SERVICE_PID);
this.valid = this.checkIsValid();
}
@@ -282,6 +288,11 @@ public class InternalQueueConfiguration
}
@Override
+ public int getOwnThreadPoolSize() {
+ return this.ownThreadPoolSize;
+ }
+
+ @Override
public String toString() {
return "Queue-Configuration(" + this.hashCode() + ") : {" +
"name=" + this.name +
@@ -291,6 +302,7 @@ public class InternalQueueConfiguration
", retries=" + this.retries +
", retryDelayInMs=" + this.retryDelay +
", keepJobs=" + this.keepJobs +
+ ", ownThreadPoolSize=" + this.ownThreadPoolSize +
", serviceRanking=" + this.serviceRanking +
", pid=" + this.pid +
", isValid=" + this.isValid() + "}";
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java?rev=1533882&r1=1533881&r2=1533882&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java Sun Oct 20 12:59:41 2013
@@ -30,7 +30,9 @@ import java.util.concurrent.atomic.Atomi
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.sling.commons.threads.ThreadPool;
+import org.apache.sling.commons.threads.ThreadPoolManager;
import org.apache.sling.event.EventUtil;
+import org.apache.sling.event.impl.EventingThreadPool;
import org.apache.sling.event.impl.jobs.InternalJobState;
import org.apache.sling.event.impl.jobs.JobConsumerManager;
import org.apache.sling.event.impl.jobs.JobExecutionResultImpl;
@@ -110,8 +112,11 @@ public abstract class AbstractJobQueue
/** Marker flag if the queue is waiting for another element (= empty) */
protected boolean isWaitingForNext = false;
+ /** A marker for closing the queue. */
private final AtomicBoolean closeMarker = new AtomicBoolean(false);
+ private final ThreadPool threadPool;
+
/**
* Start this queue
* @param name The queue name
@@ -121,7 +126,13 @@ public abstract class AbstractJobQueue
public AbstractJobQueue(final String name,
final InternalQueueConfiguration config,
final JobConsumerManager jobConsumerManager,
+ final ThreadPoolManager threadPoolManager,
final EventAdmin eventAdmin) {
+ if ( config.getOwnThreadPoolSize() > 0 ) {
+ this.threadPool = new EventingThreadPool(threadPoolManager, config.getOwnThreadPoolSize());
+ } else {
+ this.threadPool = Environment.THREAD_POOL;
+ }
this.queueName = name;
this.configuration = config;
this.logger = LoggerFactory.getLogger(this.getClass().getName() + '.' + name);
@@ -196,6 +207,9 @@ public abstract class AbstractJobQueue
synchronized ( this.startedJobsLists ) {
this.startedJobsLists.clear();
}
+ if ( this.configuration.getOwnThreadPoolSize() > 0 ) {
+ ((EventingThreadPool)this.threadPool).release();
+ }
this.logger.info("Stopped job queue {}", this.queueName);
}
@@ -690,7 +704,7 @@ public abstract class AbstractJobQueue
};
// check if the thread pool is available
- final ThreadPool pool = Environment.THREAD_POOL;
+ final ThreadPool pool = this.threadPool;
if ( pool != null ) {
pool.execute(task);
} else {
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractParallelJobQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractParallelJobQueue.java?rev=1533882&r1=1533881&r2=1533882&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractParallelJobQueue.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractParallelJobQueue.java Sun Oct 20 12:59:41 2013
@@ -21,6 +21,7 @@ package org.apache.sling.event.impl.jobs
import java.util.Date;
import org.apache.sling.commons.scheduler.Scheduler;
+import org.apache.sling.commons.threads.ThreadPoolManager;
import org.apache.sling.event.impl.jobs.JobConsumerManager;
import org.apache.sling.event.impl.jobs.JobHandler;
import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
@@ -41,9 +42,10 @@ public abstract class AbstractParallelJo
public AbstractParallelJobQueue(final String name,
final InternalQueueConfiguration config,
final JobConsumerManager jobConsumerManager,
+ final ThreadPoolManager threadPoolManager,
final EventAdmin eventAdmin,
final Scheduler scheduler) {
- super(name, config, jobConsumerManager, eventAdmin);
+ super(name, config, jobConsumerManager, threadPoolManager, eventAdmin);
this.scheduler = scheduler;
}
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java?rev=1533882&r1=1533881&r2=1533882&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java Sun Oct 20 12:59:41 2013
@@ -26,6 +26,7 @@ import java.util.List;
import java.util.Set;
import java.util.TreeSet;
+import org.apache.sling.commons.threads.ThreadPoolManager;
import org.apache.sling.event.impl.jobs.JobConsumerManager;
import org.apache.sling.event.impl.jobs.JobHandler;
import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
@@ -68,8 +69,9 @@ public final class OrderedJobQueue exten
public OrderedJobQueue(final String name,
final InternalQueueConfiguration config,
final JobConsumerManager jobConsumerManager,
+ final ThreadPoolManager threadPoolManager,
final EventAdmin eventAdmin) {
- super(name, config, jobConsumerManager, eventAdmin);
+ super(name, config, jobConsumerManager, threadPoolManager, eventAdmin);
}
@Override
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java?rev=1533882&r1=1533881&r2=1533882&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java Sun Oct 20 12:59:41 2013
@@ -25,6 +25,7 @@ import java.util.concurrent.BlockingQueu
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.sling.commons.scheduler.Scheduler;
+import org.apache.sling.commons.threads.ThreadPoolManager;
import org.apache.sling.event.impl.jobs.JobConsumerManager;
import org.apache.sling.event.impl.jobs.JobHandler;
import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
@@ -42,9 +43,10 @@ public final class ParallelJobQueue exte
public ParallelJobQueue(final String name,
final InternalQueueConfiguration config,
final JobConsumerManager jobConsumerManager,
+ final ThreadPoolManager threadPoolManager,
final EventAdmin eventAdmin,
final Scheduler scheduler) {
- super(name, config, jobConsumerManager, eventAdmin, scheduler);
+ super(name, config, jobConsumerManager, threadPoolManager, eventAdmin, scheduler);
}
@Override
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java?rev=1533882&r1=1533881&r2=1533882&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java Sun Oct 20 12:59:41 2013
@@ -26,6 +26,7 @@ import java.util.List;
import java.util.Map;
import org.apache.sling.commons.scheduler.Scheduler;
+import org.apache.sling.commons.threads.ThreadPoolManager;
import org.apache.sling.event.impl.jobs.JobConsumerManager;
import org.apache.sling.event.impl.jobs.JobHandler;
import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
@@ -53,9 +54,10 @@ public final class TopicRoundRobinJobQue
public TopicRoundRobinJobQueue(final String name,
final InternalQueueConfiguration config,
final JobConsumerManager jobConsumerManager,
+ final ThreadPoolManager threadPoolManager,
final EventAdmin eventAdmin,
final Scheduler scheduler) {
- super(name, config, jobConsumerManager, eventAdmin, scheduler);
+ super(name, config, jobConsumerManager, threadPoolManager, eventAdmin, scheduler);
}
@Override
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/QueueConfiguration.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/QueueConfiguration.java?rev=1533882&r1=1533881&r2=1533882&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/QueueConfiguration.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/QueueConfiguration.java Sun Oct 20 12:59:41 2013
@@ -104,6 +104,14 @@ public interface QueueConfiguration {
boolean isKeepJobs();
/**
+ * Return the size for the optional thread pool for this queue.
+ * @return A positive number or <code>0</code> if the default thread pool
+ * should be used.
+ * @since 1.3
+ */
+ int getOwnThreadPoolSize();
+
+ /**
* Get the ranking of this configuration.
*/
int getRanking();
Modified: sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties?rev=1533882&r1=1533881&r2=1533882&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties (original)
+++ sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties Sun Oct 20 12:59:41 2013
@@ -69,6 +69,11 @@ queue.ranking.name = Ranking
queue.ranking.description = Integer value defining the ranking of this queue configuration. \
If more than one queue matches a job topic, the one with the highest ranking is used.
+queue.threadPoolSize.name = Thread Pool Size
+queue.threadPoolSize.description = Optional configuration value for a thread pool to be used by \
+ this queue. If this value is a positive number, this queue uses its own thread pool \
+ with the configured number of threads.
+
#
# Job Event Handler
job.events.name = Apache Sling Job Default Queue