You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2014/10/18 17:51:05 UTC

svn commit: r1632809 [1/2] - in /sling/trunk/bundles/extensions/event/src: main/java/org/apache/sling/event/impl/jobs/ main/java/org/apache/sling/event/impl/jobs/config/ main/java/org/apache/sling/event/impl/jobs/console/ main/java/org/apache/sling/eve...

Author: cziegeler
Date: Sat Oct 18 15:51:05 2014
New Revision: 1632809

URL: http://svn.apache.org/r1632809
Log:
SLING-4048 : Avoid keeping jobs in memory. Another refactoring: move all configuration code into config package (WiP)

Added:
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/ConfigurationChangeListener.java
      - copied, changed from r1632617, sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/TopologyAware.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java
      - copied, changed from r1632478, sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerConfiguration.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/TopologyCapabilities.java
      - copied, changed from r1632478, sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/TopologyCapabilities.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/CheckTopologyTask.java
      - copied, changed from r1632478, sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/CheckTopologyTask.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/CleanUpTask.java
      - copied, changed from r1632528, sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/MaintenanceTask.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/FindUnfinishedJobsTask.java
      - copied, changed from r1632478, sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/FindUnfinishedJobsTask.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/UpgradeTask.java
      - copied, changed from r1632520, sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/UpgradeTask.java
Removed:
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerConfiguration.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/MaintenanceTask.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/CheckTopologyTask.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/FindUnfinishedJobsTask.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/TopologyAware.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/TopologyCapabilities.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/TopologyHandler.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/UpgradeTask.java
Modified:
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobSchedulerImpl.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/console/InventoryPlugin.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/console/WebConsolePlugin.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/notifications/NewJobSender.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueServices.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicManager.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsManager.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/HistoryCleanUpTask.java
    sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/InstanceDescriptionComparatorTest.java
    sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java
    sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ChaosTest.java

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java?rev=1632809&r1=1632808&r2=1632809&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java Sat Oct 18 15:51:05 2014
@@ -40,7 +40,7 @@ import org.apache.felix.scr.annotations.
 import org.apache.felix.scr.annotations.Service;
 import org.apache.sling.commons.osgi.PropertiesUtil;
 import org.apache.sling.discovery.PropertyProvider;
-import org.apache.sling.event.impl.jobs.topology.TopologyCapabilities;
+import org.apache.sling.event.impl.jobs.config.TopologyCapabilities;
 import org.apache.sling.event.impl.support.TopicMatcher;
 import org.apache.sling.event.impl.support.TopicMatcherHelper;
 import org.apache.sling.event.jobs.Job;

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java?rev=1632809&r1=1632808&r2=1632809&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java Sat Oct 18 15:51:05 2014
@@ -19,6 +19,20 @@
 package org.apache.sling.event.impl.jobs;
 
 
+import java.util.Calendar;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.sling.api.resource.ModifiableValueMap;
+import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.ValueMap;
+import org.apache.sling.event.impl.jobs.config.JobManagerConfiguration;
+import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueInfo;
+import org.apache.sling.event.impl.jobs.config.TopologyCapabilities;
+import org.apache.sling.event.impl.support.ResourceHelper;
 import org.apache.sling.event.jobs.Job;
 import org.apache.sling.event.jobs.Queue;
 
@@ -35,12 +49,12 @@ public class JobHandler {
 
     private volatile boolean isStopped = false;
 
-    private final JobManagerImpl jobManager;
+    private final JobManagerConfiguration configuration;
 
     public JobHandler(final JobImpl job,
-            final JobManagerImpl jobManager) {
+            final JobManagerConfiguration configuration) {
         this.job = job;
-        this.jobManager = jobManager;
+        this.configuration = configuration;
     }
 
     public JobImpl getJob() {
@@ -53,39 +67,174 @@ public class JobHandler {
     }
 
     /**
-     * Finish the processing of the job
-     * @param state The state of processing
-     */
-    public void finished(final Job.JobState state, final boolean keepJobInHistory, final long duration) {
-        // for now we just keep cancelled jobs
-        this.jobManager.finishJob(this.job, state, keepJobInHistory, duration);
-    }
-
-    /**
      * Reschedule the job
      * Update the retry count and remove the started time.
      * @return <code>true</code> if rescheduling was successful, <code>false</code> otherwise.
      */
     public boolean reschedule() {
-        return this.jobManager.reschedule(this.job);
+        final ResourceResolver resolver = this.configuration.createResourceResolver();
+        try {
+            final Resource jobResource = resolver.getResource(job.getResourcePath());
+            if ( jobResource != null ) {
+                final ModifiableValueMap mvm = jobResource.adaptTo(ModifiableValueMap.class);
+                mvm.put(Job.PROPERTY_JOB_RETRY_COUNT, job.getProperty(Job.PROPERTY_JOB_RETRY_COUNT));
+                if ( job.getProperty(Job.PROPERTY_RESULT_MESSAGE) != null ) {
+                    mvm.put(Job.PROPERTY_RESULT_MESSAGE, job.getProperty(Job.PROPERTY_RESULT_MESSAGE));
+                }
+                mvm.remove(Job.PROPERTY_JOB_STARTED_TIME);
+                try {
+                    resolver.commit();
+                    return true;
+                } catch ( final PersistenceException pe ) {
+                    this.configuration.getMainLogger().debug("Unable to update reschedule properties for job " + job.getId(), pe);
+                }
+            }
+        } finally {
+            resolver.close();
+        }
+
+        return false;
     }
 
-    public void cancel() {
-        this.jobManager.finishJob(this.job, Job.JobState.DROPPED, true, -1);
+    /**
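+     * Finish a job; for a successful job kept in history the finish date is its start time plus {@code duration} (ms)
+     * @param state The state of the processing
+     * @param keepJobInHistory Whether to store a copy of the job at its storage path before the job resource is deleted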
+     */
+    public void finished(final Job.JobState state,
+                          final boolean keepJobInHistory,
+                          final long duration) {
+        final boolean isSuccess = (state == Job.JobState.SUCCEEDED);
+        final ResourceResolver resolver = this.configuration.createResourceResolver();
+        try {
+            final Resource jobResource = resolver.getResource(job.getResourcePath());
+            if ( jobResource != null ) {
+                try {
+                    String newPath = null;
+                    if ( keepJobInHistory ) {
+                        final ValueMap vm = ResourceHelper.getValueMap(jobResource);
+                        newPath = this.configuration.getStoragePath(job.getTopic(), job.getId(), isSuccess);
+                        final Map<String, Object> props = new HashMap<String, Object>(vm);
+                        props.put(JobImpl.PROPERTY_FINISHED_STATE, state.name());
+                        if ( isSuccess ) {
+                            // we set the finish date to start date + duration
+                            final Date finishDate = new Date();
+                            finishDate.setTime(job.getProcessingStarted().getTime().getTime() + duration);
+                            final Calendar finishCal = Calendar.getInstance();
+                            finishCal.setTime(finishDate);
+                            props.put(JobImpl.PROPERTY_FINISHED_DATE, finishCal);
+                        } else {
+                            // current time is good enough
+                            props.put(JobImpl.PROPERTY_FINISHED_DATE, Calendar.getInstance());
+                        }
+                        if ( job.getProperty(Job.PROPERTY_RESULT_MESSAGE) != null ) {
+                            props.put(Job.PROPERTY_RESULT_MESSAGE, job.getProperty(Job.PROPERTY_RESULT_MESSAGE));
+                        }
+                        ResourceHelper.getOrCreateResource(resolver, newPath, props);
+                    }
+                    resolver.delete(jobResource);
+                    resolver.commit();
+
+                    if ( keepJobInHistory && configuration.getMainLogger().isDebugEnabled() ) {
+                        if ( isSuccess ) {
+                            configuration.getMainLogger().debug("Kept successful job {} at {}", Utility.toString(job), newPath);
+                        } else {
+                            configuration.getMainLogger().debug("Moved cancelled job {} to {}", Utility.toString(job), newPath);
+                        }
+                    }
+                } catch ( final PersistenceException pe ) {
+                    this.configuration.getMainLogger().warn("Unable to finish job " + job.getId(), pe);
+                } catch (final InstantiationException ie) {
+                    // something happened with the resource in the meantime
+                    this.configuration.getMainLogger().debug("Unable to instantiate job", ie);
+                }
+            }
+        } finally {
+            resolver.close();
+        }
     }
 
     /**
      * Reassign to a new instance.
      */
     public void reassign() {
-        this.jobManager.reassign(this.job);
+        final QueueInfo queueInfo = this.configuration.getQueueConfigurationManager().getQueueInfo(job.getTopic());
+        // Sanity check if queue configuration has changed
+        final TopologyCapabilities caps = this.configuration.getTopologyCapabilities();
+        final String targetId = (caps == null ? null : caps.detectTarget(job.getTopic(), job.getProperties(), queueInfo));
+
+        final ResourceResolver resolver = this.configuration.createResourceResolver();
+        try {
+            final Resource jobResource = resolver.getResource(job.getResourcePath());
+            if ( jobResource != null ) {
+                try {
+                    final ValueMap vm = ResourceHelper.getValueMap(jobResource);
+                    final String newPath = this.configuration.getUniquePath(targetId, job.getTopic(), job.getId(), job.getProperties());
+
+                    final Map<String, Object> props = new HashMap<String, Object>(vm);
+                    props.remove(Job.PROPERTY_JOB_QUEUE_NAME);
+                    if ( targetId == null ) {
+                        props.remove(Job.PROPERTY_JOB_TARGET_INSTANCE);
+                    } else {
+                        props.put(Job.PROPERTY_JOB_TARGET_INSTANCE, targetId);
+                    }
+                    props.remove(Job.PROPERTY_JOB_STARTED_TIME);
+
+                    try {
+                        ResourceHelper.getOrCreateResource(resolver, newPath, props);
+                        resolver.delete(jobResource);
+                        resolver.commit();
+                    } catch ( final PersistenceException pe ) {
+                        this.configuration.getMainLogger().warn("Unable to reassign job " + job.getId(), pe);
+                    }
+                } catch (final InstantiationException ie) {
+                    // something happened with the resource in the meantime
+                    this.configuration.getMainLogger().debug("Unable to instantiate job", ie);
+                }
+            }
+        } finally {
+            resolver.close();
+        }
     }
 
     /**
      * Update the property of a job in the resource tree
+     * @param propNames the property names to update
+     * @return {@code true} if the update was successful.
      */
     public boolean persistJobProperties(final String... propNames) {
-        return this.jobManager.persistJobProperties(this.job, propNames);
+        if ( propNames != null ) {
+            final ResourceResolver resolver = this.configuration.createResourceResolver();
+            try {
+                final Resource jobResource = resolver.getResource(job.getResourcePath());
+                if ( jobResource != null ) {
+                    final ModifiableValueMap mvm = jobResource.adaptTo(ModifiableValueMap.class);
+                    for(final String propName : propNames) {
+                        final Object val = job.getProperty(propName);
+                        if ( val != null ) {
+                            if ( val.getClass().isEnum() ) {
+                                mvm.put(propName, val.toString());
+                            } else {
+                                mvm.put(propName, val);
+                            }
+                        } else {
+                            mvm.remove(propName);
+                        }
+                    }
+                    resolver.commit();
+
+                    return true;
+                } else {
+                    this.configuration.getMainLogger().debug("No job resource found at {}", job.getResourcePath());
+                }
+            } catch ( final PersistenceException ignore ) {
+                this.configuration.getMainLogger().debug("Unable to persist properties", ignore);
+            } finally {
+                resolver.close();
+            }
+            return false;
+        }
+        return true;
     }
 
     public boolean isStopped() {
@@ -96,6 +245,15 @@ public class JobHandler {
         this.isStopped = true;
     }
 
+    public void addToRetryList() {
+        this.configuration.addJobToRetryList(this.job);
+
+    }
+
+    public void removeFromRetryList() {
+        this.configuration.removeJobFromRetryList(this.job);
+    }
+
     @Override
     public int hashCode() {
         return this.job.getId().hashCode();
@@ -113,13 +271,4 @@ public class JobHandler {
     public String toString() {
         return "JobHandler(" + this.job.getId() + ")";
     }
-
-    public void addToRetryList() {
-        this.jobManager.addJobToRetryList(this.job);
-
-    }
-
-    public void removeFromRetryList() {
-        this.jobManager.removeJobFromRetryList(this.job);
-    }
 }
\ No newline at end of file

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java?rev=1632809&r1=1632808&r2=1632809&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java Sat Oct 18 15:51:05 2014
@@ -21,7 +21,6 @@ package org.apache.sling.event.impl.jobs
 import java.util.ArrayList;
 import java.util.Calendar;
 import java.util.Collection;
-import java.util.Date;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -37,7 +36,6 @@ import org.apache.felix.scr.annotations.
 import org.apache.jackrabbit.util.ISO9075;
 import org.apache.sling.api.SlingConstants;
 import org.apache.sling.api.resource.LoginException;
-import org.apache.sling.api.resource.ModifiableValueMap;
 import org.apache.sling.api.resource.PersistenceException;
 import org.apache.sling.api.resource.QuerySyntaxException;
 import org.apache.sling.api.resource.Resource;
@@ -46,15 +44,16 @@ import org.apache.sling.api.resource.Val
 import org.apache.sling.commons.scheduler.Scheduler;
 import org.apache.sling.commons.threads.ThreadPoolManager;
 import org.apache.sling.event.EventUtil;
+import org.apache.sling.event.impl.jobs.config.ConfigurationChangeListener;
+import org.apache.sling.event.impl.jobs.config.JobManagerConfiguration;
 import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager;
 import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueInfo;
+import org.apache.sling.event.impl.jobs.config.TopologyCapabilities;
 import org.apache.sling.event.impl.jobs.notifications.NotificationUtility;
 import org.apache.sling.event.impl.jobs.queues.AbstractJobQueue;
 import org.apache.sling.event.impl.jobs.queues.QueueManager;
 import org.apache.sling.event.impl.jobs.stats.StatisticsManager;
-import org.apache.sling.event.impl.jobs.topology.TopologyAware;
-import org.apache.sling.event.impl.jobs.topology.TopologyCapabilities;
-import org.apache.sling.event.impl.jobs.topology.TopologyHandler;
+import org.apache.sling.event.impl.jobs.tasks.CleanUpTask;
 import org.apache.sling.event.impl.support.Environment;
 import org.apache.sling.event.impl.support.ResourceHelper;
 import org.apache.sling.event.impl.support.ScheduleInfoImpl;
@@ -94,15 +93,12 @@ import org.slf4j.LoggerFactory;
                      ResourceHelper.BUNDLE_EVENT_UPDATED})
 })
 public class JobManagerImpl
-    implements JobManager, EventHandler, Runnable, TopologyAware {
+    implements JobManager, EventHandler, Runnable, ConfigurationChangeListener {
 
     /** Default logger. */
     private final Logger logger = LoggerFactory.getLogger(this.getClass());
 
     @Reference
-    private TopologyHandler topologyHandler;
-
-    @Reference
     private EventAdmin eventAdmin;
 
     @Reference
@@ -132,7 +128,7 @@ public class JobManagerImpl
 
     private volatile TopologyCapabilities topologyCapabilities;
 
-    private MaintenanceTask maintenanceTask;
+    private CleanUpTask maintenanceTask;
 
     /** We count the scheduler runs. */
     private volatile long schedulerRuns;
@@ -147,9 +143,9 @@ public class JobManagerImpl
     @Activate
     protected void activate(final Map<String, Object> props) throws LoginException {
         this.jobScheduler = new JobSchedulerImpl(this.configuration, this.scheduler, this);
-        this.maintenanceTask = new MaintenanceTask(this.configuration);
+        this.maintenanceTask = new CleanUpTask(this.configuration);
 
-        this.topologyHandler.addListener(this);
+        this.configuration.addListener(this);
         logger.info("Apache Sling Job Manager started on instance {}", Environment.APPLICATION_ID);
     }
 
@@ -159,7 +155,7 @@ public class JobManagerImpl
     @Deactivate
     protected void deactivate() {
         logger.info("Apache Sling Job Manager stopping on instance {}", Environment.APPLICATION_ID);
-        this.topologyHandler.removeListener(this);
+        this.configuration.removeListener(this);
 
         this.jobScheduler.deactivate();
 
@@ -178,7 +174,7 @@ public class JobManagerImpl
         logger.debug("Job manager maintenance: Starting #{}", this.schedulerRuns);
 
         // invoke maintenance task
-        final MaintenanceTask task = this.maintenanceTask;
+        final CleanUpTask task = this.maintenanceTask;
         if ( task != null ) {
             task.run(this.topologyCapabilities, this.schedulerRuns - 1);
         }
@@ -243,23 +239,13 @@ public class JobManagerImpl
         }
     }
 
-    private void stopProcessing() {
-        this.topologyCapabilities = null;
-    }
-
-    private void startProcessing(final TopologyCapabilities caps) {
-        // create new capabilities and update view
-        this.topologyCapabilities = caps;
-    }
-
     @Override
-    public void topologyChanged(final TopologyCapabilities caps) {
-        if ( caps == null ) {
-            this.stopProcessing();
+    public void configurationChanged(final boolean active) {
+        if ( !active ) {
+            this.topologyCapabilities = null;
         } else {
-            this.startProcessing(caps);
+            this.topologyCapabilities = this.configuration.getTopologyCapabilities();
         }
-        this.jobScheduler.topologyChanged(caps);
     }
 
     /**
@@ -384,7 +370,7 @@ public class JobManagerImpl
             if ( logger.isDebugEnabled() ) {
                 logger.debug("Found removal job: {}", Utility.toString(job));
             }
-            final JobImpl retryJob = this.getJobFromRetryList(jobId);
+            final JobImpl retryJob = (JobImpl)this.configuration.getJobFromRetryList(jobId);
             if ( retryJob != null ) {
                 job = retryJob;
             }
@@ -416,7 +402,8 @@ public class JobManagerImpl
                         resolver.close();
                     }
                 } else {
-                    this.finishJob(job, Job.JobState.DROPPED, true, -1);
+                    final JobHandler jh = new JobHandler(job, this.configuration);
+                    jh.finished(Job.JobState.DROPPED, true, -1);
                 }
             }
         } else {
@@ -736,64 +723,7 @@ public class JobManagerImpl
         return result;
     }
 
-    /**
-     * Finish a job
-     * @param info  The job handler
-     * @param state The state of the processing
-     */
-    public void finishJob(final JobImpl job,
-                          final Job.JobState state,
-                          final boolean keepJobInHistory,
-                          final long duration) {
-        final boolean isSuccess = (state == Job.JobState.SUCCEEDED);
-        final ResourceResolver resolver = this.configuration.createResourceResolver();
-        try {
-            final Resource jobResource = resolver.getResource(job.getResourcePath());
-            if ( jobResource != null ) {
-                try {
-                    String newPath = null;
-                    if ( keepJobInHistory ) {
-                        final ValueMap vm = ResourceHelper.getValueMap(jobResource);
-                        newPath = this.configuration.getStoragePath(job, isSuccess);
-                        final Map<String, Object> props = new HashMap<String, Object>(vm);
-                        props.put(JobImpl.PROPERTY_FINISHED_STATE, state.name());
-                        if ( isSuccess ) {
-                            // we set the finish date to start date + duration
-                            final Date finishDate = new Date();
-                            finishDate.setTime(job.getProcessingStarted().getTime().getTime() + duration);
-                            final Calendar finishCal = Calendar.getInstance();
-                            finishCal.setTime(finishDate);
-                            props.put(JobImpl.PROPERTY_FINISHED_DATE, finishCal);
-                        } else {
-                            // current time is good enough
-                            props.put(JobImpl.PROPERTY_FINISHED_DATE, Calendar.getInstance());
-                        }
-                        if ( job.getProperty(Job.PROPERTY_RESULT_MESSAGE) != null ) {
-                            props.put(Job.PROPERTY_RESULT_MESSAGE, job.getProperty(Job.PROPERTY_RESULT_MESSAGE));
-                        }
-                        ResourceHelper.getOrCreateResource(resolver, newPath, props);
-                    }
-                    resolver.delete(jobResource);
-                    resolver.commit();
 
-                    if ( keepJobInHistory && logger.isDebugEnabled() ) {
-                        if ( isSuccess ) {
-                            logger.debug("Kept successful job {} at {}", Utility.toString(job), newPath);
-                        } else {
-                            logger.debug("Moved cancelled job {} to {}", Utility.toString(job), newPath);
-                        }
-                    }
-                } catch ( final PersistenceException pe ) {
-                    this.ignoreException(pe);
-                } catch (final InstantiationException ie) {
-                    // something happened with the resource in the meantime
-                    this.ignoreException(ie);
-                }
-            }
-        } finally {
-            resolver.close();
-        }
-    }
 
     /**
      * Try to get a "lock" for a resource
@@ -1105,144 +1035,4 @@ public class JobManagerImpl
         }
         return null;
     }
-
-    private final List<JobImpl> retryList = new ArrayList<JobImpl>();
-
-    public void addJobToRetryList(final JobImpl job) {
-        synchronized ( retryList ) {
-            retryList.add(job);
-        }
-    }
-
-    public void removeJobFromRetryList(final JobImpl job) {
-        synchronized ( retryList ) {
-            retryList.remove(job);
-        }
-    }
-
-    private JobImpl getJobFromRetryList(final String jobId) {
-        synchronized ( retryList ) {
-            for(final JobImpl j : retryList) {
-                if ( jobId.equals(j.getId())) {
-                    return j;
-                }
-            }
-        }
-        return null;
-    }
-
-    /**
-     * Reassign a job to a new instance
-     */
-    public void reassign(final JobImpl job) {
-        final QueueInfo queueInfo = this.queueManager.getQueueInfo(job.getTopic());
-        // Sanity check if queue configuration has changed
-        final TopologyCapabilities caps = this.topologyCapabilities;
-        final String targetId = (caps == null ? null : caps.detectTarget(job.getTopic(), job.getProperties(), queueInfo));
-
-        final ResourceResolver resolver = this.configuration.createResourceResolver();
-        try {
-            final Resource jobResource = resolver.getResource(job.getResourcePath());
-            if ( jobResource != null ) {
-                try {
-                    final ValueMap vm = ResourceHelper.getValueMap(jobResource);
-                    final String newPath = this.configuration.getUniquePath(targetId, job.getTopic(), job.getId(), job.getProperties());
-
-                    final Map<String, Object> props = new HashMap<String, Object>(vm);
-                    props.remove(Job.PROPERTY_JOB_QUEUE_NAME);
-                    if ( targetId == null ) {
-                        props.remove(Job.PROPERTY_JOB_TARGET_INSTANCE);
-                    } else {
-                        props.put(Job.PROPERTY_JOB_TARGET_INSTANCE, targetId);
-                    }
-                    props.remove(Job.PROPERTY_JOB_STARTED_TIME);
-
-                    try {
-                        ResourceHelper.getOrCreateResource(resolver, newPath, props);
-                        resolver.delete(jobResource);
-                        resolver.commit();
-                    } catch ( final PersistenceException pe ) {
-                        this.ignoreException(pe);
-                    }
-                } catch (final InstantiationException ie) {
-                    // something happened with the resource in the meantime
-                    this.ignoreException(ie);
-                }
-            }
-        } finally {
-            resolver.close();
-        }
-    }
-
-    /**
-     * Reschedule the job
-     * Update the retry count and remove the started time.
-     * @param job The job
-     * @return <code>true</code> if rescheduling was successful, <code>false</code> otherwise.
-     */
-    public boolean reschedule(final JobImpl job) {
-        final ResourceResolver resolver = this.configuration.createResourceResolver();
-        try {
-            final Resource jobResource = resolver.getResource(job.getResourcePath());
-            if ( jobResource != null ) {
-                final ModifiableValueMap mvm = jobResource.adaptTo(ModifiableValueMap.class);
-                mvm.put(Job.PROPERTY_JOB_RETRY_COUNT, job.getProperty(Job.PROPERTY_JOB_RETRY_COUNT));
-                if ( job.getProperty(Job.PROPERTY_RESULT_MESSAGE) != null ) {
-                    mvm.put(Job.PROPERTY_RESULT_MESSAGE, job.getProperty(Job.PROPERTY_RESULT_MESSAGE));
-                }
-                mvm.remove(Job.PROPERTY_JOB_STARTED_TIME);
-                try {
-                    resolver.commit();
-                    return true;
-                } catch ( final PersistenceException pe ) {
-                    this.logger.debug("Unable to update reschedule properties for job " + job.getId(), pe);
-                }
-            }
-        } finally {
-            resolver.close();
-        }
-
-        return false;
-    }
-
-    /**
-     * Update the property of a job in the resource tree
-     * @param job The job
-     * @param propNames the property names to update
-     * @return {@code true} if the update was successful.
-     */
-    public boolean persistJobProperties(final JobImpl job, final String... propNames) {
-        if ( propNames != null ) {
-            final ResourceResolver resolver = this.configuration.createResourceResolver();
-            try {
-                final Resource jobResource = resolver.getResource(job.getResourcePath());
-                if ( jobResource != null ) {
-                    final ModifiableValueMap mvm = jobResource.adaptTo(ModifiableValueMap.class);
-                    for(final String propName : propNames) {
-                        final Object val = job.getProperty(propName);
-                        if ( val != null ) {
-                            if ( val.getClass().isEnum() ) {
-                                mvm.put(propName, val.toString());
-                            } else {
-                                mvm.put(propName, val);
-                            }
-                        } else {
-                            mvm.remove(propName);
-                        }
-                    }
-                    resolver.commit();
-
-                    return true;
-                } else {
-                    logger.debug("No job resource found at {}", job.getResourcePath());
-                }
-            } catch ( final PersistenceException ignore ) {
-                logger.debug("Unable to persist properties", ignore);
-            } finally {
-                resolver.close();
-            }
-            return false;
-        }
-        return true;
-    }
 }

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobSchedulerImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobSchedulerImpl.java?rev=1632809&r1=1632808&r2=1632809&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobSchedulerImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobSchedulerImpl.java Sat Oct 18 15:51:05 2014
@@ -46,8 +46,8 @@ import org.apache.sling.api.resource.Val
 import org.apache.sling.commons.scheduler.JobContext;
 import org.apache.sling.commons.scheduler.ScheduleOptions;
 import org.apache.sling.commons.scheduler.Scheduler;
-import org.apache.sling.event.impl.jobs.topology.TopologyAware;
-import org.apache.sling.event.impl.jobs.topology.TopologyCapabilities;
+import org.apache.sling.event.impl.jobs.config.ConfigurationChangeListener;
+import org.apache.sling.event.impl.jobs.config.JobManagerConfiguration;
 import org.apache.sling.event.impl.support.Environment;
 import org.apache.sling.event.impl.support.ResourceHelper;
 import org.apache.sling.event.impl.support.ScheduleInfoImpl;
@@ -68,7 +68,7 @@ import org.slf4j.LoggerFactory;
  *
  */
 public class JobSchedulerImpl
-    implements EventHandler, TopologyAware, org.apache.sling.commons.scheduler.Job {
+    implements EventHandler, ConfigurationChangeListener, org.apache.sling.commons.scheduler.Job {
 
     private static final String TOPIC_READ_JOB = "org/apache/sling/event/impl/jobs/READSCHEDULEDJOB";
 
@@ -85,7 +85,7 @@ public class JobSchedulerImpl
     /** Is this active? */
     private volatile boolean active;
 
-    private final JobManagerConfiguration config;
+    private final JobManagerConfiguration configuration;
 
     private final Scheduler scheduler;
 
@@ -102,7 +102,7 @@ public class JobSchedulerImpl
     public JobSchedulerImpl(final JobManagerConfiguration configuration,
             final Scheduler scheduler,
             final JobManagerImpl jobManager) {
-        this.config = configuration;
+        this.configuration = configuration;
         this.scheduler = scheduler;
         this.running = true;
         this.jobManager = jobManager;
@@ -121,12 +121,14 @@ public class JobSchedulerImpl
             }
         });
         backgroundThread.start();
+        this.configuration.addListener(this);
     }
 
     /**
      * Deactivate this component.
      */
     public void deactivate() {
+        this.configuration.removeListener(this);
         this.running = false;
         this.stopScheduling();
         synchronized ( this.scheduledJobs ) {
@@ -193,7 +195,7 @@ public class JobSchedulerImpl
                 if ( event.getTopic().equals(SlingConstants.TOPIC_RESOURCE_ADDED)
                      || event.getTopic().equals(SlingConstants.TOPIC_RESOURCE_CHANGED)) {
                     final String path = (String)event.getProperty(SlingConstants.PROPERTY_PATH);
-                    final ResourceResolver resolver = this.config.createResourceResolver();;
+                    final ResourceResolver resolver = this.configuration.createResourceResolver();;
                     try {
                         final Resource eventResource = resolver.getResource(path);
                         if ( ResourceHelper.RESOURCE_TYPE_SCHEDULED_JOB.equals(eventResource.getResourceType()) ) {
@@ -330,10 +332,9 @@ public class JobSchedulerImpl
     }
 
     public void unschedule(final ScheduledJobInfoImpl info) {
-        final ResourceResolver resolver = this.config.createResourceResolver();
+        final ResourceResolver resolver = this.configuration.createResourceResolver();
         try {
-            final StringBuilder sb = new StringBuilder(this.config.getScheduledJobsPathWithSlash());
-            sb.append('/');
+            final StringBuilder sb = new StringBuilder(this.configuration.getScheduledJobsPath(true));
             sb.append(ResourceHelper.filterName(info.getName()));
             final String path = sb.toString();
 
@@ -371,7 +372,7 @@ public class JobSchedulerImpl
                         @Override
                         public void run() {
                             synchronized (unloadedEvents) {
-                                final ResourceResolver resolver = config.createResourceResolver();
+                                final ResourceResolver resolver = configuration.createResourceResolver();
                                 final Set<String> newUnloadedEvents = new HashSet<String>();
                                 newUnloadedEvents.addAll(unloadedEvents);
                                 try {
@@ -406,7 +407,7 @@ public class JobSchedulerImpl
             } else {
                 final String path = (String)event.getProperty(SlingConstants.PROPERTY_PATH);
                 final String resourceType = (String)event.getProperty(SlingConstants.PROPERTY_RESOURCE_TYPE);
-                if ( path != null && path.startsWith(this.config.getScheduledJobsPathWithSlash())
+                if ( path != null && path.startsWith(this.configuration.getScheduledJobsPath(true))
                      && (resourceType == null || ResourceHelper.RESOURCE_TYPE_SCHEDULED_JOB.equals(resourceType))) {
                     logger.debug("Received resource event for {} : {}", path, resourceType);
                     try {
@@ -424,7 +425,7 @@ public class JobSchedulerImpl
      * Load all scheduled jobs from the resource tree
      */
     private void loadScheduledJobs(final long startTime) {
-        final ResourceResolver resolver = this.config.createResourceResolver();
+        final ResourceResolver resolver = this.configuration.createResourceResolver();
         try {
             final Calendar startDate = Calendar.getInstance();
             startDate.setTimeInMillis(startTime);
@@ -445,7 +446,7 @@ public class JobSchedulerImpl
             while ( result.hasNext() ) {
                 final Resource eventResource = result.next();
                 // sanity check for the path
-                if ( eventResource.getPath().startsWith(this.config.getScheduledJobsPathWithSlash()) ) {
+                if ( eventResource.getPath().startsWith(this.configuration.getScheduledJobsPath(true)) ) {
                     final ReadResult readResult = this.readScheduledJob(eventResource);
                     if ( readResult != null ) {
                         if ( readResult.hasReadErrors ) {
@@ -518,7 +519,7 @@ public class JobSchedulerImpl
             final boolean suspend,
             final List<ScheduleInfoImpl> scheduleInfos)
     throws PersistenceException {
-        final ResourceResolver resolver = this.config.createResourceResolver();
+        final ResourceResolver resolver = this.configuration.createResourceResolver();
         try {
 
             // create properties
@@ -556,7 +557,7 @@ public class JobSchedulerImpl
             // create path and resource
             properties.put(ResourceResolver.PROPERTY_RESOURCE_TYPE, ResourceHelper.RESOURCE_TYPE_SCHEDULED_JOB);
 
-            final String path = this.config.getScheduledJobsPathWithSlash() + ResourceHelper.filterName(scheduleName);
+            final String path = this.configuration.getScheduledJobsPath(true) + ResourceHelper.filterName(scheduleName);
 
             // update existing resource
             final Resource existingInfo = resolver.getResource(path);
@@ -593,13 +594,13 @@ public class JobSchedulerImpl
     }
 
     @Override
-    public void topologyChanged(final TopologyCapabilities caps) {
-        if ( caps == null ) {
+    public void configurationChanged(final boolean active) {
+        if ( !active ) {
             this.active = false;
             this.stopScheduling();
         } else {
             final boolean previouslyActive = this.active;
-            this.active = caps.isLeader();
+            this.active = this.configuration.getTopologyCapabilities().isLeader();
             if ( this.active && !previouslyActive ) {
                 this.startScheduling();
             }
@@ -726,10 +727,9 @@ public class JobSchedulerImpl
     }
 
     public void setSuspended(final ScheduledJobInfoImpl info, final boolean flag) {
-        final ResourceResolver resolver = config.createResourceResolver();
+        final ResourceResolver resolver = configuration.createResourceResolver();
         try {
-            final StringBuilder sb = new StringBuilder(this.config.getScheduledJobsPathWithSlash());
-            sb.append('/');
+            final StringBuilder sb = new StringBuilder(this.configuration.getScheduledJobsPath(true));
             sb.append(ResourceHelper.filterName(info.getName()));
             final String path = sb.toString();
 

Copied: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/ConfigurationChangeListener.java (from r1632617, sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/TopologyAware.java)
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/ConfigurationChangeListener.java?p2=sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/ConfigurationChangeListener.java&p1=sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/TopologyAware.java&r1=1632617&r2=1632809&rev=1632809&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/TopologyAware.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/ConfigurationChangeListener.java Sat Oct 18 15:51:05 2014
@@ -16,16 +16,18 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.event.impl.jobs.topology;
+package org.apache.sling.event.impl.jobs.config;
 
 /**
  * Listener interface to get topology / queue changes.
+ * Components interested in configuration changes can subscribe
+ * themselves using the {@link JobManagerConfiguration}.
  */
-public interface TopologyAware {
+public interface ConfigurationChangeListener {
 
     /**
-     * Notify about a change.
-     * @param caps The new topology capabilities or {@code null}
+     * Notify about a configuration change.
+     * @param active {@code true} if job processing is active, otherwise {@code false}
      */
-    void topologyChanged(final TopologyCapabilities caps);
+    void configurationChanged(final boolean active);
 }

Copied: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java (from r1632478, sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerConfiguration.java)
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java?p2=sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java&p1=sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerConfiguration.java&r1=1632478&r2=1632809&rev=1632809&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerConfiguration.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java Sat Oct 18 15:51:05 2014
@@ -16,14 +16,18 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.event.impl.jobs;
+package org.apache.sling.event.impl.jobs.config;
 
+import java.util.ArrayList;
 import java.util.Calendar;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
 import org.apache.felix.scr.annotations.Modified;
 import org.apache.felix.scr.annotations.Properties;
 import org.apache.felix.scr.annotations.Property;
@@ -34,9 +38,17 @@ import org.apache.sling.api.resource.Per
 import org.apache.sling.api.resource.ResourceResolver;
 import org.apache.sling.api.resource.ResourceResolverFactory;
 import org.apache.sling.commons.osgi.PropertiesUtil;
+import org.apache.sling.discovery.TopologyEvent;
+import org.apache.sling.discovery.TopologyEvent.Type;
+import org.apache.sling.discovery.TopologyEventListener;
 import org.apache.sling.event.impl.EnvironmentComponent;
+import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueConfigurationChangeListener;
+import org.apache.sling.event.impl.jobs.tasks.CheckTopologyTask;
+import org.apache.sling.event.impl.jobs.tasks.FindUnfinishedJobsTask;
+import org.apache.sling.event.impl.jobs.tasks.UpgradeTask;
 import org.apache.sling.event.impl.support.Environment;
 import org.apache.sling.event.impl.support.ResourceHelper;
+import org.apache.sling.event.jobs.Job;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,7 +60,7 @@ import org.slf4j.LoggerFactory;
            label="Apache Sling Job Manager",
            description="This is the central service of the job handling.",
            name="org.apache.sling.event.impl.jobs.jcr.PersistenceHandler")
-@Service(value={JobManagerConfiguration.class})
+@Service(value={JobManagerConfiguration.class, TopologyEventListener.class})
 @Properties({
     @Property(name=JobManagerConfiguration.PROPERTY_DISABLE_DISTRIBUTION,
               boolValue=JobManagerConfiguration.DEFAULT_DISABLE_DISTRIBUTION,
@@ -62,10 +74,10 @@ import org.slf4j.LoggerFactory;
     @Property(name=JobManagerConfiguration.PROPERTY_BACKGROUND_LOAD_DELAY,
               longValue=JobManagerConfiguration.DEFAULT_BACKGROUND_LOAD_DELAY, propertyPrivate=true),
 })
-public class JobManagerConfiguration {
+public class JobManagerConfiguration implements TopologyEventListener, QueueConfigurationChangeListener {
 
     /** Logger. */
-    private final Logger logger = LoggerFactory.getLogger(this.getClass());
+    private final Logger logger = LoggerFactory.getLogger("org.apache.sling.event.impl.jobs");
 
     /** Default resource path for jobs. */
     public static final String DEFAULT_REPOSITORY_PATH = "/var/eventing/jobs";
@@ -133,6 +145,9 @@ public class JobManagerConfiguration {
     /** The resource path where scheduled jobs are stored - ending with a slash. */
     private String scheduledJobsPathWithSlash;
 
+    /** Listeners to notify about configuration changes. */
+    private final List<ConfigurationChangeListener> listeners = new ArrayList<ConfigurationChangeListener>();
+
     /** The environment component. */
     @Reference
     private EnvironmentComponent environment;
@@ -140,6 +155,12 @@ public class JobManagerConfiguration {
     @Reference
     private ResourceResolverFactory resourceResolverFactory;
 
+    @Reference
+    private QueueConfigurationManager queueConfigManager;
+
+    /** The topology capabilities. */
+    private volatile TopologyCapabilities topologyCapabilities;
+
     /**
      * Activate this component.
      * @param props Configuration properties
@@ -182,18 +203,27 @@ public class JobManagerConfiguration {
         } finally {
             resolver.close();
         }
+        this.queueConfigManager.addListener(this);
     }
 
     /**
      * Update with a new configuration
      */
     @Modified
-    public void update(final Map<String, Object> props) {
+    protected void update(final Map<String, Object> props) {
         this.disabledDistribution = PropertiesUtil.toBoolean(props.get(PROPERTY_DISABLE_DISTRIBUTION), DEFAULT_DISABLE_DISTRIBUTION);
         this.backgroundLoadDelay = PropertiesUtil.toLong(props.get(PROPERTY_BACKGROUND_LOAD_DELAY), DEFAULT_BACKGROUND_LOAD_DELAY);
     }
 
     /**
+     * Deactivate this component.
+     */
+    @Deactivate
+    protected void deactivate() {
+        this.queueConfigManager.removeListener(this);
+    }
+
+    /**
      * Create a new resource resolver for reading and writing the resource tree.
      * The resolver needs to be closed by the client.
      * @return A resource resolver
@@ -211,6 +241,26 @@ public class JobManagerConfiguration {
     }
 
     /**
+     * Get the current topology capabilities.
+     * @return The capabilities or {@code null}
+     */
+    public TopologyCapabilities getTopologyCapabilities() {
+        return this.topologyCapabilities;
+    }
+
+    public QueueConfigurationManager getQueueConfigurationManager() {
+        return this.queueConfigManager;
+    }
+
+    /**
+     * Get main logger.
+     * @return The main logger.
+     */
+    public Logger getMainLogger() {
+        return this.logger;
+    }
+
+    /**
      * Get the resource path for all assigned jobs.
      * @return The path - does not end with a slash.
      */
@@ -334,12 +384,13 @@ public class JobManagerConfiguration {
 
     /**
      * Get the storage path for finished jobs.
-     * @param finishedJob The finished job
+     * @param topic Topic of the finished job
+     * @param jobId The job id of the finished job.
      * @param isSuccess Whether processing was successful or not
      * @return The complete storage path
      */
-    public String getStoragePath(final JobImpl finishedJob, final boolean isSuccess) {
-        final String topicName = finishedJob.getTopic().replace('/', '.');
+    public String getStoragePath(final String topic, final String jobId, final boolean isSuccess) {
+        final String topicName = topic.replace('/', '.');
         final StringBuilder sb = new StringBuilder();
         if ( isSuccess ) {
             sb.append(this.storedSuccessfulJobsPath);
@@ -349,7 +400,7 @@ public class JobManagerConfiguration {
         sb.append('/');
         sb.append(topicName);
         sb.append('/');
-        sb.append(finishedJob.getId());
+        sb.append(jobId);
 
         return sb.toString();
 
@@ -362,11 +413,138 @@ public class JobManagerConfiguration {
         return path.startsWith(this.storedCancelledJobsPath) || path.startsWith(this.storedSuccessfulJobsPath);
     }
 
-    public String getScheduledJobsPath() {
-        return this.scheduledJobsPath;
+    /**
+     * Get the scheduled jobs path
+     * @param slash If {@code false} the path is returned, if {@code true} the path appended with a slash is returned.
+     * @return The path for the scheduled jobs
+     */
+    public String getScheduledJobsPath(final boolean slash) {
+        return (slash ? this.scheduledJobsPathWithSlash : this.scheduledJobsPath);
+    }
+
+    @Override
+    public void configChanged() {
+        final TopologyCapabilities caps = this.topologyCapabilities;
+        if ( caps != null ) {
+            synchronized ( this.listeners ) {
+                this.stopProcessing(false);
+
+                this.startProcessing(Type.PROPERTIES_CHANGED, caps, true);
+            }
+        }
+    }
+
+    private void stopProcessing(final boolean deactivate) {
+        boolean notify = this.topologyCapabilities != null;
+        // deactivate old capabilities - this stops all background processes
+        if ( deactivate && this.topologyCapabilities != null ) {
+            this.topologyCapabilities.deactivate();
+        }
+        this.topologyCapabilities = null;
+
+        if ( notify ) {
+            // stop all listeners
+            this.notifiyListeners();
+        }
+    }
+
+    private void startProcessing(final Type eventType, final TopologyCapabilities newCaps, final boolean isConfigChange) {
+        // make the new capabilities the current ones
+        this.topologyCapabilities = newCaps;
+
+        // before we propagate the new topology we do some maintenance
+        if ( eventType == Type.TOPOLOGY_INIT ) {
+            final UpgradeTask task = new UpgradeTask();
+            task.run(this, this.topologyCapabilities, queueConfigManager);
+
+            final FindUnfinishedJobsTask rt = new FindUnfinishedJobsTask();
+            rt.run(this);
+        }
+
+        final CheckTopologyTask mt = new CheckTopologyTask(this, this.queueConfigManager);
+        mt.run(topologyCapabilities, !isConfigChange, isConfigChange);
+
+        // start listeners
+        this.notifiyListeners();
     }
 
-    public String getScheduledJobsPathWithSlash() {
-        return this.scheduledJobsPathWithSlash;
+    private void notifiyListeners() {
+        for(final ConfigurationChangeListener l : this.listeners) {
+            l.configurationChanged(this.topologyCapabilities != null);
+        }
+    }
+
+    /**
+     * @see org.apache.sling.discovery.TopologyEventListener#handleTopologyEvent(org.apache.sling.discovery.TopologyEvent)
+     */
+    @Override
+    public void handleTopologyEvent(final TopologyEvent event) {
+        this.logger.debug("Received topology event {}", event);
+
+        // check if there is a change of properties which doesn't affect us
+        if ( event.getType() == Type.PROPERTIES_CHANGED ) {
+            final Map<String, String> newAllInstances = TopologyCapabilities.getAllInstancesMap(event.getNewView());
+            if ( this.topologyCapabilities != null && this.topologyCapabilities.isSame(newAllInstances) ) {
+                logger.debug("No changes in capabilities - ignoring event");
+                return;
+            }
+        }
+
+        synchronized ( this.listeners ) {
+
+            if ( event.getType() == Type.TOPOLOGY_CHANGING ) {
+               this.stopProcessing(true);
+
+            } else if ( event.getType() == Type.TOPOLOGY_INIT
+                || event.getType() == Type.TOPOLOGY_CHANGED
+                || event.getType() == Type.PROPERTIES_CHANGED ) {
+
+                this.stopProcessing(true);
+
+                this.startProcessing(event.getType(), new TopologyCapabilities(event.getNewView(), this), false);
+            }
+
+        }
+    }
+
+    /**
+     * Add a configuration change listener; it is immediately notified of the current state.
+     * @param service Listener to notify about changes.
+     */
+    public void addListener(final ConfigurationChangeListener service) {
+        synchronized ( this.listeners ) {
+            this.listeners.add(service);
+            service.configurationChanged(this.topologyCapabilities != null);
+        }
+    }
+
+    /**
+     * Remove a configuration change listener.
+     * @param service Listener that should no longer be notified.
+     */
+    public void removeListener(final ConfigurationChangeListener service) {
+        synchronized ( this.listeners )  {
+            this.listeners.remove(service);
+        }
+    }
+
+    private final Map<String, Job> retryList = new HashMap<String, Job>();
+
+    public void addJobToRetryList(final Job job) {
+        synchronized ( retryList ) {
+            retryList.put(job.getId(), job);
+        }
+    }
+
+    public void removeJobFromRetryList(final Job job) {
+        synchronized ( retryList ) {
+            retryList.remove(job.getId());
+        }
+    }
+
+    public Job getJobFromRetryList(final String jobId) {
+        synchronized ( retryList ) {
+            return retryList.get(jobId);
+        }
     }
 }

Copied: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/TopologyCapabilities.java (from r1632478, sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/TopologyCapabilities.java)
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/TopologyCapabilities.java?p2=sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/TopologyCapabilities.java&p1=sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/TopologyCapabilities.java&r1=1632478&r2=1632809&rev=1632809&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/TopologyCapabilities.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/TopologyCapabilities.java Sat Oct 18 15:51:05 2014
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.event.impl.jobs.topology;
+package org.apache.sling.event.impl.jobs.config;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -29,7 +29,6 @@ import java.util.TreeMap;
 import org.apache.sling.discovery.InstanceDescription;
 import org.apache.sling.discovery.TopologyView;
 import org.apache.sling.event.impl.jobs.JobImpl;
-import org.apache.sling.event.impl.jobs.JobManagerConfiguration;
 import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueInfo;
 import org.apache.sling.event.impl.support.Environment;
 import org.apache.sling.event.jobs.QueueConfiguration;

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/console/InventoryPlugin.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/console/InventoryPlugin.java?rev=1632809&r1=1632808&r2=1632809&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/console/InventoryPlugin.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/console/InventoryPlugin.java Sat Oct 18 15:51:05 2014
@@ -41,7 +41,7 @@ import org.apache.sling.event.impl.jobs.
 import org.apache.sling.event.impl.jobs.JobManagerImpl;
 import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
 import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager;
-import org.apache.sling.event.impl.jobs.topology.TopologyCapabilities;
+import org.apache.sling.event.impl.jobs.config.TopologyCapabilities;
 import org.apache.sling.event.jobs.JobManager;
 import org.apache.sling.event.jobs.Queue;
 import org.apache.sling.event.jobs.QueueConfiguration;

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/console/WebConsolePlugin.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/console/WebConsolePlugin.java?rev=1632809&r1=1632808&r2=1632809&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/console/WebConsolePlugin.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/console/WebConsolePlugin.java Sat Oct 18 15:51:05 2014
@@ -42,7 +42,7 @@ import org.apache.sling.event.impl.jobs.
 import org.apache.sling.event.impl.jobs.JobManagerImpl;
 import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
 import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager;
-import org.apache.sling.event.impl.jobs.topology.TopologyCapabilities;
+import org.apache.sling.event.impl.jobs.config.TopologyCapabilities;
 import org.apache.sling.event.jobs.Job;
 import org.apache.sling.event.jobs.JobManager;
 import org.apache.sling.event.jobs.Queue;

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/notifications/NewJobSender.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/notifications/NewJobSender.java?rev=1632809&r1=1632808&r2=1632809&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/notifications/NewJobSender.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/notifications/NewJobSender.java Sat Oct 18 15:51:05 2014
@@ -28,8 +28,8 @@ import org.apache.felix.scr.annotations.
 import org.apache.sling.api.SlingConstants;
 import org.apache.sling.api.resource.Resource;
 import org.apache.sling.api.resource.ResourceResolver;
-import org.apache.sling.event.impl.jobs.JobManagerConfiguration;
 import org.apache.sling.event.impl.jobs.Utility;
+import org.apache.sling.event.impl.jobs.config.JobManagerConfiguration;
 import org.apache.sling.event.impl.support.ResourceHelper;
 import org.apache.sling.event.jobs.Job;
 import org.apache.sling.event.jobs.NotificationConstants;

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java?rev=1632809&r1=1632808&r2=1632809&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java Sat Oct 18 15:51:05 2014
@@ -25,9 +25,13 @@ import java.util.Hashtable;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.api.resource.ResourceResolver;
 import org.apache.sling.commons.threads.ThreadPool;
 import org.apache.sling.event.EventUtil;
 import org.apache.sling.event.impl.EventingThreadPool;
@@ -35,11 +39,12 @@ import org.apache.sling.event.impl.jobs.
 import org.apache.sling.event.impl.jobs.JobExecutionResultImpl;
 import org.apache.sling.event.impl.jobs.JobHandler;
 import org.apache.sling.event.impl.jobs.JobImpl;
-import org.apache.sling.event.impl.jobs.JobManagerImpl;
+import org.apache.sling.event.impl.jobs.JobTopicTraverser;
 import org.apache.sling.event.impl.jobs.Utility;
 import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
 import org.apache.sling.event.impl.jobs.deprecated.JobStatusNotifier;
 import org.apache.sling.event.impl.jobs.notifications.NotificationUtility;
+import org.apache.sling.event.impl.support.BatchResourceRemover;
 import org.apache.sling.event.impl.support.Environment;
 import org.apache.sling.event.impl.support.ResourceHelper;
 import org.apache.sling.event.jobs.Job;
@@ -151,10 +156,6 @@ public abstract class AbstractJobQueue
         return this.services.statisticsManager.getQueueStatistics(this.queueName);
     }
 
-    public QueueJobCache getCache() {
-        return this.services.cache;
-    }
-
     /**
      * Start the job queue.
      */
@@ -353,7 +354,7 @@ public abstract class AbstractJobQueue
         if ( logger.isDebugEnabled() ) {
             logger.debug("Returning job for {} : {}", queueName, Utility.toString(result));
         }
-        return (result != null ? new JobHandler( result, (JobManagerImpl)this.services.jobManager) : null);
+        return (result != null ? new JobHandler( result, this.services.configuration) : null);
     }
 
     /**
@@ -371,8 +372,8 @@ public abstract class AbstractJobQueue
      * Inform the queue about a job for the topic
      * @param topic A new topic.
      */
-    public void wakeUpQueue(final String topic) {
-        this.services.cache.handleNewJob(topic);
+    public void wakeUpQueue(final Set<String> topics) {
+        this.services.cache.handleNewJob(topics);
         this.stopWaitingForNextJob();
     }
 
@@ -847,7 +848,52 @@ public abstract class AbstractJobQueue
      */
     @Override
     public synchronized void removeAll() {
-        this.services.topicManager.removeAll(this.getName());
+        final Set<String> topics = this.services.cache.getTopics();
+        logger.debug("Removing all jobs for queue {} : {}", queueName, topics);
+
+        if ( !topics.isEmpty() ) {
+
+            final ResourceResolver resolver = this.services.configuration.createResourceResolver();
+            try {
+                final Resource baseResource = resolver.getResource(this.services.configuration.getLocalJobsPath());
+
+                // sanity check - should never be null
+                if ( baseResource != null ) {
+                    final BatchResourceRemover brr = new BatchResourceRemover();
+
+                    for(final String t : topics) {
+                        final Resource topicResource = baseResource.getChild(t.replace('/', '.'));
+                        if ( topicResource != null ) {
+                            JobTopicTraverser.traverse(logger, topicResource, new JobTopicTraverser.JobCallback() {
+
+                                @Override
+                                public boolean handle(final JobImpl job) {
+                                    final Resource jobResource = topicResource.getResourceResolver().getResource(job.getResourcePath());
+                                    // sanity check
+                                    if ( jobResource != null ) {
+                                        try {
+                                            brr.delete(jobResource);
+                                        } catch ( final PersistenceException ignore) {
+                                            logger.error("Unable to remove job " + job, ignore);
+                                            topicResource.getResourceResolver().revert();
+                                            topicResource.getResourceResolver().refresh();
+                                        }
+                                    }
+                                    return true;
+                                }
+                            });
+                        }
+                    }
+                    try {
+                        resolver.commit();
+                    } catch ( final PersistenceException ignore) {
+                        logger.error("Unable to remove jobs", ignore);
+                    }
+                }
+            } finally {
+                resolver.close();
+            }
+        }
     }
 
     /**

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java?rev=1632809&r1=1632808&r2=1632809&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java Sat Oct 18 15:51:05 2014
@@ -31,8 +31,8 @@ import org.apache.sling.api.resource.Res
 import org.apache.sling.api.resource.ResourceResolver;
 import org.apache.sling.event.impl.jobs.JobHandler;
 import org.apache.sling.event.impl.jobs.JobImpl;
-import org.apache.sling.event.impl.jobs.JobManagerConfiguration;
 import org.apache.sling.event.impl.jobs.JobTopicTraverser;
+import org.apache.sling.event.impl.jobs.config.JobManagerConfiguration;
 import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueInfo;
 import org.apache.sling.event.jobs.QueueConfiguration.Type;
 import org.slf4j.Logger;
@@ -211,12 +211,12 @@ public class QueueJobCache {
      * Mark the topics to contain new jobs.
      * @param topics The topics
      */
-    public void handleNewJob(final String topic) {
-        logger.debug("Update cache to handle new event for topic {}", topic);
+    public void handleNewJob(final Set<String> topics) {
+        logger.debug("Update cache to handle new event for topics {}", topics);
         synchronized ( this.topicsWithNewJobs ) {
-            this.topicsWithNewJobs.add(topic);
+            this.topicsWithNewJobs.addAll(topics);
         }
-        this.topics.add(topic);
+        this.topics.addAll(topics);
     }
 
     public void reschedule(final JobHandler handler) {

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java?rev=1632809&r1=1632808&r2=1632809&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java Sat Oct 18 15:51:05 2014
@@ -19,7 +19,6 @@
 package org.apache.sling.event.impl.jobs.queues;
 
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -36,8 +35,8 @@ import org.apache.felix.scr.annotations.
 import org.apache.sling.commons.scheduler.Scheduler;
 import org.apache.sling.commons.threads.ThreadPoolManager;
 import org.apache.sling.event.impl.jobs.JobConsumerManager;
-import org.apache.sling.event.impl.jobs.JobManagerConfiguration;
 import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
+import org.apache.sling.event.impl.jobs.config.JobManagerConfiguration;
 import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager;
 import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueInfo;
 import org.apache.sling.event.impl.jobs.jmx.QueueStatusEvent;
@@ -45,7 +44,6 @@ import org.apache.sling.event.impl.jobs.
 import org.apache.sling.event.impl.jobs.stats.StatisticsManager;
 import org.apache.sling.event.impl.support.Environment;
 import org.apache.sling.event.impl.support.ResourceHelper;
-import org.apache.sling.event.jobs.JobManager;
 import org.apache.sling.event.jobs.Queue;
 import org.apache.sling.event.jobs.QueueConfiguration;
 import org.apache.sling.event.jobs.jmx.QueuesMBean;
@@ -177,15 +175,11 @@ public class QueueManager
      * This method first searches the corresponding queue - if such a queue
      * does not exist yet, it is created and started.
      *
-     * @param topicManager The topic manager
-     * @param jobManager The job manager
      * @param queueInfo The queue info
-     * @param topic The topic
+     * @param topics The topics
      */
-    public void start(final TopicManager topicManager,
-            final JobManager jobManager,
-            final QueueInfo queueInfo,
-            final String topic) {
+    public void start(final QueueInfo queueInfo,
+            final Set<String> topics) {
         final InternalQueueConfiguration config = queueInfo.queueConfiguration;
         // get or create queue
         AbstractJobQueue queue = null;
@@ -200,15 +194,12 @@ public class QueueManager
             }
             if ( queue == null ) {
                 final QueueServices services = new QueueServices();
+                services.configuration = this.configuration;
                 services.eventAdmin = this.eventAdmin;
                 services.jobConsumerManager = this.jobConsumerManager;
                 services.scheduler = this.scheduler;
                 services.threadPoolManager = this.threadPoolManager;
-                services.topicManager = topicManager;
                 services.statisticsManager = statisticsManager;
-                services.jobManager = jobManager;
-                final Set<String> topics = new HashSet<String>();
-                topics.add(topic);
                 services.cache = new QueueJobCache(configuration, queueInfo, topics);
                 if ( config.getType() == QueueConfiguration.Type.ORDERED ) {
                     queue = new OrderedJobQueue(queueInfo.queueName, config, services);
@@ -224,7 +215,7 @@ public class QueueManager
                     queue.start();
                 }
             } else {
-                queue.wakeUpQueue(topic);
+                queue.wakeUpQueue(topics);
             }
         }
     }

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueServices.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueServices.java?rev=1632809&r1=1632808&r2=1632809&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueServices.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueServices.java Sat Oct 18 15:51:05 2014
@@ -21,12 +21,14 @@ package org.apache.sling.event.impl.jobs
 import org.apache.sling.commons.scheduler.Scheduler;
 import org.apache.sling.commons.threads.ThreadPoolManager;
 import org.apache.sling.event.impl.jobs.JobConsumerManager;
+import org.apache.sling.event.impl.jobs.config.JobManagerConfiguration;
 import org.apache.sling.event.impl.jobs.stats.StatisticsManager;
-import org.apache.sling.event.jobs.JobManager;
 import org.osgi.service.event.EventAdmin;
 
 public class QueueServices {
 
+    public JobManagerConfiguration configuration;
+
     public JobConsumerManager jobConsumerManager;
 
     public EventAdmin eventAdmin;
@@ -35,11 +37,7 @@ public class QueueServices {
 
     public Scheduler scheduler;
 
-    public TopicManager topicManager;
-
     public StatisticsManager statisticsManager;
 
     public QueueJobCache cache;
-
-    public JobManager jobManager;
 }

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicManager.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicManager.java?rev=1632809&r1=1632808&r2=1632809&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicManager.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicManager.java Sat Oct 18 15:51:05 2014
@@ -18,6 +18,7 @@
  */
 package org.apache.sling.event.impl.jobs.queues;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -31,19 +32,12 @@ import org.apache.felix.scr.annotations.
 import org.apache.felix.scr.annotations.Property;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.Service;
-import org.apache.sling.api.resource.PersistenceException;
 import org.apache.sling.api.resource.Resource;
 import org.apache.sling.api.resource.ResourceResolver;
-import org.apache.sling.event.impl.jobs.JobImpl;
-import org.apache.sling.event.impl.jobs.JobManagerConfiguration;
-import org.apache.sling.event.impl.jobs.JobTopicTraverser;
-import org.apache.sling.event.impl.jobs.TestLogger;
+import org.apache.sling.event.impl.jobs.config.ConfigurationChangeListener;
+import org.apache.sling.event.impl.jobs.config.JobManagerConfiguration;
 import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager;
 import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueInfo;
-import org.apache.sling.event.impl.jobs.topology.TopologyAware;
-import org.apache.sling.event.impl.jobs.topology.TopologyCapabilities;
-import org.apache.sling.event.impl.jobs.topology.TopologyHandler;
-import org.apache.sling.event.impl.support.BatchResourceRemover;
 import org.apache.sling.event.jobs.JobManager;
 import org.apache.sling.event.jobs.NotificationConstants;
 import org.osgi.framework.BundleContext;
@@ -60,10 +54,10 @@ import org.slf4j.LoggerFactory;
 @Component(immediate=true)
 @Service(value=EventHandler.class)
 @Property(name=EventConstants.EVENT_TOPIC, value=NotificationConstants.TOPIC_JOB_ADDED)
-public class TopicManager implements EventHandler, TopologyAware {
+public class TopicManager implements EventHandler, ConfigurationChangeListener {
 
     /** Logger. */
-    private final Logger logger = new TestLogger(LoggerFactory.getLogger(this.getClass()));
+    private final Logger logger = LoggerFactory.getLogger(this.getClass());
 
     @Reference
     private JobManagerConfiguration configuration;
@@ -72,17 +66,11 @@ public class TopicManager implements Eve
     private QueueConfigurationManager queueConfigMgr;
 
     @Reference
-    private TopologyHandler topologyHandler;
-
-    @Reference
     private QueueManager queueManager;
 
     @Reference
     private JobManager jobManager;
 
-    /** The mapping from a topic to a queue info. */
-    private final Map<String, QueueInfo> topicMapping = new HashMap<String, QueueInfo>();
-
     /** Flag whether the manager is active or suspended. */
     private final AtomicBoolean isActive = new AtomicBoolean(false);
 
@@ -91,7 +79,7 @@ public class TopicManager implements Eve
      */
     @Activate
     protected void activate(final BundleContext bundleContext) {
-        this.topologyHandler.addListener(this);
+        this.configuration.addListener(this);
     }
 
     /**
@@ -99,7 +87,7 @@ public class TopicManager implements Eve
      */
     @Deactivate
     protected void deactivate() {
-        this.topologyHandler.removeListener(this);
+        this.configuration.removeListener(this);
     }
 
     /**
@@ -107,22 +95,19 @@ public class TopicManager implements Eve
      * @param active {@code true} to scan the topics and start the queues, {@code false} to suspend the manager and restart the queues.
      */
     @Override
-    public void topologyChanged(final TopologyCapabilities caps) {
-        logger.debug("Topology changed {}", caps);
-        synchronized ( this.topicMapping ) {
-            if ( caps != null ) {
-                final Set<String> topics = this.initialScan();
-                this.updateTopicMapping(topics);
-                // start queues
-                for(final Map.Entry<String, QueueInfo> entry : this.topicMapping.entrySet() ) {
-                    this.queueManager.start(this, this.jobManager, entry.getValue(), entry.getKey());
-                }
-                this.isActive.set(true);
-            } else {
-                this.isActive.set(false);
-                this.queueManager.restart();
-                this.topicMapping.clear();
+    public void configurationChanged(final boolean active) {
+        logger.debug("Topology changed {}", active);
+        if ( active ) {
+            final Set<String> topics = this.initialScan();
+            final Map<QueueInfo, Set<String>> mapping = this.updateTopicMapping(topics);
+            // start queues
+            for(final Map.Entry<QueueInfo, Set<String>> entry : mapping.entrySet() ) {
+                this.queueManager.start(entry.getKey(), entry.getValue());
             }
+            this.isActive.set(true);
+        } else {
+            this.isActive.set(false);
+            this.queueManager.restart();
         }
     }
 
@@ -159,89 +144,28 @@ public class TopicManager implements Eve
     @Override
     public void handleEvent(final Event event) {
         final String topic = (String)event.getProperty(NotificationConstants.NOTIFICATION_PROPERTY_JOB_TOPIC);
-        synchronized ( this.topicMapping ) {
-            if ( this.isActive.get() && topic != null ) {
-                QueueInfo info = this.topicMapping.get(topic);
-                if ( info == null ) {
-                    info = this.queueConfigMgr.getQueueInfo(topic);
-                    this.topicMapping.put(topic, info);
-                }
-                this.queueManager.start(this, this.jobManager, info, topic);
-            }
+        if ( this.isActive.get() && topic != null ) {
+            final QueueInfo info = this.queueConfigMgr.getQueueInfo(topic);
+            this.queueManager.start(info, Collections.singleton(topic));
         }
     }
 
     /**
      * Get the latest mapping from queue info to the topics assigned to that queue
      */
-    private void updateTopicMapping(final Set<String> topics) {
-        this.topicMapping.clear();
-
+    private Map<QueueInfo, Set<String>> updateTopicMapping(final Set<String> topics) {
+        final Map<QueueInfo, Set<String>> mapping = new HashMap<QueueConfigurationManager.QueueInfo, Set<String>>();
         for(final String topic : topics) {
             final QueueInfo queueInfo = this.queueConfigMgr.getQueueInfo(topic);
-            this.topicMapping.put(topic, queueInfo);
-        }
-
-        this.logger.debug("Established new topic mapping: {}", this.topicMapping);
-    }
-
-    /**
-     * Remove all jobs for a queue.
-     * @param queueName The queue name
-     */
-    public void removeAll(final String queueName) {
-        final Set<String> topics = new HashSet<String>();
-        synchronized ( this.topicMapping ) {
-            for(final Map.Entry<String, QueueInfo> entry : this.topicMapping.entrySet()) {
-                if ( entry.getValue().queueName.equals(queueName) ) {
-                    topics.add(entry.getKey());
-                }
+            Set<String> queueTopics = mapping.get(queueInfo);
+            if ( queueTopics == null ) {
+                queueTopics = new HashSet<String>();
+                mapping.put(queueInfo, queueTopics);
             }
+            queueTopics.add(topic);
         }
-        logger.debug("Removing all jobs for queue {} : {}", queueName, topics);
 
-        if ( !topics.isEmpty() ) {
-
-            final ResourceResolver resolver = this.configuration.createResourceResolver();
-            try {
-                final Resource baseResource = resolver.getResource(this.configuration.getLocalJobsPath());
-
-                // sanity check - should never be null
-                if ( baseResource != null ) {
-                    final BatchResourceRemover brr = new BatchResourceRemover();
-
-                    for(final String t : topics) {
-                        final Resource topicResource = baseResource.getChild(t.replace('/', '.'));
-                        if ( topicResource != null ) {
-                            JobTopicTraverser.traverse(logger, topicResource, new JobTopicTraverser.JobCallback() {
-
-                                @Override
-                                public boolean handle(final JobImpl job) {
-                                    final Resource jobResource = topicResource.getResourceResolver().getResource(job.getResourcePath());
-                                    // sanity check
-                                    if ( jobResource != null ) {
-                                        try {
-                                            brr.delete(jobResource);
-                                        } catch ( final PersistenceException ignore) {
-                                            logger.error("Unable to remove job " + job, ignore);
-                                            topicResource.getResourceResolver().revert();
-                                            topicResource.getResourceResolver().refresh();
-                                        }
-                                    }
-                                    return true;
-                                }
-                            });
-                        }
-                    }
-                    try {
-                        resolver.commit();
-                    } catch ( final PersistenceException ignore) {
-                        logger.error("Unable to remove jobs", ignore);
-                    }
-                }
-            } finally {
-                resolver.close();
-            }
-        }
+        this.logger.debug("Established new topic mapping: {}", mapping);
+        return mapping;
     }
 }

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsManager.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsManager.java?rev=1632809&r1=1632808&r2=1632809&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsManager.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsManager.java Sat Oct 18 15:51:05 2014
@@ -27,7 +27,7 @@ import org.apache.felix.scr.annotations.
 import org.apache.felix.scr.annotations.Property;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.Service;
-import org.apache.sling.event.impl.jobs.JobManagerConfiguration;
+import org.apache.sling.event.impl.jobs.config.JobManagerConfiguration;
 import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager;
 import org.apache.sling.event.impl.jobs.notifications.NotificationUtility;
 import org.apache.sling.event.jobs.Job;

Copied: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/CheckTopologyTask.java (from r1632478, sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/CheckTopologyTask.java)
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/CheckTopologyTask.java?p2=sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/CheckTopologyTask.java&p1=sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/CheckTopologyTask.java&r1=1632478&r2=1632809&rev=1632809&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/CheckTopologyTask.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/CheckTopologyTask.java Sat Oct 18 15:51:05 2014
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.event.impl.jobs.topology;
+package org.apache.sling.event.impl.jobs.tasks;
 
 import java.util.Collections;
 import java.util.HashMap;
@@ -30,9 +30,10 @@ import org.apache.sling.api.resource.Res
 import org.apache.sling.api.resource.ValueMap;
 import org.apache.sling.discovery.InstanceDescription;
 import org.apache.sling.event.impl.jobs.JobImpl;
-import org.apache.sling.event.impl.jobs.JobManagerConfiguration;
 import org.apache.sling.event.impl.jobs.JobTopicTraverser;
+import org.apache.sling.event.impl.jobs.config.JobManagerConfiguration;
 import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager;
+import org.apache.sling.event.impl.jobs.config.TopologyCapabilities;
 import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueInfo;
 import org.apache.sling.event.impl.support.ResourceHelper;
 import org.apache.sling.event.jobs.Job;