You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2015/03/17 13:25:44 UTC

svn commit: r1667285 - in /sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl: ./ jobs/deprecated/ jobs/queues/

Author: cziegeler
Date: Tue Mar 17 12:25:44 2015
New Revision: 1667285

URL: http://svn.apache.org/r1667285
Log:
SLING-4481 : Reduce the number of controller threads for queue

Modified:
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/EventingThreadPool.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/EventAdminBridge.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/JobStatusNotifier.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/JobStatusNotifierImpl.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/EventingThreadPool.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/EventingThreadPool.java?rev=1667285&r1=1667284&r2=1667285&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/EventingThreadPool.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/EventingThreadPool.java Tue Mar 17 12:25:44 2015
@@ -67,6 +67,7 @@ public class EventingThreadPool implemen
 
     public EventingThreadPool(final ThreadPoolManager tpm, final int poolSize) {
         this.threadPoolManager = tpm;
+        this.configure(poolSize);
     }
 
     public void release() {
@@ -91,7 +92,7 @@ public class EventingThreadPool implemen
         config.setShutdownGraceful(true);
         config.setPriority(ThreadPriority.NORM);
         config.setDaemon(true);
-        this.threadPool = threadPoolManager.create(config, "Apache Sling Eventing Thread Pool");
+        this.threadPool = threadPoolManager.create(config, "Apache Sling Job Thread Pool");
     }
 
     /**

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/EventAdminBridge.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/EventAdminBridge.java?rev=1667285&r1=1667284&r2=1667285&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/EventAdminBridge.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/EventAdminBridge.java Tue Mar 17 12:25:44 2015
@@ -55,7 +55,9 @@ import org.slf4j.LoggerFactory;
  * This handler is enabled by default, to disable it provide a configuration
  * which removes both topic properties (or sets them to null)
  *
+ * @deprecated
  */
