You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2015/03/17 13:25:44 UTC
svn commit: r1667285 - in /sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl: ./ jobs/deprecated/ jobs/queues/
Author: cziegeler
Date: Tue Mar 17 12:25:44 2015
New Revision: 1667285
URL: http://svn.apache.org/r1667285
Log:
SLING-4481 : Reduce the number of controller threads for queue
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/EventingThreadPool.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/EventAdminBridge.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/JobStatusNotifier.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/JobStatusNotifierImpl.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/EventingThreadPool.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/EventingThreadPool.java?rev=1667285&r1=1667284&r2=1667285&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/EventingThreadPool.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/EventingThreadPool.java Tue Mar 17 12:25:44 2015
@@ -67,6 +67,7 @@ public class EventingThreadPool implemen
public EventingThreadPool(final ThreadPoolManager tpm, final int poolSize) {
this.threadPoolManager = tpm;
+ this.configure(poolSize);
}
public void release() {
@@ -91,7 +92,7 @@ public class EventingThreadPool implemen
config.setShutdownGraceful(true);
config.setPriority(ThreadPriority.NORM);
config.setDaemon(true);
- this.threadPool = threadPoolManager.create(config, "Apache Sling Eventing Thread Pool");
+ this.threadPool = threadPoolManager.create(config, "Apache Sling Job Thread Pool");
}
/**
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/EventAdminBridge.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/EventAdminBridge.java?rev=1667285&r1=1667284&r2=1667285&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/EventAdminBridge.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/EventAdminBridge.java Tue Mar 17 12:25:44 2015
@@ -55,7 +55,9 @@ import org.slf4j.LoggerFactory;
* This handler is enabled by default, to disable it provide a configuration
* which removes both topic properties (or sets them to null)
*
+ * @deprecated
*/
+@Deprecated
@Component(immediate=true)
@Service(value={EventHandler.class, JobConsumer.class})
@Properties({
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/JobStatusNotifier.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/JobStatusNotifier.java?rev=1667285&r1=1667284&r2=1667285&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/JobStatusNotifier.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/JobStatusNotifier.java Tue Mar 17 12:25:44 2015
@@ -20,6 +20,10 @@ package org.apache.sling.event.impl.jobs
import org.apache.sling.event.jobs.JobProcessor;
+/**
+ * @deprecated
+ */
+@Deprecated
public interface JobStatusNotifier {
String CONTEXT_PROPERTY_NAME = JobStatusNotifier.class.getName();
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/JobStatusNotifierImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/JobStatusNotifierImpl.java?rev=1667285&r1=1667284&r2=1667285&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/JobStatusNotifierImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/JobStatusNotifierImpl.java Tue Mar 17 12:25:44 2015
@@ -21,6 +21,10 @@ package org.apache.sling.event.impl.jobs
import org.apache.sling.event.jobs.JobProcessor;
import org.apache.sling.event.jobs.consumer.JobExecutionContext;
+/**
+ * @deprecated
+ */
+@Deprecated
public class JobStatusNotifierImpl implements JobStatusNotifier {
private volatile boolean isCalled = false;
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java?rev=1667285&r1=1667284&r2=1667285&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java Tue Mar 17 12:25:44 2015
@@ -113,15 +113,44 @@ public class JobQueueImpl
/** Semaphore for handling the max number of jobs. */
private final Semaphore available;
+ /** Guard for having only one thread executing start jobs. */
+ private final AtomicBoolean startJobsGuard = new AtomicBoolean(false);
+
/**
- * Create a new queue
+ * Create a new queue.
+ *
* @param name The queue name
* @param config The queue configuration
+ * @param services The queue services
+ * @param topics The topics handled by this queue
+ *
+ * @return {@code JobQueueImpl} if there are jobs to process, {@code null} otherwise.
*/
- public JobQueueImpl(final String name,
- final InternalQueueConfiguration config,
- final QueueServices services,
- final Set<String> topics) {
+ public static JobQueueImpl createQueue(final String name,
+ final InternalQueueConfiguration config,
+ final QueueServices services,
+ final Set<String> topics) {
+ final QueueJobCache cache = new QueueJobCache(services.configuration, config.getType(), topics);
+ if ( cache.isEmpty() ) {
+ return null;
+ }
+ return new JobQueueImpl(name, config, services, topics, cache);
+ }
+
+ /**
+ * Create a new queue.
+ *
+ * @param name The queue name
+ * @param config The queue configuration
+ * @param services The queue services
+ * @param topics The topics handled by this queue
+ * @param cache The job cache
+ */
+ private JobQueueImpl(final String name,
+ final InternalQueueConfiguration config,
+ final QueueServices services,
+ final Set<String> topics,
+ final QueueJobCache cache) {
if ( config.getOwnThreadPoolSize() > 0 ) {
this.threadPool = new EventingThreadPool(services.threadPoolManager, config.getOwnThreadPoolSize());
} else {
@@ -132,8 +161,7 @@ public class JobQueueImpl
this.services = services;
this.logger = LoggerFactory.getLogger(this.getClass().getName() + '.' + name);
this.running = true;
- this.cache = new QueueJobCache(services.configuration, config.getType(), topics);
- this.cache.fillCache();
+ this.cache = cache;
this.available = new Semaphore(config.getMaxParallel(), true);
logger.info("Starting job queue {}", queueName);
logger.debug("Configuration for job queue={}", configuration);
@@ -166,65 +194,68 @@ public class JobQueueImpl
/**
* Start the job queue.
*/
- public void start() {
- start(false);
+ public void startJobs() {
+ startJobs(false);
}
/**
* Start the job queue.
- * This method might be called concurrently, therefore we synchronize
+ * This method might be called concurrently, therefore we use a guard
*/
- public synchronized void start(boolean justOne) {
- // we start as many jobs in parallel as possible
- while ( this.running && !this.isOutdated.get() && !this.isSuspended() && this.available.tryAcquire() ) {
- boolean started = false;
- try {
- final JobHandler handler = this.cache.getNextJob(this.services.jobConsumerManager, this, this.doFullCacheSearch.getAndSet(false));
- if ( handler != null ) {
- started = true;
- this.threadPool.execute(new Runnable() {
-
- @Override
- public void run() {
- // update thread priority and name
- final Thread currentThread = Thread.currentThread();
- final String oldName = currentThread.getName();
- final int oldPriority = currentThread.getPriority();
-
- currentThread.setName(oldName + "-" + handler.getJob().getQueueName() + "(" + handler.getJob().getTopic() + ")");
- if ( configuration.getThreadPriority() != null ) {
- switch ( configuration.getThreadPriority() ) {
- case NORM : currentThread.setPriority(Thread.NORM_PRIORITY);
- break;
- case MIN : currentThread.setPriority(Thread.MIN_PRIORITY);
- break;
- case MAX : currentThread.setPriority(Thread.MAX_PRIORITY);
- break;
+ private void startJobs(boolean justOne) {
+ if ( this.startJobsGuard.compareAndSet(false, true) ) {
+ // we start as many jobs in parallel as possible
+ while ( this.running && !this.isOutdated.get() && !this.isSuspended() && this.available.tryAcquire() ) {
+ boolean started = false;
+ try {
+ final JobHandler handler = this.cache.getNextJob(this.services.jobConsumerManager, this, this.doFullCacheSearch.getAndSet(false));
+ if ( handler != null ) {
+ started = true;
+ this.threadPool.execute(new Runnable() {
+
+ @Override
+ public void run() {
+ // update thread priority and name
+ final Thread currentThread = Thread.currentThread();
+ final String oldName = currentThread.getName();
+ final int oldPriority = currentThread.getPriority();
+
+ currentThread.setName(oldName + "-" + handler.getJob().getQueueName() + "(" + handler.getJob().getTopic() + ")");
+ if ( configuration.getThreadPriority() != null ) {
+ switch ( configuration.getThreadPriority() ) {
+ case NORM : currentThread.setPriority(Thread.NORM_PRIORITY);
+ break;
+ case MIN : currentThread.setPriority(Thread.MIN_PRIORITY);
+ break;
+ case MAX : currentThread.setPriority(Thread.MAX_PRIORITY);
+ break;
+ }
}
- }
- try {
- startJob(handler);
- } finally {
- currentThread.setPriority(oldPriority);
- currentThread.setName(oldName);
+ try {
+ startJob(handler);
+ } finally {
+ currentThread.setPriority(oldPriority);
+ currentThread.setName(oldName);
+ }
+ // and try to launch another job
+ startJobs(true);
}
- // and try to launch another job
- start(true);
+ });
+ if ( justOne ) {
+ break;
}
- });
- if ( justOne ) {
+ } else {
+ // no job available, stop look
break;
}
- } else {
- // no job available, stop look
- break;
- }
- } finally {
- if ( !started ) {
- this.available.release();
+ } finally {
+ if ( !started ) {
+ this.available.release();
+ }
}
}
+ this.startJobsGuard.set(false);
}
}
@@ -404,9 +435,8 @@ public class JobQueueImpl
/**
* Check whether this queue can be closed
*/
- public boolean canBeClosed() {
+ private boolean canBeClosed() {
return !this.isSuspended()
- && this.cache.isEmpty()
&& this.asyncCounter.get() == 0
&& this.available.availablePermits() == this.configuration.getMaxParallel();
}
@@ -443,7 +473,7 @@ public class JobQueueImpl
// set full cache search
this.doFullCacheSearch.set(true);
- this.start();
+ this.startJobs();
}
/**
@@ -452,7 +482,7 @@ public class JobQueueImpl
*/
public void wakeUpQueue(final Set<String> topics) {
this.cache.handleNewTopics(topics);
- this.start();
+ this.startJobs();
}
/**
@@ -461,7 +491,7 @@ public class JobQueueImpl
*/
private void requeue(final JobHandler handler) {
this.cache.reschedule(handler);
- this.start();
+ this.startJobs();
}
private static final class RescheduleInfo {
@@ -586,7 +616,7 @@ public class JobQueueImpl
public void resume() {
if ( this.suspendedSince.getAndSet(-1) != -1 ) {
this.logger.debug("Waking up suspended queue {}", queueName);
- this.start();
+ this.startJobs();
}
}
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java?rev=1667285&r1=1667284&r2=1667285&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java Tue Mar 17 12:25:44 2015
@@ -83,6 +83,7 @@ public class QueueJobCache {
this.queueType = queueType;
this.topics = new ConcurrentSkipListSet<String>(topics);
this.topicsWithNewJobs.addAll(topics);
+ this.fillCache();
}
/**
@@ -111,17 +112,14 @@ public class QueueJobCache {
}
/**
- * Fill the cache
+ * Fill the cache.
+ * No need to sync as this is called from the constructor.
*/
- public void fillCache() {
- synchronized ( this.cache ) {
- if ( this.cache.isEmpty() ) {
- final Set<String> checkingTopics = new HashSet<String>();
- checkingTopics.addAll(this.topics);
- if ( !checkingTopics.isEmpty() ) {
- this.loadJobs(checkingTopics);
- }
- }
+ private void fillCache() {
+ final Set<String> checkingTopics = new HashSet<String>();
+ checkingTopics.addAll(this.topics);
+ if ( !checkingTopics.isEmpty() ) {
+ this.loadJobs(checkingTopics);
}
}
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java?rev=1667285&r1=1667284&r2=1667285&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java Tue Mar 17 12:25:44 2015
@@ -214,9 +214,10 @@ public class QueueManager
* @param topics The topics
*/
private void start(final QueueInfo queueInfo,
- final Set<String> topics) {
+ final Set<String> topics) {
final InternalQueueConfiguration config = queueInfo.queueConfiguration;
// get or create queue
+ boolean isNewQueue = false;
JobQueueImpl queue = null;
// we synchronize to avoid creating a queue which is about to be removed during cleanup
synchronized ( queuesLock ) {
@@ -228,13 +229,18 @@ public class QueueManager
queue = null;
}
if ( queue == null ) {
- queue = new JobQueueImpl(queueInfo.queueName, config, queueServices, topics);
- // on startup the queue might be empty and we can simply discard it
- if ( !queue.canBeClosed() ) {
+ queue = JobQueueImpl.createQueue(queueInfo.queueName, config, queueServices, topics);
+ // on startup the queue might be empty and we get null back from createQueue
+ if ( queue != null ) {
+ isNewQueue = true;
queues.put(queueInfo.queueName, queue);
((QueuesMBeanImpl)queuesMBean).sendEvent(new QueueStatusEvent(queue, null));
- queue.start();
}
+ }
+ }
+ if ( queue != null ) {
+ if ( isNewQueue ) {
+ queue.startJobs();
} else {
queue.wakeUpQueue(topics);
}