You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2014/10/17 17:46:02 UTC

svn commit: r1632617 - in /sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs: ./ config/ queues/ topics/ topology/

Author: cziegeler
Date: Fri Oct 17 15:46:02 2014
New Revision: 1632617

URL: http://svn.apache.org/r1632617
Log:
SLING-4048 : Avoid keeping jobs in memory. Another refactoring: move job cache to queue (WiP)

Added:
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java
      - copied, changed from r1632478, sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/QueueJobCache.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicManager.java
      - copied, changed from r1632478, sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/TopicManager.java
Removed:
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/QueueJobCache.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/TopicManager.java
Modified:
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/QueueConfigurationManager.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueServices.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/TopologyAware.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/TopologyHandler.java

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java?rev=1632617&r1=1632616&r2=1632617&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java Fri Oct 17 15:46:02 2014
@@ -37,7 +37,8 @@ public class JobHandler {
 
     private final JobManagerImpl jobManager;
 
-    public JobHandler(final JobImpl job, final JobManagerImpl jobManager) {
+    public JobHandler(final JobImpl job,
+            final JobManagerImpl jobManager) {
         this.job = job;
         this.jobManager = jobManager;
     }
@@ -48,7 +49,7 @@ public class JobHandler {
 
     public boolean startProcessing(final Queue queue) {
         this.isStopped = false;
-        return this.jobManager.persistJobProperties(this.job, this.job.prepare(queue));
+        return this.persistJobProperties(this.job.prepare(queue));
     }
 
     /**
@@ -62,6 +63,7 @@ public class JobHandler {
 
     /**
      * Reschedule the job
+     * Update the retry count and remove the started time.
      * @return <code>true</code> if rescheduling was successful, <code>false</code> otherwise.
      */
     public boolean reschedule() {
@@ -72,14 +74,18 @@ public class JobHandler {
         this.jobManager.finishJob(this.job, Job.JobState.DROPPED, true, -1);
     }
 
+    /**
+     * Reassign to a new instance.
+     */
     public void reassign() {
         this.jobManager.reassign(this.job);
     }
 
-    public void persistJobProperties(final String... propNames) {
-        if ( propNames != null ) {
-            this.jobManager.persistJobProperties(this.job, propNames);
-        }
+    /**
+     * Update the given properties of the job in the resource tree.
+     */
+    public boolean persistJobProperties(final String... propNames) {
+        return this.jobManager.persistJobProperties(this.job, propNames);
     }
 
     public boolean isStopped() {

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java?rev=1632617&r1=1632616&r2=1632617&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java Fri Oct 17 15:46:02 2014
@@ -127,7 +127,8 @@ public class JobManagerImpl
     @Reference
     private StatisticsManager statisticsManager;
 
-    @Reference QueueManager qManager;
+    @Reference
+    private QueueManager qManager;
 
     private volatile TopologyCapabilities topologyCapabilities;
 
@@ -795,38 +796,6 @@ public class JobManagerImpl
     }
 
     /**
-     * Reschedule a job.
-     *
-     * Update the retry count and remove the started time.
-     * @param job The job
-     * @return true if the job could be updated.
-     */
-    public boolean reschedule(final JobImpl job) {
-        final ResourceResolver resolver = this.configuration.createResourceResolver();
-        try {
-            final Resource jobResource = resolver.getResource(job.getResourcePath());
-            if ( jobResource != null ) {
-                final ModifiableValueMap mvm = jobResource.adaptTo(ModifiableValueMap.class);
-                mvm.put(Job.PROPERTY_JOB_RETRY_COUNT, job.getProperty(Job.PROPERTY_JOB_RETRY_COUNT));
-                if ( job.getProperty(Job.PROPERTY_RESULT_MESSAGE) != null ) {
-                    mvm.put(Job.PROPERTY_RESULT_MESSAGE, job.getProperty(Job.PROPERTY_RESULT_MESSAGE));
-                }
-                mvm.remove(Job.PROPERTY_JOB_STARTED_TIME);
-                try {
-                    resolver.commit();
-                    return true;
-                } catch ( final PersistenceException pe ) {
-                    ignoreException(pe);
-                }
-            }
-        } finally {
-            resolver.close();
-        }
-
-        return false;
-    }
-
-    /**
      * Try to get a "lock" for a resource
      */
     private boolean lock(final String jobTopic, final String id) {
@@ -995,51 +964,6 @@ public class JobManagerImpl
     }
 
     /**
-     * Reassign a job to a new instance
-     * @param job The job to reassign.
-     */
-    public void reassign(final JobImpl job) {
-        final QueueInfo queueInfo = queueManager.getQueueInfo(job.getTopic());
-
-        // Sanity check if queue configuration has changed
-        final TopologyCapabilities caps = this.topologyCapabilities;
-        final String targetId = (caps == null ? null : caps.detectTarget(job.getTopic(), job.getProperties(), queueInfo));
-
-        final ResourceResolver resolver = this.configuration.createResourceResolver();
-        try {
-            final Resource jobResource = resolver.getResource(job.getResourcePath());
-            if ( jobResource != null ) {
-                try {
-                    final ValueMap vm = ResourceHelper.getValueMap(jobResource);
-                    final String newPath = this.configuration.getUniquePath(targetId, job.getTopic(), job.getId(), job.getProperties());
-
-                    final Map<String, Object> props = new HashMap<String, Object>(vm);
-                    props.remove(Job.PROPERTY_JOB_QUEUE_NAME);
-                    if ( targetId == null ) {
-                        props.remove(Job.PROPERTY_JOB_TARGET_INSTANCE);
-                    } else {
-                        props.put(Job.PROPERTY_JOB_TARGET_INSTANCE, targetId);
-                    }
-                    props.remove(Job.PROPERTY_JOB_STARTED_TIME);
-
-                    try {
-                        ResourceHelper.getOrCreateResource(resolver, newPath, props);
-                        resolver.delete(jobResource);
-                        resolver.commit();
-                    } catch ( final PersistenceException pe ) {
-                        this.ignoreException(pe);
-                    }
-                } catch (final InstantiationException ie) {
-                    // something happened with the resource in the meantime
-                    this.ignoreException(ie);
-                }
-            }
-        } finally {
-            resolver.close();
-        }
-    }
-
-    /**
      * Get the current capabilities
      */
     public TopologyCapabilities getTopologyCapabilities() {
@@ -1047,41 +971,6 @@ public class JobManagerImpl
     }
 
     /**
-     * Update the property of a job in the resource tree
-     */
-    public boolean persistJobProperties(final JobImpl job, final String... propNames) {
-        final ResourceResolver resolver = this.configuration.createResourceResolver();
-        try {
-            final Resource jobResource = resolver.getResource(job.getResourcePath());
-            if ( jobResource != null ) {
-                final ModifiableValueMap mvm = jobResource.adaptTo(ModifiableValueMap.class);
-                for(final String propName : propNames) {
-                    final Object val = job.getProperty(propName);
-                    if ( val != null ) {
-                        if ( val.getClass().isEnum() ) {
-                            mvm.put(propName, val.toString());
-                        } else {
-                            mvm.put(propName, val);
-                        }
-                    } else {
-                        mvm.remove(propName);
-                    }
-                }
-                resolver.commit();
-
-                return true;
-            } else {
-                logger.debug("No job resource found at {}", job.getResourcePath());
-            }
-        } catch ( final PersistenceException ignore ) {
-            this.ignoreException(ignore);
-        } finally {
-            resolver.close();
-        }
-        return false;
-    }
-
-    /**
      * @see org.apache.sling.event.jobs.JobManager#stopJobById(java.lang.String)
      */
     @Override
@@ -1241,4 +1130,119 @@ public class JobManagerImpl
         }
         return null;
     }
+
+    /**
+     * Reassign a job to a new instance
+     */
+    public void reassign(final JobImpl job) {
+        final QueueInfo queueInfo = this.queueManager.getQueueInfo(job.getTopic());
+        // Sanity check if queue configuration has changed
+        final TopologyCapabilities caps = this.topologyCapabilities;
+        final String targetId = (caps == null ? null : caps.detectTarget(job.getTopic(), job.getProperties(), queueInfo));
+
+        final ResourceResolver resolver = this.configuration.createResourceResolver();
+        try {
+            final Resource jobResource = resolver.getResource(job.getResourcePath());
+            if ( jobResource != null ) {
+                try {
+                    final ValueMap vm = ResourceHelper.getValueMap(jobResource);
+                    final String newPath = this.configuration.getUniquePath(targetId, job.getTopic(), job.getId(), job.getProperties());
+
+                    final Map<String, Object> props = new HashMap<String, Object>(vm);
+                    props.remove(Job.PROPERTY_JOB_QUEUE_NAME);
+                    if ( targetId == null ) {
+                        props.remove(Job.PROPERTY_JOB_TARGET_INSTANCE);
+                    } else {
+                        props.put(Job.PROPERTY_JOB_TARGET_INSTANCE, targetId);
+                    }
+                    props.remove(Job.PROPERTY_JOB_STARTED_TIME);
+
+                    try {
+                        ResourceHelper.getOrCreateResource(resolver, newPath, props);
+                        resolver.delete(jobResource);
+                        resolver.commit();
+                    } catch ( final PersistenceException pe ) {
+                        this.ignoreException(pe);
+                    }
+                } catch (final InstantiationException ie) {
+                    // something happened with the resource in the meantime
+                    this.ignoreException(ie);
+                }
+            }
+        } finally {
+            resolver.close();
+        }
+    }
+
+    /**
+     * Reschedule the job.
+     * Update the retry count and remove the started time.
+     * @param job The job
+     * @return <code>true</code> if rescheduling was successful, <code>false</code> otherwise.
+     */
+    public boolean reschedule(final JobImpl job) {
+        final ResourceResolver resolver = this.configuration.createResourceResolver();
+        try {
+            final Resource jobResource = resolver.getResource(job.getResourcePath());
+            if ( jobResource != null ) {
+                final ModifiableValueMap mvm = jobResource.adaptTo(ModifiableValueMap.class);
+                mvm.put(Job.PROPERTY_JOB_RETRY_COUNT, job.getProperty(Job.PROPERTY_JOB_RETRY_COUNT));
+                if ( job.getProperty(Job.PROPERTY_RESULT_MESSAGE) != null ) {
+                    mvm.put(Job.PROPERTY_RESULT_MESSAGE, job.getProperty(Job.PROPERTY_RESULT_MESSAGE));
+                }
+                mvm.remove(Job.PROPERTY_JOB_STARTED_TIME);
+                try {
+                    resolver.commit();
+                    return true;
+                } catch ( final PersistenceException pe ) {
+                    this.logger.debug("Unable to update reschedule properties for job " + job.getId(), pe);
+                }
+            }
+        } finally {
+            resolver.close();
+        }
+
+        return false;
+    }
+
+    /**
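+     * Update the given properties of a job in the resource tree.
+     * @param job The job
+     * @param propNames The names of the properties to update; a property that has no value on the job is removed
+     * @return {@code true} if the update was committed or {@code propNames} is {@code null}, {@code false} otherwise.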
+     */
+    public boolean persistJobProperties(final JobImpl job, final String... propNames) {
+        if ( propNames != null ) {
+            final ResourceResolver resolver = this.configuration.createResourceResolver();
+            try {
+                final Resource jobResource = resolver.getResource(job.getResourcePath());
+                if ( jobResource != null ) {
+                    final ModifiableValueMap mvm = jobResource.adaptTo(ModifiableValueMap.class);
+                    for(final String propName : propNames) {
+                        final Object val = job.getProperty(propName);
+                        if ( val != null ) {
+                            if ( val.getClass().isEnum() ) {
+                                mvm.put(propName, val.toString());
+                            } else {
+                                mvm.put(propName, val);
+                            }
+                        } else {
+                            mvm.remove(propName);
+                        }
+                    }
+                    resolver.commit();
+
+                    return true;
+                } else {
+                    logger.debug("No job resource found at {}", job.getResourcePath());
+                }
+            } catch ( final PersistenceException ignore ) {
+                logger.debug("Unable to persist properties", ignore);
+            } finally {
+                resolver.close();
+            }
+            return false;
+        }
+        return true;
+    }
 }

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/QueueConfigurationManager.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/QueueConfigurationManager.java?rev=1632617&r1=1632616&r2=1632617&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/QueueConfigurationManager.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/QueueConfigurationManager.java Fri Oct 17 15:46:02 2014
@@ -141,6 +141,11 @@ public class QueueConfigurationManager {
         public InternalQueueConfiguration queueConfiguration;
         public String queueName;
         public String targetId;
+
+        @Override
+        public String toString() {
+            return queueName;
+        }
     }
 
     /**

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java?rev=1632617&r1=1632616&r2=1632617&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java Fri Oct 17 15:46:02 2014
@@ -35,6 +35,7 @@ import org.apache.sling.event.impl.jobs.
 import org.apache.sling.event.impl.jobs.JobExecutionResultImpl;
 import org.apache.sling.event.impl.jobs.JobHandler;
 import org.apache.sling.event.impl.jobs.JobImpl;
+import org.apache.sling.event.impl.jobs.JobManagerImpl;
 import org.apache.sling.event.impl.jobs.Utility;
 import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
 import org.apache.sling.event.impl.jobs.deprecated.JobStatusNotifier;
@@ -150,6 +151,10 @@ public abstract class AbstractJobQueue
         return this.services.statisticsManager.getQueueStatistics(this.queueName);
     }
 
+    public QueueJobCache getCache() {
+        return this.services.cache;
+    }
+
     /**
      * Start the job queue.
      */
@@ -158,7 +163,7 @@ public abstract class AbstractJobQueue
 
             @Override
             public void run() {
-                while ( running ) {
+                while ( running && !isOutdated()) {
                     logger.info("Starting job queue {}", queueName);
                     logger.debug("Configuration for job queue={}", configuration);
 
@@ -186,8 +191,7 @@ public abstract class AbstractJobQueue
      * Outdate this queue.
      */
     public void outdate() {
-        if ( !this.isOutdated() ) {
-            this.isOutdated.set(true);
+        if ( this.isOutdated.compareAndSet(false, true) ) {
             final String name = this.getName() + "<outdated>(" + this.hashCode() + ")";
             this.logger.info("Outdating queue {}, renaming to {}.", this.queueName, name);
             this.queueName = name;
@@ -229,8 +233,8 @@ public abstract class AbstractJobQueue
             this.logger.debug("Waking up waiting queue {}", this.queueName);
             this.notifyFinished(false);
         }
-        // continue queue processing to stop the queue
-        this.services.topicManager.stop(this.getName());
+        // stop the queue
+        this.stopWaitingForNextJob();
 
         synchronized ( this.processingJobsLists ) {
             this.processingJobsLists.clear();
@@ -286,7 +290,7 @@ public abstract class AbstractJobQueue
                     if ( handler.reschedule() ) {
                         this.logger.info("No acknowledge received for job {} stored at {}. Requeueing job.", Utility.toString(handler.getJob()), handler.getJob().getId());
                         handler.getJob().retry();
-                        this.services.topicManager.reschedule(handler);
+                        this.requeue(handler);
                         this.notifyFinished(true);
                     }
                 }
@@ -298,7 +302,7 @@ public abstract class AbstractJobQueue
      * Execute the queue
      */
     private void runJobQueue() {
-        while ( this.running ) {
+        while ( this.running && !this.isOutdated()) {
             JobHandler info = null;
             if ( info == null ) {
                 // so let's wait/get the next job from the queue
@@ -306,15 +310,81 @@ public abstract class AbstractJobQueue
             }
 
             // if we're suspended we drop the current item
-            if ( this.running && info != null && !checkSuspended(info) ) {
+            if ( this.running && info != null && !this.isOutdated() && !checkSuspended(info) ) {
                 // if we still have a job and are running, let's go
                 this.start(info);
             }
         }
     }
 
+    private volatile boolean isWaitingForNextJob = false;
+    private final Object nextJobLock = new Object();
+
+    /**
+     * Take a new job for this queue.
+     * This method blocks until a job is available or the queue is stopped.
+     * Waiting is ended by {@link #stopWaitingForNextJob()}; the loop also exits once the queue is outdated.
+     * @return A handler for the next job or {@code null} if waiting ended without a job.
+     */
     private JobHandler take() {
-        return this.services.topicManager.take(this.getName());
+        logger.debug("Taking new job for {}", queueName);
+        JobImpl result = null;
+
+        this.isWaitingForNextJob = true;
+        while ( this.isWaitingForNextJob && !this.isOutdated()) {
+            result = this.services.cache.getNextJob();
+            if ( result != null ) {
+                isWaitingForNextJob = false;
+            } else {
+                // block
+                synchronized ( nextJobLock ) {
+                    if ( isWaitingForNextJob ) {
+                        try {
+                            nextJobLock.wait();
+                        } catch ( final InterruptedException ignore ) {
+                            Thread.currentThread().interrupt();
+                        }
+                    }
+                }
+            }
+        }
+        this.isWaitingForNextJob = false;
+
+        if ( logger.isDebugEnabled() ) {
+            logger.debug("Returning job for {} : {}", queueName, Utility.toString(result));
+        }
+        return (result != null ? new JobHandler( result, (JobManagerImpl)this.services.jobManager) : null);
+    }
+
+    /**
+     * Stop waiting for a job.
+     * Clears the waiting flag and notifies a thread waiting on the next-job lock in {@link #take()}.
+     */
+    private void stopWaitingForNextJob() {
+        synchronized ( nextJobLock ) {
+            this.isWaitingForNextJob = false;
+            nextJobLock.notify();
+        }
+    }
+
+    /**
+     * Inform the queue about a job for the topic
+     * @param topic A new topic.
+     */
+    public void wakeUpQueue(final String topic) {
+        this.services.cache.handleNewJob(topic);
+        this.stopWaitingForNextJob();
+    }
+
+    /**
+     * Put a job back in the queue
+     * @param handler The job handler
+     */
+    private void requeue(final JobHandler handler) {
+        this.services.cache.reschedule(handler);
+        synchronized ( this.nextJobLock ) {
+            this.nextJobLock.notify();
+        }
     }
 
     /**
@@ -324,7 +394,7 @@ public abstract class AbstractJobQueue
         boolean wasSuspended = false;
         synchronized ( this.suspendLock ) {
             while ( this.suspendedSince != -1 ) {
-                this.services.topicManager.reschedule(handler);
+                this.requeue(handler);
                 logger.debug("Sleeping as queue {} is suspended.", this.getName());
                 wasSuspended = true;
                 final long diff = System.currentTimeMillis() - this.suspendedSince;
@@ -820,7 +890,7 @@ public abstract class AbstractJobQueue
     }
 
     protected void reschedule(final JobHandler handler) {
-        this.services.topicManager.reschedule(handler);
+        this.requeue(handler);
     }
 
     /**
@@ -851,5 +921,6 @@ public abstract class AbstractJobQueue
     protected abstract void start(final JobHandler handler);
 
     protected abstract void notifyFinished(boolean reschedule);
+
 }
 

Copied: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java (from r1632478, sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/QueueJobCache.java)
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java?p2=sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java&p1=sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/QueueJobCache.java&r1=1632478&r2=1632617&rev=1632617&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/QueueJobCache.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java Fri Oct 17 15:46:02 2014
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.event.impl.jobs.topics;
+package org.apache.sling.event.impl.jobs.queues;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -25,6 +25,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListSet;
 
 import org.apache.sling.api.resource.Resource;
 import org.apache.sling.api.resource.ResourceResolver;
@@ -41,7 +42,6 @@ import org.slf4j.LoggerFactory;
  * The queue job cache caches jobs per queue based on the topics the queue is actively
  * processing.
  *
- * TODO cache needs to be synchronized!
  */
 public class QueueJobCache {
 
@@ -77,7 +77,7 @@ public class QueueJobCache {
             final Set<String> topics) {
         this.configuration = configuration;
         this.info = info;
-        this.topics = topics;
+        this.topics = new ConcurrentSkipListSet<String>(topics);
         this.topicsWithNewJobs.addAll(topics);
     }
 
@@ -216,6 +216,7 @@ public class QueueJobCache {
         synchronized ( this.topicsWithNewJobs ) {
             this.topicsWithNewJobs.add(topic);
         }
+        this.topics.add(topic);
     }
 
     public void reschedule(final JobHandler handler) {

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java?rev=1632617&r1=1632616&r2=1632617&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java Fri Oct 17 15:46:02 2014
@@ -19,9 +19,11 @@
 package org.apache.sling.event.impl.jobs.queues;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.felix.scr.annotations.Activate;
@@ -41,9 +43,9 @@ import org.apache.sling.event.impl.jobs.
 import org.apache.sling.event.impl.jobs.jmx.QueueStatusEvent;
 import org.apache.sling.event.impl.jobs.jmx.QueuesMBeanImpl;
 import org.apache.sling.event.impl.jobs.stats.StatisticsManager;
-import org.apache.sling.event.impl.jobs.topics.TopicManager;
 import org.apache.sling.event.impl.support.Environment;
 import org.apache.sling.event.impl.support.ResourceHelper;
+import org.apache.sling.event.jobs.JobManager;
 import org.apache.sling.event.jobs.Queue;
 import org.apache.sling.event.jobs.QueueConfiguration;
 import org.apache.sling.event.jobs.jmx.QueuesMBean;
@@ -90,7 +92,7 @@ public class QueueManager
     private StatisticsManager statisticsManager;
 
     @Reference
-    private QueueConfigurationManager queueManager;
+    private QueueConfigurationManager queueConfigurationManager;
 
     /** Lock object for the queues map - we don't want to sync directly on the concurrent map. */
     private final Object queuesLock = new Object();
@@ -175,9 +177,15 @@ public class QueueManager
      * This method first searches the corresponding queue - if such a queue
      * does not exist yet, it is created and started.
      *
+     * @param topicManager The topic manager
+     * @param jobManager The job manager
+     * @param queueInfo The queue info
      * @param topic The topic
      */
-    public void start(final TopicManager topicManager, final QueueInfo queueInfo) {
+    public void start(final TopicManager topicManager,
+            final JobManager jobManager,
+            final QueueInfo queueInfo,
+            final String topic) {
         final InternalQueueConfiguration config = queueInfo.queueConfiguration;
         // get or create queue
         AbstractJobQueue queue = null;
@@ -198,6 +206,10 @@ public class QueueManager
                 services.threadPoolManager = this.threadPoolManager;
                 services.topicManager = topicManager;
                 services.statisticsManager = statisticsManager;
+                services.jobManager = jobManager;
+                final Set<String> topics = new HashSet<String>();
+                topics.add(topic);
+                services.cache = new QueueJobCache(configuration, queueInfo, topics);
                 if ( config.getType() == QueueConfiguration.Type.ORDERED ) {
                     queue = new OrderedJobQueue(queueInfo.queueName, config, services);
                 } else if ( config.getType() == QueueConfiguration.Type.UNORDERED ) {
@@ -211,6 +223,8 @@ public class QueueManager
                     ((QueuesMBeanImpl)queuesMBean).sendEvent(new QueueStatusEvent(queue, null));
                     queue.start();
                 }
+            } else {
+                queue.wakeUpQueue(topic);
             }
         }
     }

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueServices.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueServices.java?rev=1632617&r1=1632616&r2=1632617&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueServices.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueServices.java Fri Oct 17 15:46:02 2014
@@ -22,7 +22,7 @@ import org.apache.sling.commons.schedule
 import org.apache.sling.commons.threads.ThreadPoolManager;
 import org.apache.sling.event.impl.jobs.JobConsumerManager;
 import org.apache.sling.event.impl.jobs.stats.StatisticsManager;
-import org.apache.sling.event.impl.jobs.topics.TopicManager;
+import org.apache.sling.event.jobs.JobManager;
 import org.osgi.service.event.EventAdmin;
 
 public class QueueServices {
@@ -38,4 +38,8 @@ public class QueueServices {
     public TopicManager topicManager;
 
     public StatisticsManager statisticsManager;
+
+    public QueueJobCache cache;
+
+    public JobManager jobManager;
 }

Copied: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicManager.java (from r1632478, sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/TopicManager.java)
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicManager.java?p2=sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicManager.java&p1=sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/TopicManager.java&r1=1632478&r2=1632617&rev=1632617&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/TopicManager.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicManager.java Fri Oct 17 15:46:02 2014
@@ -16,15 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.event.impl.jobs.topics;
+package org.apache.sling.event.impl.jobs.queues;
 
-import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
-import java.util.TreeSet;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.felix.scr.annotations.Activate;
@@ -36,15 +34,12 @@ import org.apache.felix.scr.annotations.
 import org.apache.sling.api.resource.PersistenceException;
 import org.apache.sling.api.resource.Resource;
 import org.apache.sling.api.resource.ResourceResolver;
-import org.apache.sling.event.impl.jobs.JobHandler;
 import org.apache.sling.event.impl.jobs.JobImpl;
 import org.apache.sling.event.impl.jobs.JobManagerConfiguration;
-import org.apache.sling.event.impl.jobs.JobManagerImpl;
 import org.apache.sling.event.impl.jobs.JobTopicTraverser;
-import org.apache.sling.event.impl.jobs.Utility;
+import org.apache.sling.event.impl.jobs.TestLogger;
 import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager;
 import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueInfo;
-import org.apache.sling.event.impl.jobs.queues.QueueManager;
 import org.apache.sling.event.impl.jobs.topology.TopologyAware;
 import org.apache.sling.event.impl.jobs.topology.TopologyCapabilities;
 import org.apache.sling.event.impl.jobs.topology.TopologyHandler;
@@ -61,8 +56,6 @@ import org.slf4j.LoggerFactory;
 /**
  * Topic manager
  *
- * TODO - Check syncing of take/update/stop. This might not be 100% correct yet.
- * TODO - Block take() if inactive
  */
 @Component(immediate=true)
 @Service(value=EventHandler.class)
@@ -70,7 +63,7 @@ import org.slf4j.LoggerFactory;
 public class TopicManager implements EventHandler, TopologyAware {
 
     /** Logger. */
-    private final Logger logger = LoggerFactory.getLogger(this.getClass());
+    private final Logger logger = new TestLogger(LoggerFactory.getLogger(this.getClass()));
 
     @Reference
     private JobManagerConfiguration configuration;
@@ -87,18 +80,10 @@ public class TopicManager implements Eve
     @Reference
     private JobManager jobManager;
 
-    /** Queue configuration counter to track for changes. */
-    private volatile int queueConfigCount = -1;
-
-    /** A set of all topics. Access needs synchronization. */
-    private final Set<String> topics = new TreeSet<String>();
-
-    /** Marker if a new topic has been added. */
-    private final AtomicBoolean topicsChanged = new AtomicBoolean(false);
-
-    /** The mapping from a queue name to a cache. */
-    private Map<String, QueueJobCache> queueJobCaches = Collections.emptyMap();
+    /** The mapping from a topic to a queue info. */
+    private final Map<String, QueueInfo> topicMapping = new HashMap<String, QueueInfo>();
 
+    /** Flag whether the manager is active or suspended. */
     private final AtomicBoolean isActive = new AtomicBoolean(false);
 
     /**
@@ -118,10 +103,36 @@ public class TopicManager implements Eve
     }
 
     /**
+     * This method is called whenever the topology or queue configurations change.
+     * @param caps The new topology capabilities or {@code null} if currently unknown.
+     */
+    @Override
+    public void topologyChanged(final TopologyCapabilities caps) {
+        logger.debug("Topology changed {}", caps);
+        synchronized ( this.topicMapping ) {
+            if ( caps != null ) {
+                final Set<String> topics = this.initialScan();
+                this.updateTopicMapping(topics);
+                // start queues
+                for(final Map.Entry<String, QueueInfo> entry : this.topicMapping.entrySet() ) {
+                    this.queueManager.start(this, this.jobManager, entry.getValue(), entry.getKey());
+                }
+                this.isActive.set(true);
+            } else {
+                this.isActive.set(false);
+                this.queueManager.restart();
+                this.topicMapping.clear();
+            }
+        }
+    }
+
+    /**
      * Scan the resource tree for topics.
      */
-    private void initialScan() {
+    private Set<String> initialScan() {
         logger.debug("Scanning repository for existing topics...");
+        final Set<String> topics = new HashSet<String>();
+
         final ResourceResolver resolver = this.configuration.createResourceResolver();
         try {
             final Resource baseResource = resolver.getResource(this.configuration.getLocalJobsPath());
@@ -133,15 +144,13 @@ public class TopicManager implements Eve
                     final Resource topicResource = topicIter.next();
                     final String topic = topicResource.getName().replace('.', '/');
                     logger.debug("Found topic {}", topic);
-                    synchronized ( topics ) {
-                        topics.add(topic);
-                    }
-                    topicsChanged.set(true);
+                    topics.add(topic);
                 }
             }
         } finally {
             resolver.close();
         }
+        return topics;
     }
 
     /**
@@ -150,117 +159,30 @@ public class TopicManager implements Eve
     @Override
     public void handleEvent(final Event event) {
         final String topic = (String)event.getProperty(NotificationConstants.NOTIFICATION_PROPERTY_JOB_TOPIC);
-        if ( topic != null ) {
-            boolean changed = false;
-            synchronized ( topics ) {
-                final int len = topics.size();
-                topics.add(topic);
-                changed = topics.size() > len;
-            }
-            final QueueInfo info = this.queueConfigMgr.getQueueInfo(topic);
-            if ( changed ) {
-                topicsChanged.set(true);
-                this.queueManager.start(this, info);
-            } else {
-                final QueueJobCache cache = this.queueJobCaches.get(info.queueName);
-                if ( cache != null ) {
-                    cache.handleNewJob(topic);
-                    final Object lock = this.queueLocks.get(info.queueName);
-                    if ( lock != null ) {
-                        synchronized ( lock ) {
-                            lock.notify();
-                        }
-                    }
+        synchronized ( this.topicMapping ) {
+            if ( this.isActive.get() && topic != null ) {
+                QueueInfo info = this.topicMapping.get(topic);
+                if ( info == null ) {
+                    info = this.queueConfigMgr.getQueueInfo(topic);
+                    this.topicMapping.put(topic, info);
                 }
+                this.queueManager.start(this, this.jobManager, info, topic);
             }
         }
     }
 
     /**
      * Get the latest mapping from queue name to topics
-     * @return A map from queue names to topics
      */
-    private Map<String, QueueJobCache> updateConfiguration() {
-        if ( this.queueConfigCount < this.queueConfigMgr.getChangeCount() || this.topicsChanged.get() ) {
+    private void updateTopicMapping(final Set<String> topics) {
+        this.topicMapping.clear();
 
-            final Map<String, Set<String>> mapping = new HashMap<String, Set<String>>();
-
-            synchronized ( this.topics ) {
-                this.queueConfigCount = this.queueConfigMgr.getChangeCount();
-                this.topicsChanged.set(false);
-
-                for(final String topic : this.topics) {
-                    final QueueInfo queueInfo = this.queueConfigMgr.getQueueInfo(topic);
-                    Set<String> names = mapping.get(queueInfo.queueName);
-                    if ( names == null ) {
-                        names = new TreeSet<String>();
-                        mapping.put(queueInfo.queueName, names);
-                    }
-                    names.add(topic);
-                }
-            }
-
-            final Map<String, QueueJobCache> cacheMap = new HashMap<String, QueueJobCache>();
-            for(final Map.Entry<String, Set<String>> entry : mapping.entrySet()) {
-                cacheMap.put(entry.getKey(),
-                        new QueueJobCache(this.configuration, this.queueConfigMgr.getQueueInfo(entry.getValue().iterator().next()), entry.getValue()));
-            }
-            this.queueJobCaches = cacheMap;
-            this.logger.debug("Established new topic mapping: {}", mapping);
+        for(final String topic : topics) {
+            final QueueInfo queueInfo = this.queueConfigMgr.getQueueInfo(topic);
+            this.topicMapping.put(topic, queueInfo);
         }
-        return this.queueJobCaches;
-    }
-
-    private final Map<String, Object> queueLocks = new ConcurrentHashMap<String, Object>();
 
-    public JobHandler take(final String queueName) {
-        logger.debug("Taking new job for {}", queueName);
-        Object lock = new Object();
-        this.queueLocks.put(queueName, lock);
-        JobImpl result = null;
-        try {
-            boolean isWaiting = true;
-            while ( isWaiting ) {
-                final Map<String, QueueJobCache> mapping = this.updateConfiguration();
-                final QueueJobCache cache = mapping.get(queueName);
-                if ( cache != null ) {
-                    result = cache.getNextJob();
-                    if ( result != null ) {
-                        isWaiting = false;
-                    }
-                }
-                if ( isWaiting ) {
-                    // block
-                    synchronized ( lock ) {
-                        try {
-                            lock.wait();
-                            // refetch lock
-                            lock = this.queueLocks.get(queueName);
-                            if ( lock == null ) {
-                                isWaiting = false;
-                            }
-                        } catch ( final InterruptedException ignore ) {
-                            Thread.currentThread().interrupt();
-                        }
-                    }
-                }
-            }
-        } finally {
-            this.queueLocks.remove(queueName);
-        }
-        if ( logger.isDebugEnabled() ) {
-            logger.debug("Returning job for {} : {}", queueName, Utility.toString(result));
-        }
-        return (result != null ? new JobHandler( result, (JobManagerImpl)this.jobManager) : null);
-    }
-
-    public void stop(final String queueName) {
-        final Object lock = queueLocks.remove(queueName);
-        if ( lock != null ) {
-            synchronized ( lock ) {
-                lock.notify();
-            }
-        }
+        this.logger.debug("Established new topic mapping: {}", this.topicMapping);
     }
 
     /**
@@ -268,83 +190,57 @@ public class TopicManager implements Eve
      * @param queueName The queue name
      */
     public void removeAll(final String queueName) {
-        final Map<String, QueueJobCache> mapping = this.updateConfiguration();
-        final QueueJobCache cache = mapping.get(queueName);
-        if ( cache != null ) {
-            final Set<String> topics = cache.getTopics();
-            logger.debug("Removing all jobs for queue {} : {}", queueName, topics);
-
-            if ( topics != null ) {
-
-                final ResourceResolver resolver = this.configuration.createResourceResolver();
-                try {
-                    final Resource baseResource = resolver.getResource(this.configuration.getLocalJobsPath());
-
-                    // sanity check - should never be null
-                    if ( baseResource != null ) {
-                        final BatchResourceRemover brr = new BatchResourceRemover();
-
-                        for(final String t : topics) {
-                            final Resource topicResource = baseResource.getChild(t.replace('/', '.'));
-                            if ( topicResource != null ) {
-                                JobTopicTraverser.traverse(logger, topicResource, new JobTopicTraverser.JobCallback() {
-
-                                    @Override
-                                    public boolean handle(final JobImpl job) {
-                                        final Resource jobResource = topicResource.getResourceResolver().getResource(job.getResourcePath());
-                                        // sanity check
-                                        if ( jobResource != null ) {
-                                            try {
-                                                brr.delete(jobResource);
-                                            } catch ( final PersistenceException ignore) {
-                                                logger.error("Unable to remove job " + job, ignore);
-                                            }
+        final Set<String> topics = new HashSet<String>();
+        synchronized ( this.topicMapping ) {
+            for(final Map.Entry<String, QueueInfo> entry : this.topicMapping.entrySet()) {
+                if ( entry.getValue().queueName.equals(queueName) ) {
+                    topics.add(entry.getKey());
+                }
+            }
+        }
+        logger.debug("Removing all jobs for queue {} : {}", queueName, topics);
+
+        if ( !topics.isEmpty() ) {
+
+            final ResourceResolver resolver = this.configuration.createResourceResolver();
+            try {
+                final Resource baseResource = resolver.getResource(this.configuration.getLocalJobsPath());
+
+                // sanity check - should never be null
+                if ( baseResource != null ) {
+                    final BatchResourceRemover brr = new BatchResourceRemover();
+
+                    for(final String t : topics) {
+                        final Resource topicResource = baseResource.getChild(t.replace('/', '.'));
+                        if ( topicResource != null ) {
+                            JobTopicTraverser.traverse(logger, topicResource, new JobTopicTraverser.JobCallback() {
+
+                                @Override
+                                public boolean handle(final JobImpl job) {
+                                    final Resource jobResource = topicResource.getResourceResolver().getResource(job.getResourcePath());
+                                    // sanity check
+                                    if ( jobResource != null ) {
+                                        try {
+                                            brr.delete(jobResource);
+                                        } catch ( final PersistenceException ignore) {
+                                            logger.error("Unable to remove job " + job, ignore);
+                                            topicResource.getResourceResolver().revert();
+                                            topicResource.getResourceResolver().refresh();
                                         }
-                                        return true;
                                     }
-                                });
-                            }
-                        }
-                        try {
-                            resolver.commit();
-                        } catch ( final PersistenceException ignore) {
-                            logger.error("Unable to remove jobs", ignore);
+                                    return true;
+                                }
+                            });
                         }
                     }
-                } finally {
-                    resolver.close();
-                }
-            }
-        }
-    }
-
-    @Override
-    public void topologyChanged(final TopologyCapabilities caps) {
-        this.isActive.set(caps != null);
-        if ( this.isActive.get() ) {
-            this.initialScan();
-            for(final Map.Entry<String, QueueJobCache> entry : this.updateConfiguration().entrySet()) {
-                this.queueManager.start(this, entry.getValue().getQueueInfo());
-            }
-        } else {
-            this.queueManager.restart();
-        }
-    }
-
-    /**
-     * Reschedule a job
-     * @param handler The job handler
-     */
-    public void reschedule(final JobHandler handler) {
-        final QueueInfo info = this.queueConfigMgr.getQueueInfo(handler.getJob().getTopic());
-        final QueueJobCache cache = this.queueJobCaches.get(info.queueName);
-        if ( cache != null ) {
-            cache.reschedule(handler);
-            final Object lock = this.queueLocks.get(info.queueName);
-            if ( lock != null ) {
-                synchronized ( lock ) {
-                    lock.notify();
+                    try {
+                        resolver.commit();
+                    } catch ( final PersistenceException ignore) {
+                        logger.error("Unable to remove jobs", ignore);
+                    }
                 }
+            } finally {
+                resolver.close();
             }
         }
     }

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/TopologyAware.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/TopologyAware.java?rev=1632617&r1=1632616&r2=1632617&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/TopologyAware.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/TopologyAware.java Fri Oct 17 15:46:02 2014
@@ -18,8 +18,14 @@
  */
 package org.apache.sling.event.impl.jobs.topology;
 
-
+/**
+ * Listener interface to get topology / queue changes.
+ */
 public interface TopologyAware {
 
+    /**
+     * Notify about a topology or queue configuration change.
+     * @param caps The new topology capabilities or {@code null} if currently unknown.
+     */
     void topologyChanged(final TopologyCapabilities caps);
 }

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/TopologyHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/TopologyHandler.java?rev=1632617&r1=1632616&r2=1632617&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/TopologyHandler.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/TopologyHandler.java Fri Oct 17 15:46:02 2014
@@ -112,10 +112,8 @@ public class TopologyHandler
         final CheckTopologyTask mt = new CheckTopologyTask(this.configuration, this.queueConfigManager);
         mt.run(topologyCapabilities, !isConfigChange, isConfigChange);
 
-        if ( !isConfigChange ) {
-            // start listeners
-            this.notifiyListeners();
-        }
+        // notify listeners (for topology changes as well as configuration changes)
+        this.notifiyListeners();
     }
 
     private void notifiyListeners() {