+@Deprecated
 @Component(immediate=true)
 @Service(value={EventHandler.class, JobConsumer.class})
 @Properties({

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/JobStatusNotifier.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/JobStatusNotifier.java?rev=1667285&r1=1667284&r2=1667285&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/JobStatusNotifier.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/JobStatusNotifier.java Tue Mar 17 12:25:44 2015
@@ -20,6 +20,10 @@ package org.apache.sling.event.impl.jobs
 
 import org.apache.sling.event.jobs.JobProcessor;
 
+/**
+ * @deprecated
+ */
+@Deprecated
 public interface JobStatusNotifier {
 
     String CONTEXT_PROPERTY_NAME = JobStatusNotifier.class.getName();

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/JobStatusNotifierImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/JobStatusNotifierImpl.java?rev=1667285&r1=1667284&r2=1667285&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/JobStatusNotifierImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/JobStatusNotifierImpl.java Tue Mar 17 12:25:44 2015
@@ -21,6 +21,10 @@ package org.apache.sling.event.impl.jobs
 import org.apache.sling.event.jobs.JobProcessor;
 import org.apache.sling.event.jobs.consumer.JobExecutionContext;
 
+/**
+ * @deprecated
+ */
+@Deprecated
 public class JobStatusNotifierImpl implements JobStatusNotifier {
 
     private volatile boolean isCalled = false;

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java?rev=1667285&r1=1667284&r2=1667285&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java Tue Mar 17 12:25:44 2015
@@ -113,15 +113,44 @@ public class JobQueueImpl
     /** Semaphore for handling the max number of jobs. */
     private final Semaphore available;
 
+    /** Guard for having only one thread executing start jobs. */
+    private final AtomicBoolean startJobsGuard = new AtomicBoolean(false);
+
     /**
-     * Create a new queue
+     * Create a new queue.
+     *
      * @param name The queue name
      * @param config The queue configuration
+     * @param services The queue services
+     * @param topics The topics handled by this queue
+     *
+     * @return {@code JobQueueImpl} if there are jobs to process, {@code null} otherwise.
      */
-    public JobQueueImpl(final String name,
-                            final InternalQueueConfiguration config,
-                            final QueueServices services,
-                            final Set<String> topics) {
+    public static JobQueueImpl createQueue(final String name,
+                        final InternalQueueConfiguration config,
+                        final QueueServices services,
+                        final Set<String> topics) {
+        final QueueJobCache cache = new QueueJobCache(services.configuration, config.getType(), topics);
+        if ( cache.isEmpty() ) {
+            return null;
+        }
+        return new JobQueueImpl(name, config, services, topics, cache);
+    }
+
+    /**
+     * Create a new queue.
+     *
+     * @param name The queue name
+     * @param config The queue configuration
+     * @param services The queue services
+     * @param topics The topics handled by this queue
+     * @param cache The job cache
+     */
+    private JobQueueImpl(final String name,
+                        final InternalQueueConfiguration config,
+                        final QueueServices services,
+                        final Set<String> topics,
+                        final QueueJobCache cache) {
         if ( config.getOwnThreadPoolSize() > 0 ) {
             this.threadPool = new EventingThreadPool(services.threadPoolManager, config.getOwnThreadPoolSize());
         } else {
@@ -132,8 +161,7 @@ public class JobQueueImpl
         this.services = services;
         this.logger = LoggerFactory.getLogger(this.getClass().getName() + '.' + name);
         this.running = true;
-        this.cache = new QueueJobCache(services.configuration, config.getType(), topics);
-        this.cache.fillCache();
+        this.cache = cache;
         this.available = new Semaphore(config.getMaxParallel(), true);
         logger.info("Starting job queue {}", queueName);
         logger.debug("Configuration for job queue={}", configuration);
@@ -166,65 +194,68 @@ public class JobQueueImpl
     /**
      * Start the job queue.
      */
-    public void start() {
-        start(false);
+    public void startJobs() {
+        startJobs(false);
     }
     /**
      * Start the job queue.
-     * This method might be called concurrently, therefore we synchronize
+     * This method might be called concurrently, therefore we use a guard
      */
-    public synchronized void start(boolean justOne) {
-        // we start as many jobs in parallel as possible
-        while ( this.running && !this.isOutdated.get() && !this.isSuspended() && this.available.tryAcquire() ) {
-            boolean started = false;
-            try {
-                final JobHandler handler = this.cache.getNextJob(this.services.jobConsumerManager, this, this.doFullCacheSearch.getAndSet(false));
-                if ( handler != null ) {
-                    started = true;
-                    this.threadPool.execute(new Runnable() {
-
-                        @Override
-                        public void run() {
-                            // update thread priority and name
-                            final Thread currentThread = Thread.currentThread();
-                            final String oldName = currentThread.getName();
-                            final int oldPriority = currentThread.getPriority();
-
-                            currentThread.setName(oldName + "-" + handler.getJob().getQueueName() + "(" + handler.getJob().getTopic() + ")");
-                            if ( configuration.getThreadPriority() != null ) {
-                                switch ( configuration.getThreadPriority() ) {
-                                    case NORM : currentThread.setPriority(Thread.NORM_PRIORITY);
-                                                break;
-                                    case MIN  : currentThread.setPriority(Thread.MIN_PRIORITY);
-                                                break;
-                                    case MAX  : currentThread.setPriority(Thread.MAX_PRIORITY);
-                                                break;
+    private void startJobs(boolean justOne) {
+        if ( this.startJobsGuard.compareAndSet(false, true) ) {
+            // we start as many jobs in parallel as possible
+            while ( this.running && !this.isOutdated.get() && !this.isSuspended() && this.available.tryAcquire() ) {
+                boolean started = false;
+                try {
+                    final JobHandler handler = this.cache.getNextJob(this.services.jobConsumerManager, this, this.doFullCacheSearch.getAndSet(false));
+                    if ( handler != null ) {
+                        started = true;
+                        this.threadPool.execute(new Runnable() {
+
+                            @Override
+                            public void run() {
+                                // update thread priority and name
+                                final Thread currentThread = Thread.currentThread();
+                                final String oldName = currentThread.getName();
+                                final int oldPriority = currentThread.getPriority();
+
+                                currentThread.setName(oldName + "-" + handler.getJob().getQueueName() + "(" + handler.getJob().getTopic() + ")");
+                                if ( configuration.getThreadPriority() != null ) {
+                                    switch ( configuration.getThreadPriority() ) {
+                                        case NORM : currentThread.setPriority(Thread.NORM_PRIORITY);
+                                                    break;
+                                        case MIN  : currentThread.setPriority(Thread.MIN_PRIORITY);
+                                                    break;
+                                        case MAX  : currentThread.setPriority(Thread.MAX_PRIORITY);
+                                                    break;
+                                    }
                                 }
-                            }
 
-                            try {
-                                startJob(handler);
-                            } finally {
-                                currentThread.setPriority(oldPriority);
-                                currentThread.setName(oldName);
+                                try {
+                                    startJob(handler);
+                                } finally {
+                                    currentThread.setPriority(oldPriority);
+                                    currentThread.setName(oldName);
+                                }
+                                // and try to launch another job
+                                startJobs(true);
                             }
-                            // and try to launch another job
-                            start(true);
+                        });
+                        if ( justOne ) {
+                            break;
                         }
-                    });
-                    if ( justOne ) {
+                    } else {
+                        // no job available, stop look
                         break;
                     }
-                } else {
-                    // no job available, stop look
-                    break;
-                }
 
-            } finally {
-                if ( !started ) {
-                    this.available.release();
+                } finally {
+                    if ( !started ) {
+                        this.available.release();
+                    }
                 }
             }
+            this.startJobsGuard.set(false);
         }
     }
 
@@ -404,9 +435,8 @@ public class JobQueueImpl
     /**
      * Check whether this queue can be closed
      */
-    public boolean canBeClosed() {
+    private boolean canBeClosed() {
         return !this.isSuspended()
-            && this.cache.isEmpty()
             && this.asyncCounter.get() == 0
             && this.available.availablePermits() == this.configuration.getMaxParallel();
     }
@@ -443,7 +473,7 @@ public class JobQueueImpl
         // set full cache search
         this.doFullCacheSearch.set(true);
 
-        this.start();
+        this.startJobs();
     }
 
     /**
@@ -452,7 +482,7 @@ public class JobQueueImpl
      */
     public void wakeUpQueue(final Set<String> topics) {
         this.cache.handleNewTopics(topics);
-        this.start();
+        this.startJobs();
     }
 
     /**
@@ -461,7 +491,7 @@ public class JobQueueImpl
      */
     private void requeue(final JobHandler handler) {
         this.cache.reschedule(handler);
-        this.start();
+        this.startJobs();
     }
 
     private static final class RescheduleInfo {
@@ -586,7 +616,7 @@ public class JobQueueImpl
     public void resume() {
         if ( this.suspendedSince.getAndSet(-1) != -1 ) {
             this.logger.debug("Waking up suspended queue {}", queueName);
-            this.start();
+            this.startJobs();
         }
     }
 

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java?rev=1667285&r1=1667284&r2=1667285&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java Tue Mar 17 12:25:44 2015
@@ -83,6 +83,7 @@ public class QueueJobCache {
         this.queueType = queueType;
         this.topics = new ConcurrentSkipListSet<String>(topics);
         this.topicsWithNewJobs.addAll(topics);
+        this.fillCache();
     }
 
     /**
@@ -111,17 +112,14 @@ public class QueueJobCache {
     }
 
     /**
-     * Fill the cache
+     * Fill the cache.
+     * No need to sync as this is called from the constructor.
      */
-    public void fillCache() {
-        synchronized ( this.cache ) {
-            if ( this.cache.isEmpty() ) {
-                final Set<String> checkingTopics = new HashSet<String>();
-                checkingTopics.addAll(this.topics);
-                if ( !checkingTopics.isEmpty() ) {
-                    this.loadJobs(checkingTopics);
-                }
-            }
+    private void fillCache() {
+        final Set<String> checkingTopics = new HashSet<String>();
+        checkingTopics.addAll(this.topics);
+        if ( !checkingTopics.isEmpty() ) {
+            this.loadJobs(checkingTopics);
         }
     }
 

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java?rev=1667285&r1=1667284&r2=1667285&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java Tue Mar 17 12:25:44 2015
@@ -214,9 +214,10 @@ public class QueueManager
      * @param topics The topics
      */
     private void start(final QueueInfo queueInfo,
-            final Set<String> topics) {
+                       final Set<String> topics) {
         final InternalQueueConfiguration config = queueInfo.queueConfiguration;
         // get or create queue
+        boolean isNewQueue = false;
         JobQueueImpl queue = null;
         // we synchronize to avoid creating a queue which is about to be removed during cleanup
         synchronized ( queuesLock ) {
@@ -228,13 +229,18 @@ public class QueueManager
                 queue = null;
             }
             if ( queue == null ) {
-                queue = new JobQueueImpl(queueInfo.queueName, config, queueServices, topics);
-                // on startup the queue might be empty and we can simply discard it
-                if ( !queue.canBeClosed() ) {
+                queue = JobQueueImpl.createQueue(queueInfo.queueName, config, queueServices, topics);
+                // on startup the queue might be empty and we get null back from createQueue
+                if ( queue != null ) {
+                    isNewQueue = true;
                     queues.put(queueInfo.queueName, queue);
                     ((QueuesMBeanImpl)queuesMBean).sendEvent(new QueueStatusEvent(queue, null));
-                    queue.start();
                 }
+            }
+        }
+        if ( queue != null ) {
+            if ( isNewQueue ) {
+                queue.startJobs();
             } else {
                 queue.wakeUpQueue(topics);
             }