You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2014/10/18 17:51:05 UTC
svn commit: r1632809 [1/2] - in /sling/trunk/bundles/extensions/event/src:
main/java/org/apache/sling/event/impl/jobs/
main/java/org/apache/sling/event/impl/jobs/config/
main/java/org/apache/sling/event/impl/jobs/console/
main/java/org/apache/sling/eve...
Author: cziegeler
Date: Sat Oct 18 15:51:05 2014
New Revision: 1632809
URL: http://svn.apache.org/r1632809
Log:
SLING-4048 : Avoid keeping jobs in memory. Another refactoring: move all configuration code into config package (WiP)
Added:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/ConfigurationChangeListener.java
- copied, changed from r1632617, sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/TopologyAware.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java
- copied, changed from r1632478, sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerConfiguration.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/TopologyCapabilities.java
- copied, changed from r1632478, sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/TopologyCapabilities.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/CheckTopologyTask.java
- copied, changed from r1632478, sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/CheckTopologyTask.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/CleanUpTask.java
- copied, changed from r1632528, sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/MaintenanceTask.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/FindUnfinishedJobsTask.java
- copied, changed from r1632478, sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/FindUnfinishedJobsTask.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/UpgradeTask.java
- copied, changed from r1632520, sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/UpgradeTask.java
Removed:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerConfiguration.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/MaintenanceTask.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/CheckTopologyTask.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/FindUnfinishedJobsTask.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/TopologyAware.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/TopologyCapabilities.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/TopologyHandler.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/UpgradeTask.java
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobSchedulerImpl.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/console/InventoryPlugin.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/console/WebConsolePlugin.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/notifications/NewJobSender.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueServices.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicManager.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsManager.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/HistoryCleanUpTask.java
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/InstanceDescriptionComparatorTest.java
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ChaosTest.java
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java?rev=1632809&r1=1632808&r2=1632809&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java Sat Oct 18 15:51:05 2014
@@ -40,7 +40,7 @@ import org.apache.felix.scr.annotations.
import org.apache.felix.scr.annotations.Service;
import org.apache.sling.commons.osgi.PropertiesUtil;
import org.apache.sling.discovery.PropertyProvider;
-import org.apache.sling.event.impl.jobs.topology.TopologyCapabilities;
+import org.apache.sling.event.impl.jobs.config.TopologyCapabilities;
import org.apache.sling.event.impl.support.TopicMatcher;
import org.apache.sling.event.impl.support.TopicMatcherHelper;
import org.apache.sling.event.jobs.Job;
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java?rev=1632809&r1=1632808&r2=1632809&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java Sat Oct 18 15:51:05 2014
@@ -19,6 +19,20 @@
package org.apache.sling.event.impl.jobs;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.sling.api.resource.ModifiableValueMap;
+import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.ValueMap;
+import org.apache.sling.event.impl.jobs.config.JobManagerConfiguration;
+import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueInfo;
+import org.apache.sling.event.impl.jobs.config.TopologyCapabilities;
+import org.apache.sling.event.impl.support.ResourceHelper;
import org.apache.sling.event.jobs.Job;
import org.apache.sling.event.jobs.Queue;
@@ -35,12 +49,12 @@ public class JobHandler {
private volatile boolean isStopped = false;
- private final JobManagerImpl jobManager;
+ private final JobManagerConfiguration configuration;
public JobHandler(final JobImpl job,
- final JobManagerImpl jobManager) {
+ final JobManagerConfiguration configuration) {
this.job = job;
- this.jobManager = jobManager;
+ this.configuration = configuration;
}
public JobImpl getJob() {
@@ -53,39 +67,174 @@ public class JobHandler {
}
/**
- * Finish the processing of the job
- * @param state The state of processing
- */
- public void finished(final Job.JobState state, final boolean keepJobInHistory, final long duration) {
- // for now we just keep cancelled jobs
- this.jobManager.finishJob(this.job, state, keepJobInHistory, duration);
- }
-
- /**
* Reschedule the job
* Update the retry count and remove the started time.
* @return <code>true</code> if rescheduling was successful, <code>false</code> otherwise.
*/
public boolean reschedule() {
- return this.jobManager.reschedule(this.job);
+ final ResourceResolver resolver = this.configuration.createResourceResolver();
+ try {
+ final Resource jobResource = resolver.getResource(job.getResourcePath());
+ if ( jobResource != null ) {
+ final ModifiableValueMap mvm = jobResource.adaptTo(ModifiableValueMap.class);
+ mvm.put(Job.PROPERTY_JOB_RETRY_COUNT, job.getProperty(Job.PROPERTY_JOB_RETRY_COUNT));
+ if ( job.getProperty(Job.PROPERTY_RESULT_MESSAGE) != null ) {
+ mvm.put(Job.PROPERTY_RESULT_MESSAGE, job.getProperty(Job.PROPERTY_RESULT_MESSAGE));
+ }
+ mvm.remove(Job.PROPERTY_JOB_STARTED_TIME);
+ try {
+ resolver.commit();
+ return true;
+ } catch ( final PersistenceException pe ) {
+ this.configuration.getMainLogger().debug("Unable to update reschedule properties for job " + job.getId(), pe);
+ }
+ }
+ } finally {
+ resolver.close();
+ }
+
+ return false;
}
- public void cancel() {
- this.jobManager.finishJob(this.job, Job.JobState.DROPPED, true, -1);
+ /**
+ * Finish a job
+ * @param info The job handler
+ * @param state The state of the processing
+ */
+ public void finished(final Job.JobState state,
+ final boolean keepJobInHistory,
+ final long duration) {
+ final boolean isSuccess = (state == Job.JobState.SUCCEEDED);
+ final ResourceResolver resolver = this.configuration.createResourceResolver();
+ try {
+ final Resource jobResource = resolver.getResource(job.getResourcePath());
+ if ( jobResource != null ) {
+ try {
+ String newPath = null;
+ if ( keepJobInHistory ) {
+ final ValueMap vm = ResourceHelper.getValueMap(jobResource);
+ newPath = this.configuration.getStoragePath(job.getTopic(), job.getId(), isSuccess);
+ final Map<String, Object> props = new HashMap<String, Object>(vm);
+ props.put(JobImpl.PROPERTY_FINISHED_STATE, state.name());
+ if ( isSuccess ) {
+ // we set the finish date to start date + duration
+ final Date finishDate = new Date();
+ finishDate.setTime(job.getProcessingStarted().getTime().getTime() + duration);
+ final Calendar finishCal = Calendar.getInstance();
+ finishCal.setTime(finishDate);
+ props.put(JobImpl.PROPERTY_FINISHED_DATE, finishCal);
+ } else {
+ // current time is good enough
+ props.put(JobImpl.PROPERTY_FINISHED_DATE, Calendar.getInstance());
+ }
+ if ( job.getProperty(Job.PROPERTY_RESULT_MESSAGE) != null ) {
+ props.put(Job.PROPERTY_RESULT_MESSAGE, job.getProperty(Job.PROPERTY_RESULT_MESSAGE));
+ }
+ ResourceHelper.getOrCreateResource(resolver, newPath, props);
+ }
+ resolver.delete(jobResource);
+ resolver.commit();
+
+ if ( keepJobInHistory && configuration.getMainLogger().isDebugEnabled() ) {
+ if ( isSuccess ) {
+ configuration.getMainLogger().debug("Kept successful job {} at {}", Utility.toString(job), newPath);
+ } else {
+ configuration.getMainLogger().debug("Moved cancelled job {} to {}", Utility.toString(job), newPath);
+ }
+ }
+ } catch ( final PersistenceException pe ) {
+ this.configuration.getMainLogger().warn("Unable to finish job " + job.getId(), pe);
+ } catch (final InstantiationException ie) {
+ // something happened with the resource in the meantime
+ this.configuration.getMainLogger().debug("Unable to instantiate job", ie);
+ }
+ }
+ } finally {
+ resolver.close();
+ }
}
/**
* Reassign to a new instance.
*/
public void reassign() {
- this.jobManager.reassign(this.job);
+ final QueueInfo queueInfo = this.configuration.getQueueConfigurationManager().getQueueInfo(job.getTopic());
+ // Sanity check if queue configuration has changed
+ final TopologyCapabilities caps = this.configuration.getTopologyCapabilities();
+ final String targetId = (caps == null ? null : caps.detectTarget(job.getTopic(), job.getProperties(), queueInfo));
+
+ final ResourceResolver resolver = this.configuration.createResourceResolver();
+ try {
+ final Resource jobResource = resolver.getResource(job.getResourcePath());
+ if ( jobResource != null ) {
+ try {
+ final ValueMap vm = ResourceHelper.getValueMap(jobResource);
+ final String newPath = this.configuration.getUniquePath(targetId, job.getTopic(), job.getId(), job.getProperties());
+
+ final Map<String, Object> props = new HashMap<String, Object>(vm);
+ props.remove(Job.PROPERTY_JOB_QUEUE_NAME);
+ if ( targetId == null ) {
+ props.remove(Job.PROPERTY_JOB_TARGET_INSTANCE);
+ } else {
+ props.put(Job.PROPERTY_JOB_TARGET_INSTANCE, targetId);
+ }
+ props.remove(Job.PROPERTY_JOB_STARTED_TIME);
+
+ try {
+ ResourceHelper.getOrCreateResource(resolver, newPath, props);
+ resolver.delete(jobResource);
+ resolver.commit();
+ } catch ( final PersistenceException pe ) {
+ this.configuration.getMainLogger().warn("Unable to reassign job " + job.getId(), pe);
+ }
+ } catch (final InstantiationException ie) {
+ // something happened with the resource in the meantime
+ this.configuration.getMainLogger().debug("Unable to instantiate job", ie);
+ }
+ }
+ } finally {
+ resolver.close();
+ }
}
/**
* Update the property of a job in the resource tree
+ * @param propNames the property names to update
+ * @return {@code true} if the update was successful.
*/
public boolean persistJobProperties(final String... propNames) {
- return this.jobManager.persistJobProperties(this.job, propNames);
+ if ( propNames != null ) {
+ final ResourceResolver resolver = this.configuration.createResourceResolver();
+ try {
+ final Resource jobResource = resolver.getResource(job.getResourcePath());
+ if ( jobResource != null ) {
+ final ModifiableValueMap mvm = jobResource.adaptTo(ModifiableValueMap.class);
+ for(final String propName : propNames) {
+ final Object val = job.getProperty(propName);
+ if ( val != null ) {
+ if ( val.getClass().isEnum() ) {
+ mvm.put(propName, val.toString());
+ } else {
+ mvm.put(propName, val);
+ }
+ } else {
+ mvm.remove(propName);
+ }
+ }
+ resolver.commit();
+
+ return true;
+ } else {
+ this.configuration.getMainLogger().debug("No job resource found at {}", job.getResourcePath());
+ }
+ } catch ( final PersistenceException ignore ) {
+ this.configuration.getMainLogger().debug("Unable to persist properties", ignore);
+ } finally {
+ resolver.close();
+ }
+ return false;
+ }
+ return true;
}
public boolean isStopped() {
@@ -96,6 +245,15 @@ public class JobHandler {
this.isStopped = true;
}
+ public void addToRetryList() {
+ this.configuration.addJobToRetryList(this.job);
+
+ }
+
+ public void removeFromRetryList() {
+ this.configuration.removeJobFromRetryList(this.job);
+ }
+
@Override
public int hashCode() {
return this.job.getId().hashCode();
@@ -113,13 +271,4 @@ public class JobHandler {
public String toString() {
return "JobHandler(" + this.job.getId() + ")";
}
-
- public void addToRetryList() {
- this.jobManager.addJobToRetryList(this.job);
-
- }
-
- public void removeFromRetryList() {
- this.jobManager.removeJobFromRetryList(this.job);
- }
}
\ No newline at end of file
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java?rev=1632809&r1=1632808&r2=1632809&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java Sat Oct 18 15:51:05 2014
@@ -21,7 +21,6 @@ package org.apache.sling.event.impl.jobs
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collection;
-import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -37,7 +36,6 @@ import org.apache.felix.scr.annotations.
import org.apache.jackrabbit.util.ISO9075;
import org.apache.sling.api.SlingConstants;
import org.apache.sling.api.resource.LoginException;
-import org.apache.sling.api.resource.ModifiableValueMap;
import org.apache.sling.api.resource.PersistenceException;
import org.apache.sling.api.resource.QuerySyntaxException;
import org.apache.sling.api.resource.Resource;
@@ -46,15 +44,16 @@ import org.apache.sling.api.resource.Val
import org.apache.sling.commons.scheduler.Scheduler;
import org.apache.sling.commons.threads.ThreadPoolManager;
import org.apache.sling.event.EventUtil;
+import org.apache.sling.event.impl.jobs.config.ConfigurationChangeListener;
+import org.apache.sling.event.impl.jobs.config.JobManagerConfiguration;
import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager;
import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueInfo;
+import org.apache.sling.event.impl.jobs.config.TopologyCapabilities;
import org.apache.sling.event.impl.jobs.notifications.NotificationUtility;
import org.apache.sling.event.impl.jobs.queues.AbstractJobQueue;
import org.apache.sling.event.impl.jobs.queues.QueueManager;
import org.apache.sling.event.impl.jobs.stats.StatisticsManager;
-import org.apache.sling.event.impl.jobs.topology.TopologyAware;
-import org.apache.sling.event.impl.jobs.topology.TopologyCapabilities;
-import org.apache.sling.event.impl.jobs.topology.TopologyHandler;
+import org.apache.sling.event.impl.jobs.tasks.CleanUpTask;
import org.apache.sling.event.impl.support.Environment;
import org.apache.sling.event.impl.support.ResourceHelper;
import org.apache.sling.event.impl.support.ScheduleInfoImpl;
@@ -94,15 +93,12 @@ import org.slf4j.LoggerFactory;
ResourceHelper.BUNDLE_EVENT_UPDATED})
})
public class JobManagerImpl
- implements JobManager, EventHandler, Runnable, TopologyAware {
+ implements JobManager, EventHandler, Runnable, ConfigurationChangeListener {
/** Default logger. */
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Reference
- private TopologyHandler topologyHandler;
-
- @Reference
private EventAdmin eventAdmin;
@Reference
@@ -132,7 +128,7 @@ public class JobManagerImpl
private volatile TopologyCapabilities topologyCapabilities;
- private MaintenanceTask maintenanceTask;
+ private CleanUpTask maintenanceTask;
/** We count the scheduler runs. */
private volatile long schedulerRuns;
@@ -147,9 +143,9 @@ public class JobManagerImpl
@Activate
protected void activate(final Map<String, Object> props) throws LoginException {
this.jobScheduler = new JobSchedulerImpl(this.configuration, this.scheduler, this);
- this.maintenanceTask = new MaintenanceTask(this.configuration);
+ this.maintenanceTask = new CleanUpTask(this.configuration);
- this.topologyHandler.addListener(this);
+ this.configuration.addListener(this);
logger.info("Apache Sling Job Manager started on instance {}", Environment.APPLICATION_ID);
}
@@ -159,7 +155,7 @@ public class JobManagerImpl
@Deactivate
protected void deactivate() {
logger.info("Apache Sling Job Manager stopping on instance {}", Environment.APPLICATION_ID);
- this.topologyHandler.removeListener(this);
+ this.configuration.removeListener(this);
this.jobScheduler.deactivate();
@@ -178,7 +174,7 @@ public class JobManagerImpl
logger.debug("Job manager maintenance: Starting #{}", this.schedulerRuns);
// invoke maintenance task
- final MaintenanceTask task = this.maintenanceTask;
+ final CleanUpTask task = this.maintenanceTask;
if ( task != null ) {
task.run(this.topologyCapabilities, this.schedulerRuns - 1);
}
@@ -243,23 +239,13 @@ public class JobManagerImpl
}
}
- private void stopProcessing() {
- this.topologyCapabilities = null;
- }
-
- private void startProcessing(final TopologyCapabilities caps) {
- // create new capabilities and update view
- this.topologyCapabilities = caps;
- }
-
@Override
- public void topologyChanged(final TopologyCapabilities caps) {
- if ( caps == null ) {
- this.stopProcessing();
+ public void configurationChanged(final boolean active) {
+ if ( !active ) {
+ this.topologyCapabilities = null;
} else {
- this.startProcessing(caps);
+ this.topologyCapabilities = this.configuration.getTopologyCapabilities();
}
- this.jobScheduler.topologyChanged(caps);
}
/**
@@ -384,7 +370,7 @@ public class JobManagerImpl
if ( logger.isDebugEnabled() ) {
logger.debug("Found removal job: {}", Utility.toString(job));
}
- final JobImpl retryJob = this.getJobFromRetryList(jobId);
+ final JobImpl retryJob = (JobImpl)this.configuration.getJobFromRetryList(jobId);
if ( retryJob != null ) {
job = retryJob;
}
@@ -416,7 +402,8 @@ public class JobManagerImpl
resolver.close();
}
} else {
- this.finishJob(job, Job.JobState.DROPPED, true, -1);
+ final JobHandler jh = new JobHandler(job, this.configuration);
+ jh.finished(Job.JobState.DROPPED, true, -1);
}
}
} else {
@@ -736,64 +723,7 @@ public class JobManagerImpl
return result;
}
- /**
- * Finish a job
- * @param info The job handler
- * @param state The state of the processing
- */
- public void finishJob(final JobImpl job,
- final Job.JobState state,
- final boolean keepJobInHistory,
- final long duration) {
- final boolean isSuccess = (state == Job.JobState.SUCCEEDED);
- final ResourceResolver resolver = this.configuration.createResourceResolver();
- try {
- final Resource jobResource = resolver.getResource(job.getResourcePath());
- if ( jobResource != null ) {
- try {
- String newPath = null;
- if ( keepJobInHistory ) {
- final ValueMap vm = ResourceHelper.getValueMap(jobResource);
- newPath = this.configuration.getStoragePath(job, isSuccess);
- final Map<String, Object> props = new HashMap<String, Object>(vm);
- props.put(JobImpl.PROPERTY_FINISHED_STATE, state.name());
- if ( isSuccess ) {
- // we set the finish date to start date + duration
- final Date finishDate = new Date();
- finishDate.setTime(job.getProcessingStarted().getTime().getTime() + duration);
- final Calendar finishCal = Calendar.getInstance();
- finishCal.setTime(finishDate);
- props.put(JobImpl.PROPERTY_FINISHED_DATE, finishCal);
- } else {
- // current time is good enough
- props.put(JobImpl.PROPERTY_FINISHED_DATE, Calendar.getInstance());
- }
- if ( job.getProperty(Job.PROPERTY_RESULT_MESSAGE) != null ) {
- props.put(Job.PROPERTY_RESULT_MESSAGE, job.getProperty(Job.PROPERTY_RESULT_MESSAGE));
- }
- ResourceHelper.getOrCreateResource(resolver, newPath, props);
- }
- resolver.delete(jobResource);
- resolver.commit();
- if ( keepJobInHistory && logger.isDebugEnabled() ) {
- if ( isSuccess ) {
- logger.debug("Kept successful job {} at {}", Utility.toString(job), newPath);
- } else {
- logger.debug("Moved cancelled job {} to {}", Utility.toString(job), newPath);
- }
- }
- } catch ( final PersistenceException pe ) {
- this.ignoreException(pe);
- } catch (final InstantiationException ie) {
- // something happened with the resource in the meantime
- this.ignoreException(ie);
- }
- }
- } finally {
- resolver.close();
- }
- }
/**
* Try to get a "lock" for a resource
@@ -1105,144 +1035,4 @@ public class JobManagerImpl
}
return null;
}
-
- private final List<JobImpl> retryList = new ArrayList<JobImpl>();
-
- public void addJobToRetryList(final JobImpl job) {
- synchronized ( retryList ) {
- retryList.add(job);
- }
- }
-
- public void removeJobFromRetryList(final JobImpl job) {
- synchronized ( retryList ) {
- retryList.remove(job);
- }
- }
-
- private JobImpl getJobFromRetryList(final String jobId) {
- synchronized ( retryList ) {
- for(final JobImpl j : retryList) {
- if ( jobId.equals(j.getId())) {
- return j;
- }
- }
- }
- return null;
- }
-
- /**
- * Reassign a job to a new instance
- */
- public void reassign(final JobImpl job) {
- final QueueInfo queueInfo = this.queueManager.getQueueInfo(job.getTopic());
- // Sanity check if queue configuration has changed
- final TopologyCapabilities caps = this.topologyCapabilities;
- final String targetId = (caps == null ? null : caps.detectTarget(job.getTopic(), job.getProperties(), queueInfo));
-
- final ResourceResolver resolver = this.configuration.createResourceResolver();
- try {
- final Resource jobResource = resolver.getResource(job.getResourcePath());
- if ( jobResource != null ) {
- try {
- final ValueMap vm = ResourceHelper.getValueMap(jobResource);
- final String newPath = this.configuration.getUniquePath(targetId, job.getTopic(), job.getId(), job.getProperties());
-
- final Map<String, Object> props = new HashMap<String, Object>(vm);
- props.remove(Job.PROPERTY_JOB_QUEUE_NAME);
- if ( targetId == null ) {
- props.remove(Job.PROPERTY_JOB_TARGET_INSTANCE);
- } else {
- props.put(Job.PROPERTY_JOB_TARGET_INSTANCE, targetId);
- }
- props.remove(Job.PROPERTY_JOB_STARTED_TIME);
-
- try {
- ResourceHelper.getOrCreateResource(resolver, newPath, props);
- resolver.delete(jobResource);
- resolver.commit();
- } catch ( final PersistenceException pe ) {
- this.ignoreException(pe);
- }
- } catch (final InstantiationException ie) {
- // something happened with the resource in the meantime
- this.ignoreException(ie);
- }
- }
- } finally {
- resolver.close();
- }
- }
-
- /**
- * Reschedule the job
- * Update the retry count and remove the started time.
- * @param job The job
- * @return <code>true</code> if rescheduling was successful, <code>false</code> otherwise.
- */
- public boolean reschedule(final JobImpl job) {
- final ResourceResolver resolver = this.configuration.createResourceResolver();
- try {
- final Resource jobResource = resolver.getResource(job.getResourcePath());
- if ( jobResource != null ) {
- final ModifiableValueMap mvm = jobResource.adaptTo(ModifiableValueMap.class);
- mvm.put(Job.PROPERTY_JOB_RETRY_COUNT, job.getProperty(Job.PROPERTY_JOB_RETRY_COUNT));
- if ( job.getProperty(Job.PROPERTY_RESULT_MESSAGE) != null ) {
- mvm.put(Job.PROPERTY_RESULT_MESSAGE, job.getProperty(Job.PROPERTY_RESULT_MESSAGE));
- }
- mvm.remove(Job.PROPERTY_JOB_STARTED_TIME);
- try {
- resolver.commit();
- return true;
- } catch ( final PersistenceException pe ) {
- this.logger.debug("Unable to update reschedule properties for job " + job.getId(), pe);
- }
- }
- } finally {
- resolver.close();
- }
-
- return false;
- }
-
- /**
- * Update the property of a job in the resource tree
- * @param job The job
- * @param propNames the property names to update
- * @return {@code true} if the update was successful.
- */
- public boolean persistJobProperties(final JobImpl job, final String... propNames) {
- if ( propNames != null ) {
- final ResourceResolver resolver = this.configuration.createResourceResolver();
- try {
- final Resource jobResource = resolver.getResource(job.getResourcePath());
- if ( jobResource != null ) {
- final ModifiableValueMap mvm = jobResource.adaptTo(ModifiableValueMap.class);
- for(final String propName : propNames) {
- final Object val = job.getProperty(propName);
- if ( val != null ) {
- if ( val.getClass().isEnum() ) {
- mvm.put(propName, val.toString());
- } else {
- mvm.put(propName, val);
- }
- } else {
- mvm.remove(propName);
- }
- }
- resolver.commit();
-
- return true;
- } else {
- logger.debug("No job resource found at {}", job.getResourcePath());
- }
- } catch ( final PersistenceException ignore ) {
- logger.debug("Unable to persist properties", ignore);
- } finally {
- resolver.close();
- }
- return false;
- }
- return true;
- }
}
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobSchedulerImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobSchedulerImpl.java?rev=1632809&r1=1632808&r2=1632809&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobSchedulerImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobSchedulerImpl.java Sat Oct 18 15:51:05 2014
@@ -46,8 +46,8 @@ import org.apache.sling.api.resource.Val
import org.apache.sling.commons.scheduler.JobContext;
import org.apache.sling.commons.scheduler.ScheduleOptions;
import org.apache.sling.commons.scheduler.Scheduler;
-import org.apache.sling.event.impl.jobs.topology.TopologyAware;
-import org.apache.sling.event.impl.jobs.topology.TopologyCapabilities;
+import org.apache.sling.event.impl.jobs.config.ConfigurationChangeListener;
+import org.apache.sling.event.impl.jobs.config.JobManagerConfiguration;
import org.apache.sling.event.impl.support.Environment;
import org.apache.sling.event.impl.support.ResourceHelper;
import org.apache.sling.event.impl.support.ScheduleInfoImpl;
@@ -68,7 +68,7 @@ import org.slf4j.LoggerFactory;
*
*/
public class JobSchedulerImpl
- implements EventHandler, TopologyAware, org.apache.sling.commons.scheduler.Job {
+ implements EventHandler, ConfigurationChangeListener, org.apache.sling.commons.scheduler.Job {
private static final String TOPIC_READ_JOB = "org/apache/sling/event/impl/jobs/READSCHEDULEDJOB";
@@ -85,7 +85,7 @@ public class JobSchedulerImpl
/** Is this active? */
private volatile boolean active;
- private final JobManagerConfiguration config;
+ private final JobManagerConfiguration configuration;
private final Scheduler scheduler;
@@ -102,7 +102,7 @@ public class JobSchedulerImpl
public JobSchedulerImpl(final JobManagerConfiguration configuration,
final Scheduler scheduler,
final JobManagerImpl jobManager) {
- this.config = configuration;
+ this.configuration = configuration;
this.scheduler = scheduler;
this.running = true;
this.jobManager = jobManager;
@@ -121,12 +121,14 @@ public class JobSchedulerImpl
}
});
backgroundThread.start();
+ this.configuration.addListener(this);
}
/**
* Deactivate this component.
*/
public void deactivate() {
+ this.configuration.removeListener(this);
this.running = false;
this.stopScheduling();
synchronized ( this.scheduledJobs ) {
@@ -193,7 +195,7 @@ public class JobSchedulerImpl
if ( event.getTopic().equals(SlingConstants.TOPIC_RESOURCE_ADDED)
|| event.getTopic().equals(SlingConstants.TOPIC_RESOURCE_CHANGED)) {
final String path = (String)event.getProperty(SlingConstants.PROPERTY_PATH);
- final ResourceResolver resolver = this.config.createResourceResolver();;
+ final ResourceResolver resolver = this.configuration.createResourceResolver();;
try {
final Resource eventResource = resolver.getResource(path);
if ( ResourceHelper.RESOURCE_TYPE_SCHEDULED_JOB.equals(eventResource.getResourceType()) ) {
@@ -330,10 +332,9 @@ public class JobSchedulerImpl
}
public void unschedule(final ScheduledJobInfoImpl info) {
- final ResourceResolver resolver = this.config.createResourceResolver();
+ final ResourceResolver resolver = this.configuration.createResourceResolver();
try {
- final StringBuilder sb = new StringBuilder(this.config.getScheduledJobsPathWithSlash());
- sb.append('/');
+ final StringBuilder sb = new StringBuilder(this.configuration.getScheduledJobsPath(true));
sb.append(ResourceHelper.filterName(info.getName()));
final String path = sb.toString();
@@ -371,7 +372,7 @@ public class JobSchedulerImpl
@Override
public void run() {
synchronized (unloadedEvents) {
- final ResourceResolver resolver = config.createResourceResolver();
+ final ResourceResolver resolver = configuration.createResourceResolver();
final Set<String> newUnloadedEvents = new HashSet<String>();
newUnloadedEvents.addAll(unloadedEvents);
try {
@@ -406,7 +407,7 @@ public class JobSchedulerImpl
} else {
final String path = (String)event.getProperty(SlingConstants.PROPERTY_PATH);
final String resourceType = (String)event.getProperty(SlingConstants.PROPERTY_RESOURCE_TYPE);
- if ( path != null && path.startsWith(this.config.getScheduledJobsPathWithSlash())
+ if ( path != null && path.startsWith(this.configuration.getScheduledJobsPath(true))
&& (resourceType == null || ResourceHelper.RESOURCE_TYPE_SCHEDULED_JOB.equals(resourceType))) {
logger.debug("Received resource event for {} : {}", path, resourceType);
try {
@@ -424,7 +425,7 @@ public class JobSchedulerImpl
* Load all scheduled jobs from the resource tree
*/
private void loadScheduledJobs(final long startTime) {
- final ResourceResolver resolver = this.config.createResourceResolver();
+ final ResourceResolver resolver = this.configuration.createResourceResolver();
try {
final Calendar startDate = Calendar.getInstance();
startDate.setTimeInMillis(startTime);
@@ -445,7 +446,7 @@ public class JobSchedulerImpl
while ( result.hasNext() ) {
final Resource eventResource = result.next();
// sanity check for the path
- if ( eventResource.getPath().startsWith(this.config.getScheduledJobsPathWithSlash()) ) {
+ if ( eventResource.getPath().startsWith(this.configuration.getScheduledJobsPath(true)) ) {
final ReadResult readResult = this.readScheduledJob(eventResource);
if ( readResult != null ) {
if ( readResult.hasReadErrors ) {
@@ -518,7 +519,7 @@ public class JobSchedulerImpl
final boolean suspend,
final List<ScheduleInfoImpl> scheduleInfos)
throws PersistenceException {
- final ResourceResolver resolver = this.config.createResourceResolver();
+ final ResourceResolver resolver = this.configuration.createResourceResolver();
try {
// create properties
@@ -556,7 +557,7 @@ public class JobSchedulerImpl
// create path and resource
properties.put(ResourceResolver.PROPERTY_RESOURCE_TYPE, ResourceHelper.RESOURCE_TYPE_SCHEDULED_JOB);
- final String path = this.config.getScheduledJobsPathWithSlash() + ResourceHelper.filterName(scheduleName);
+ final String path = this.configuration.getScheduledJobsPath(true) + ResourceHelper.filterName(scheduleName);
// update existing resource
final Resource existingInfo = resolver.getResource(path);
@@ -593,13 +594,13 @@ public class JobSchedulerImpl
}
@Override
- public void topologyChanged(final TopologyCapabilities caps) {
- if ( caps == null ) {
+ public void configurationChanged(final boolean active) {
+ if ( !active ) {
this.active = false;
this.stopScheduling();
} else {
final boolean previouslyActive = this.active;
- this.active = caps.isLeader();
+ this.active = this.configuration.getTopologyCapabilities().isLeader();
if ( this.active && !previouslyActive ) {
this.startScheduling();
}
@@ -726,10 +727,9 @@ public class JobSchedulerImpl
}
public void setSuspended(final ScheduledJobInfoImpl info, final boolean flag) {
- final ResourceResolver resolver = config.createResourceResolver();
+ final ResourceResolver resolver = configuration.createResourceResolver();
try {
- final StringBuilder sb = new StringBuilder(this.config.getScheduledJobsPathWithSlash());
- sb.append('/');
+ final StringBuilder sb = new StringBuilder(this.configuration.getScheduledJobsPath(true));
sb.append(ResourceHelper.filterName(info.getName()));
final String path = sb.toString();
Copied: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/ConfigurationChangeListener.java (from r1632617, sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/TopologyAware.java)
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/ConfigurationChangeListener.java?p2=sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/ConfigurationChangeListener.java&p1=sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/TopologyAware.java&r1=1632617&r2=1632809&rev=1632809&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/TopologyAware.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/ConfigurationChangeListener.java Sat Oct 18 15:51:05 2014
@@ -16,16 +16,18 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.event.impl.jobs.topology;
+package org.apache.sling.event.impl.jobs.config;
/**
* Listener interface to get topology / queue changes.
+ * Components interested in configuration changes can subscribe
+ * themselves using the {@link JobManagerConfiguration}.
*/
-public interface TopologyAware {
+public interface ConfigurationChangeListener {
/**
- * Notify about a change.
- * @param caps The new topology capabilities or {@code null}
+ * Notify about a configuration change.
+ * @param active {@code true} if job processing is active, otherwise {@code false}
*/
- void topologyChanged(final TopologyCapabilities caps);
+ void configurationChanged(final boolean active);
}
Copied: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java (from r1632478, sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerConfiguration.java)
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java?p2=sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java&p1=sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerConfiguration.java&r1=1632478&r2=1632809&rev=1632809&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerConfiguration.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java Sat Oct 18 15:51:05 2014
@@ -16,14 +16,18 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.event.impl.jobs;
+package org.apache.sling.event.impl.jobs.config;
+import java.util.ArrayList;
import java.util.Calendar;
+import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Modified;
import org.apache.felix.scr.annotations.Properties;
import org.apache.felix.scr.annotations.Property;
@@ -34,9 +38,17 @@ import org.apache.sling.api.resource.Per
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.api.resource.ResourceResolverFactory;
import org.apache.sling.commons.osgi.PropertiesUtil;
+import org.apache.sling.discovery.TopologyEvent;
+import org.apache.sling.discovery.TopologyEvent.Type;
+import org.apache.sling.discovery.TopologyEventListener;
import org.apache.sling.event.impl.EnvironmentComponent;
+import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueConfigurationChangeListener;
+import org.apache.sling.event.impl.jobs.tasks.CheckTopologyTask;
+import org.apache.sling.event.impl.jobs.tasks.FindUnfinishedJobsTask;
+import org.apache.sling.event.impl.jobs.tasks.UpgradeTask;
import org.apache.sling.event.impl.support.Environment;
import org.apache.sling.event.impl.support.ResourceHelper;
+import org.apache.sling.event.jobs.Job;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,7 +60,7 @@ import org.slf4j.LoggerFactory;
label="Apache Sling Job Manager",
description="This is the central service of the job handling.",
name="org.apache.sling.event.impl.jobs.jcr.PersistenceHandler")
-@Service(value={JobManagerConfiguration.class})
+@Service(value={JobManagerConfiguration.class, TopologyEventListener.class})
@Properties({
@Property(name=JobManagerConfiguration.PROPERTY_DISABLE_DISTRIBUTION,
boolValue=JobManagerConfiguration.DEFAULT_DISABLE_DISTRIBUTION,
@@ -62,10 +74,10 @@ import org.slf4j.LoggerFactory;
@Property(name=JobManagerConfiguration.PROPERTY_BACKGROUND_LOAD_DELAY,
longValue=JobManagerConfiguration.DEFAULT_BACKGROUND_LOAD_DELAY, propertyPrivate=true),
})
-public class JobManagerConfiguration {
+public class JobManagerConfiguration implements TopologyEventListener, QueueConfigurationChangeListener {
/** Logger. */
- private final Logger logger = LoggerFactory.getLogger(this.getClass());
+ private final Logger logger = LoggerFactory.getLogger("org.apache.sling.event.impl.jobs");
/** Default resource path for jobs. */
public static final String DEFAULT_REPOSITORY_PATH = "/var/eventing/jobs";
@@ -133,6 +145,9 @@ public class JobManagerConfiguration {
/** The resource path where scheduled jobs are stored - ending with a slash. */
private String scheduledJobsPathWithSlash;
+ /** Listeners to notify about configuration changes. */
+ private final List<ConfigurationChangeListener> listeners = new ArrayList<ConfigurationChangeListener>();
+
/** The environment component. */
@Reference
private EnvironmentComponent environment;
@@ -140,6 +155,12 @@ public class JobManagerConfiguration {
@Reference
private ResourceResolverFactory resourceResolverFactory;
+ @Reference
+ private QueueConfigurationManager queueConfigManager;
+
+ /** The topology capabilities. */
+ private volatile TopologyCapabilities topologyCapabilities;
+
/**
* Activate this component.
* @param props Configuration properties
@@ -182,18 +203,27 @@ public class JobManagerConfiguration {
} finally {
resolver.close();
}
+ this.queueConfigManager.addListener(this);
}
/**
* Update with a new configuration
*/
@Modified
- public void update(final Map<String, Object> props) {
+ protected void update(final Map<String, Object> props) {
this.disabledDistribution = PropertiesUtil.toBoolean(props.get(PROPERTY_DISABLE_DISTRIBUTION), DEFAULT_DISABLE_DISTRIBUTION);
this.backgroundLoadDelay = PropertiesUtil.toLong(props.get(PROPERTY_BACKGROUND_LOAD_DELAY), DEFAULT_BACKGROUND_LOAD_DELAY);
}
/**
+ * Deactivate
+ */
+ @Deactivate
+ protected void deactivate() {
+ this.queueConfigManager.removeListener(this);
+ }
+
+ /**
* Create a new resource resolver for reading and writing the resource tree.
* The resolver needs to be closed by the client.
* @return A resource resolver
@@ -211,6 +241,26 @@ public class JobManagerConfiguration {
}
/**
+ * Get the current topology capabilities.
+ * @return The capabilities or {@code null}
+ */
+ public TopologyCapabilities getTopologyCapabilities() {
+ return this.topologyCapabilities;
+ }
+
+ public QueueConfigurationManager getQueueConfigurationManager() {
+ return this.queueConfigManager;
+ }
+
+ /**
+ * Get main logger.
+ * @return The main logger.
+ */
+ public Logger getMainLogger() {
+ return this.logger;
+ }
+
+ /**
* Get the resource path for all assigned jobs.
* @return The path - does not end with a slash.
*/
@@ -334,12 +384,13 @@ public class JobManagerConfiguration {
/**
* Get the storage path for finished jobs.
- * @param finishedJob The finished job
+ * @param topic Topic of the finished job
+ * @param jobId The job id of the finished job.
* @param isSuccess Whether processing was successful or not
* @return The complete storage path
*/
- public String getStoragePath(final JobImpl finishedJob, final boolean isSuccess) {
- final String topicName = finishedJob.getTopic().replace('/', '.');
+ public String getStoragePath(final String topic, final String jobId, final boolean isSuccess) {
+ final String topicName = topic.replace('/', '.');
final StringBuilder sb = new StringBuilder();
if ( isSuccess ) {
sb.append(this.storedSuccessfulJobsPath);
@@ -349,7 +400,7 @@ public class JobManagerConfiguration {
sb.append('/');
sb.append(topicName);
sb.append('/');
- sb.append(finishedJob.getId());
+ sb.append(jobId);
return sb.toString();
@@ -362,11 +413,138 @@ public class JobManagerConfiguration {
return path.startsWith(this.storedCancelledJobsPath) || path.startsWith(this.storedSuccessfulJobsPath);
}
- public String getScheduledJobsPath() {
- return this.scheduledJobsPath;
+ /**
+ * Get the scheduled jobs path
+ * @param slash If {@code false} the path is returned, if {@code true} the path appended with a slash is returned.
+ * @return The path for the scheduled jobs
+ */
+ public String getScheduledJobsPath(final boolean slash) {
+ return (slash ? this.scheduledJobsPathWithSlash : this.scheduledJobsPath);
+ }
+
+ @Override
+ public void configChanged() {
+ final TopologyCapabilities caps = this.topologyCapabilities;
+ if ( caps != null ) {
+ synchronized ( this.listeners ) {
+ this.stopProcessing(false);
+
+ this.startProcessing(Type.PROPERTIES_CHANGED, caps, true);
+ }
+ }
+ }
+
+ private void stopProcessing(final boolean deactivate) {
+ boolean notify = this.topologyCapabilities != null;
+ // deactivate old capabilities - this stops all background processes
+ if ( deactivate && this.topologyCapabilities != null ) {
+ this.topologyCapabilities.deactivate();
+ }
+ this.topologyCapabilities = null;
+
+ if ( notify ) {
+ // stop all listeners
+ this.notifiyListeners();
+ }
+ }
+
+ private void startProcessing(final Type eventType, final TopologyCapabilities newCaps, final boolean isConfigChange) {
+ // create new capabilities and update view
+ this.topologyCapabilities = newCaps;
+
+ // before we propagate the new topology we do some maintenance
+ if ( eventType == Type.TOPOLOGY_INIT ) {
+ final UpgradeTask task = new UpgradeTask();
+ task.run(this, this.topologyCapabilities, queueConfigManager);
+
+ final FindUnfinishedJobsTask rt = new FindUnfinishedJobsTask();
+ rt.run(this);
+ }
+
+ final CheckTopologyTask mt = new CheckTopologyTask(this, this.queueConfigManager);
+ mt.run(topologyCapabilities, !isConfigChange, isConfigChange);
+
+ // start listeners
+ this.notifiyListeners();
}
- public String getScheduledJobsPathWithSlash() {
- return this.scheduledJobsPathWithSlash;
+ private void notifiyListeners() {
+ for(final ConfigurationChangeListener l : this.listeners) {
+ l.configurationChanged(this.topologyCapabilities != null);
+ }
+ }
+
+ /**
+ * @see org.apache.sling.discovery.TopologyEventListener#handleTopologyEvent(org.apache.sling.discovery.TopologyEvent)
+ */
+ @Override
+ public void handleTopologyEvent(final TopologyEvent event) {
+ this.logger.debug("Received topology event {}", event);
+
+ // check if there is a change of properties which doesn't affect us
+ if ( event.getType() == Type.PROPERTIES_CHANGED ) {
+ final Map<String, String> newAllInstances = TopologyCapabilities.getAllInstancesMap(event.getNewView());
+ if ( this.topologyCapabilities != null && this.topologyCapabilities.isSame(newAllInstances) ) {
+ logger.debug("No changes in capabilities - ignoring event");
+ return;
+ }
+ }
+
+ synchronized ( this.listeners ) {
+
+ if ( event.getType() == Type.TOPOLOGY_CHANGING ) {
+ this.stopProcessing(true);
+
+ } else if ( event.getType() == Type.TOPOLOGY_INIT
+ || event.getType() == Type.TOPOLOGY_CHANGED
+ || event.getType() == Type.PROPERTIES_CHANGED ) {
+
+ this.stopProcessing(true);
+
+ this.startProcessing(event.getType(), new TopologyCapabilities(event.getNewView(), this), false);
+ }
+
+ }
+ }
+
+ /**
+ * Add a configuration change listener. The listener is immediately notified of the current state.
+ * @param service Listener to notify about changes.
+ */
+ public void addListener(final ConfigurationChangeListener service) {
+ synchronized ( this.listeners ) {
+ this.listeners.add(service);
+ service.configurationChanged(this.topologyCapabilities != null);
+ }
+ }
+
+ /**
+ * Remove a configuration change listener
+ * @param service Listener to remove.
+ */
+ public void removeListener(final ConfigurationChangeListener service) {
+ synchronized ( this.listeners ) {
+ this.listeners.remove(service);
+ }
+ }
+
+ private final Map<String, Job> retryList = new HashMap<String, Job>();
+
+ public void addJobToRetryList(final Job job) {
+ synchronized ( retryList ) {
+ retryList.put(job.getId(), job);
+ }
+ }
+
+ public void removeJobFromRetryList(final Job job) {
+ synchronized ( retryList ) {
+ retryList.remove(job.getId());
+ }
+ }
+
+ public Job getJobFromRetryList(final String jobId) {
+ synchronized ( retryList ) {
+ return retryList.get(jobId);
+ }
}
}
Copied: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/TopologyCapabilities.java (from r1632478, sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/TopologyCapabilities.java)
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/TopologyCapabilities.java?p2=sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/TopologyCapabilities.java&p1=sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/TopologyCapabilities.java&r1=1632478&r2=1632809&rev=1632809&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/TopologyCapabilities.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/TopologyCapabilities.java Sat Oct 18 15:51:05 2014
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.event.impl.jobs.topology;
+package org.apache.sling.event.impl.jobs.config;
import java.util.ArrayList;
import java.util.Collections;
@@ -29,7 +29,6 @@ import java.util.TreeMap;
import org.apache.sling.discovery.InstanceDescription;
import org.apache.sling.discovery.TopologyView;
import org.apache.sling.event.impl.jobs.JobImpl;
-import org.apache.sling.event.impl.jobs.JobManagerConfiguration;
import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueInfo;
import org.apache.sling.event.impl.support.Environment;
import org.apache.sling.event.jobs.QueueConfiguration;
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/console/InventoryPlugin.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/console/InventoryPlugin.java?rev=1632809&r1=1632808&r2=1632809&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/console/InventoryPlugin.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/console/InventoryPlugin.java Sat Oct 18 15:51:05 2014
@@ -41,7 +41,7 @@ import org.apache.sling.event.impl.jobs.
import org.apache.sling.event.impl.jobs.JobManagerImpl;
import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager;
-import org.apache.sling.event.impl.jobs.topology.TopologyCapabilities;
+import org.apache.sling.event.impl.jobs.config.TopologyCapabilities;
import org.apache.sling.event.jobs.JobManager;
import org.apache.sling.event.jobs.Queue;
import org.apache.sling.event.jobs.QueueConfiguration;
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/console/WebConsolePlugin.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/console/WebConsolePlugin.java?rev=1632809&r1=1632808&r2=1632809&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/console/WebConsolePlugin.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/console/WebConsolePlugin.java Sat Oct 18 15:51:05 2014
@@ -42,7 +42,7 @@ import org.apache.sling.event.impl.jobs.
import org.apache.sling.event.impl.jobs.JobManagerImpl;
import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager;
-import org.apache.sling.event.impl.jobs.topology.TopologyCapabilities;
+import org.apache.sling.event.impl.jobs.config.TopologyCapabilities;
import org.apache.sling.event.jobs.Job;
import org.apache.sling.event.jobs.JobManager;
import org.apache.sling.event.jobs.Queue;
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/notifications/NewJobSender.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/notifications/NewJobSender.java?rev=1632809&r1=1632808&r2=1632809&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/notifications/NewJobSender.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/notifications/NewJobSender.java Sat Oct 18 15:51:05 2014
@@ -28,8 +28,8 @@ import org.apache.felix.scr.annotations.
import org.apache.sling.api.SlingConstants;
import org.apache.sling.api.resource.Resource;
import org.apache.sling.api.resource.ResourceResolver;
-import org.apache.sling.event.impl.jobs.JobManagerConfiguration;
import org.apache.sling.event.impl.jobs.Utility;
+import org.apache.sling.event.impl.jobs.config.JobManagerConfiguration;
import org.apache.sling.event.impl.support.ResourceHelper;
import org.apache.sling.event.jobs.Job;
import org.apache.sling.event.jobs.NotificationConstants;
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java?rev=1632809&r1=1632808&r2=1632809&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java Sat Oct 18 15:51:05 2014
@@ -25,9 +25,13 @@ import java.util.Hashtable;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.commons.threads.ThreadPool;
import org.apache.sling.event.EventUtil;
import org.apache.sling.event.impl.EventingThreadPool;
@@ -35,11 +39,12 @@ import org.apache.sling.event.impl.jobs.
import org.apache.sling.event.impl.jobs.JobExecutionResultImpl;
import org.apache.sling.event.impl.jobs.JobHandler;
import org.apache.sling.event.impl.jobs.JobImpl;
-import org.apache.sling.event.impl.jobs.JobManagerImpl;
+import org.apache.sling.event.impl.jobs.JobTopicTraverser;
import org.apache.sling.event.impl.jobs.Utility;
import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
import org.apache.sling.event.impl.jobs.deprecated.JobStatusNotifier;
import org.apache.sling.event.impl.jobs.notifications.NotificationUtility;
+import org.apache.sling.event.impl.support.BatchResourceRemover;
import org.apache.sling.event.impl.support.Environment;
import org.apache.sling.event.impl.support.ResourceHelper;
import org.apache.sling.event.jobs.Job;
@@ -151,10 +156,6 @@ public abstract class AbstractJobQueue
return this.services.statisticsManager.getQueueStatistics(this.queueName);
}
- public QueueJobCache getCache() {
- return this.services.cache;
- }
-
/**
* Start the job queue.
*/
@@ -353,7 +354,7 @@ public abstract class AbstractJobQueue
if ( logger.isDebugEnabled() ) {
logger.debug("Returning job for {} : {}", queueName, Utility.toString(result));
}
- return (result != null ? new JobHandler( result, (JobManagerImpl)this.services.jobManager) : null);
+ return (result != null ? new JobHandler( result, this.services.configuration) : null);
}
/**
@@ -371,8 +372,8 @@ public abstract class AbstractJobQueue
* Inform the queue about a job for the topic
* @param topic A new topic.
*/
- public void wakeUpQueue(final String topic) {
- this.services.cache.handleNewJob(topic);
+ public void wakeUpQueue(final Set<String> topics) {
+ this.services.cache.handleNewJob(topics);
this.stopWaitingForNextJob();
}
@@ -847,7 +848,52 @@ public abstract class AbstractJobQueue
*/
@Override
public synchronized void removeAll() {
- this.services.topicManager.removeAll(this.getName());
+ final Set<String> topics = this.services.cache.getTopics();
+ logger.debug("Removing all jobs for queue {} : {}", queueName, topics);
+
+ if ( !topics.isEmpty() ) {
+
+ final ResourceResolver resolver = this.services.configuration.createResourceResolver();
+ try {
+ final Resource baseResource = resolver.getResource(this.services.configuration.getLocalJobsPath());
+
+ // sanity check - should never be null
+ if ( baseResource != null ) {
+ final BatchResourceRemover brr = new BatchResourceRemover();
+
+ for(final String t : topics) {
+ final Resource topicResource = baseResource.getChild(t.replace('/', '.'));
+ if ( topicResource != null ) {
+ JobTopicTraverser.traverse(logger, topicResource, new JobTopicTraverser.JobCallback() {
+
+ @Override
+ public boolean handle(final JobImpl job) {
+ final Resource jobResource = topicResource.getResourceResolver().getResource(job.getResourcePath());
+ // sanity check
+ if ( jobResource != null ) {
+ try {
+ brr.delete(jobResource);
+ } catch ( final PersistenceException ignore) {
+ logger.error("Unable to remove job " + job, ignore);
+ topicResource.getResourceResolver().revert();
+ topicResource.getResourceResolver().refresh();
+ }
+ }
+ return true;
+ }
+ });
+ }
+ }
+ try {
+ resolver.commit();
+ } catch ( final PersistenceException ignore) {
+ logger.error("Unable to remove jobs", ignore);
+ }
+ }
+ } finally {
+ resolver.close();
+ }
+ }
}
/**
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java?rev=1632809&r1=1632808&r2=1632809&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java Sat Oct 18 15:51:05 2014
@@ -31,8 +31,8 @@ import org.apache.sling.api.resource.Res
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.event.impl.jobs.JobHandler;
import org.apache.sling.event.impl.jobs.JobImpl;
-import org.apache.sling.event.impl.jobs.JobManagerConfiguration;
import org.apache.sling.event.impl.jobs.JobTopicTraverser;
+import org.apache.sling.event.impl.jobs.config.JobManagerConfiguration;
import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueInfo;
import org.apache.sling.event.jobs.QueueConfiguration.Type;
import org.slf4j.Logger;
@@ -211,12 +211,12 @@ public class QueueJobCache {
* Mark the topic to contain new jobs.
* @param topic The topic
*/
- public void handleNewJob(final String topic) {
- logger.debug("Update cache to handle new event for topic {}", topic);
+ public void handleNewJob(final Set<String> topics) {
+ logger.debug("Update cache to handle new event for topics {}", topics);
synchronized ( this.topicsWithNewJobs ) {
- this.topicsWithNewJobs.add(topic);
+ this.topicsWithNewJobs.addAll(topics);
}
- this.topics.add(topic);
+ this.topics.addAll(topics);
}
public void reschedule(final JobHandler handler) {
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java?rev=1632809&r1=1632808&r2=1632809&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java Sat Oct 18 15:51:05 2014
@@ -19,7 +19,6 @@
package org.apache.sling.event.impl.jobs.queues;
import java.util.ArrayList;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -36,8 +35,8 @@ import org.apache.felix.scr.annotations.
import org.apache.sling.commons.scheduler.Scheduler;
import org.apache.sling.commons.threads.ThreadPoolManager;
import org.apache.sling.event.impl.jobs.JobConsumerManager;
-import org.apache.sling.event.impl.jobs.JobManagerConfiguration;
import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
+import org.apache.sling.event.impl.jobs.config.JobManagerConfiguration;
import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager;
import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueInfo;
import org.apache.sling.event.impl.jobs.jmx.QueueStatusEvent;
@@ -45,7 +44,6 @@ import org.apache.sling.event.impl.jobs.
import org.apache.sling.event.impl.jobs.stats.StatisticsManager;
import org.apache.sling.event.impl.support.Environment;
import org.apache.sling.event.impl.support.ResourceHelper;
-import org.apache.sling.event.jobs.JobManager;
import org.apache.sling.event.jobs.Queue;
import org.apache.sling.event.jobs.QueueConfiguration;
import org.apache.sling.event.jobs.jmx.QueuesMBean;
@@ -177,15 +175,11 @@ public class QueueManager
* This method first searches the corresponding queue - if such a queue
* does not exist yet, it is created and started.
*
- * @param topicManager The topic manager
- * @param jobManager The job manager
* @param queueInfo The queue info
- * @param topic The topic
+ * @param topics The topics
*/
- public void start(final TopicManager topicManager,
- final JobManager jobManager,
- final QueueInfo queueInfo,
- final String topic) {
+ public void start(final QueueInfo queueInfo,
+ final Set<String> topics) {
final InternalQueueConfiguration config = queueInfo.queueConfiguration;
// get or create queue
AbstractJobQueue queue = null;
@@ -200,15 +194,12 @@ public class QueueManager
}
if ( queue == null ) {
final QueueServices services = new QueueServices();
+ services.configuration = this.configuration;
services.eventAdmin = this.eventAdmin;
services.jobConsumerManager = this.jobConsumerManager;
services.scheduler = this.scheduler;
services.threadPoolManager = this.threadPoolManager;
- services.topicManager = topicManager;
services.statisticsManager = statisticsManager;
- services.jobManager = jobManager;
- final Set<String> topics = new HashSet<String>();
- topics.add(topic);
services.cache = new QueueJobCache(configuration, queueInfo, topics);
if ( config.getType() == QueueConfiguration.Type.ORDERED ) {
queue = new OrderedJobQueue(queueInfo.queueName, config, services);
@@ -224,7 +215,7 @@ public class QueueManager
queue.start();
}
} else {
- queue.wakeUpQueue(topic);
+ queue.wakeUpQueue(topics);
}
}
}
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueServices.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueServices.java?rev=1632809&r1=1632808&r2=1632809&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueServices.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueServices.java Sat Oct 18 15:51:05 2014
@@ -21,12 +21,14 @@ package org.apache.sling.event.impl.jobs
import org.apache.sling.commons.scheduler.Scheduler;
import org.apache.sling.commons.threads.ThreadPoolManager;
import org.apache.sling.event.impl.jobs.JobConsumerManager;
+import org.apache.sling.event.impl.jobs.config.JobManagerConfiguration;
import org.apache.sling.event.impl.jobs.stats.StatisticsManager;
-import org.apache.sling.event.jobs.JobManager;
import org.osgi.service.event.EventAdmin;
public class QueueServices {
+ public JobManagerConfiguration configuration;
+
public JobConsumerManager jobConsumerManager;
public EventAdmin eventAdmin;
@@ -35,11 +37,7 @@ public class QueueServices {
public Scheduler scheduler;
- public TopicManager topicManager;
-
public StatisticsManager statisticsManager;
public QueueJobCache cache;
-
- public JobManager jobManager;
}
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicManager.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicManager.java?rev=1632809&r1=1632808&r2=1632809&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicManager.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicManager.java Sat Oct 18 15:51:05 2014
@@ -18,6 +18,7 @@
*/
package org.apache.sling.event.impl.jobs.queues;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -31,19 +32,12 @@ import org.apache.felix.scr.annotations.
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.Service;
-import org.apache.sling.api.resource.PersistenceException;
import org.apache.sling.api.resource.Resource;
import org.apache.sling.api.resource.ResourceResolver;
-import org.apache.sling.event.impl.jobs.JobImpl;
-import org.apache.sling.event.impl.jobs.JobManagerConfiguration;
-import org.apache.sling.event.impl.jobs.JobTopicTraverser;
-import org.apache.sling.event.impl.jobs.TestLogger;
+import org.apache.sling.event.impl.jobs.config.ConfigurationChangeListener;
+import org.apache.sling.event.impl.jobs.config.JobManagerConfiguration;
import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager;
import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueInfo;
-import org.apache.sling.event.impl.jobs.topology.TopologyAware;
-import org.apache.sling.event.impl.jobs.topology.TopologyCapabilities;
-import org.apache.sling.event.impl.jobs.topology.TopologyHandler;
-import org.apache.sling.event.impl.support.BatchResourceRemover;
import org.apache.sling.event.jobs.JobManager;
import org.apache.sling.event.jobs.NotificationConstants;
import org.osgi.framework.BundleContext;
@@ -60,10 +54,10 @@ import org.slf4j.LoggerFactory;
@Component(immediate=true)
@Service(value=EventHandler.class)
@Property(name=EventConstants.EVENT_TOPIC, value=NotificationConstants.TOPIC_JOB_ADDED)
-public class TopicManager implements EventHandler, TopologyAware {
+public class TopicManager implements EventHandler, ConfigurationChangeListener {
/** Logger. */
- private final Logger logger = new TestLogger(LoggerFactory.getLogger(this.getClass()));
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Reference
private JobManagerConfiguration configuration;
@@ -72,17 +66,11 @@ public class TopicManager implements Eve
private QueueConfigurationManager queueConfigMgr;
@Reference
- private TopologyHandler topologyHandler;
-
- @Reference
private QueueManager queueManager;
@Reference
private JobManager jobManager;
- /** The mapping from a topic to a queue info. */
- private final Map<String, QueueInfo> topicMapping = new HashMap<String, QueueInfo>();
-
/** Flag whether the manager is active or suspended. */
private final AtomicBoolean isActive = new AtomicBoolean(false);
@@ -91,7 +79,7 @@ public class TopicManager implements Eve
*/
@Activate
protected void activate(final BundleContext bundleContext) {
- this.topologyHandler.addListener(this);
+ this.configuration.addListener(this);
}
/**
@@ -99,7 +87,7 @@ public class TopicManager implements Eve
*/
@Deactivate
protected void deactivate() {
- this.topologyHandler.removeListener(this);
+ this.configuration.removeListener(this);
}
/**
@@ -107,22 +95,19 @@ public class TopicManager implements Eve
* @param caps The new topology capabilities or {@code null} if currently unknown.
*/
@Override
- public void topologyChanged(final TopologyCapabilities caps) {
- logger.debug("Topology changed {}", caps);
- synchronized ( this.topicMapping ) {
- if ( caps != null ) {
- final Set<String> topics = this.initialScan();
- this.updateTopicMapping(topics);
- // start queues
- for(final Map.Entry<String, QueueInfo> entry : this.topicMapping.entrySet() ) {
- this.queueManager.start(this, this.jobManager, entry.getValue(), entry.getKey());
- }
- this.isActive.set(true);
- } else {
- this.isActive.set(false);
- this.queueManager.restart();
- this.topicMapping.clear();
+ public void configurationChanged(final boolean active) {
+ logger.debug("Topology changed {}", active);
+ if ( active ) {
+ final Set<String> topics = this.initialScan();
+ final Map<QueueInfo, Set<String>> mapping = this.updateTopicMapping(topics);
+ // start queues
+ for(final Map.Entry<QueueInfo, Set<String>> entry : mapping.entrySet() ) {
+ this.queueManager.start(entry.getKey(), entry.getValue());
}
+ this.isActive.set(true);
+ } else {
+ this.isActive.set(false);
+ this.queueManager.restart();
}
}
@@ -159,89 +144,28 @@ public class TopicManager implements Eve
@Override
public void handleEvent(final Event event) {
final String topic = (String)event.getProperty(NotificationConstants.NOTIFICATION_PROPERTY_JOB_TOPIC);
- synchronized ( this.topicMapping ) {
- if ( this.isActive.get() && topic != null ) {
- QueueInfo info = this.topicMapping.get(topic);
- if ( info == null ) {
- info = this.queueConfigMgr.getQueueInfo(topic);
- this.topicMapping.put(topic, info);
- }
- this.queueManager.start(this, this.jobManager, info, topic);
- }
+ if ( this.isActive.get() && topic != null ) {
+ final QueueInfo info = this.queueConfigMgr.getQueueInfo(topic);
+ this.queueManager.start(info, Collections.singleton(topic));
}
}
/**
* Get the latest mapping from queue name to topics
*/
- private void updateTopicMapping(final Set<String> topics) {
- this.topicMapping.clear();
-
+ private Map<QueueInfo, Set<String>> updateTopicMapping(final Set<String> topics) {
+ final Map<QueueInfo, Set<String>> mapping = new HashMap<QueueConfigurationManager.QueueInfo, Set<String>>();
for(final String topic : topics) {
final QueueInfo queueInfo = this.queueConfigMgr.getQueueInfo(topic);
- this.topicMapping.put(topic, queueInfo);
- }
-
- this.logger.debug("Established new topic mapping: {}", this.topicMapping);
- }
-
- /**
- * Remove all jobs for a queue.
- * @param queueName The queue name
- */
- public void removeAll(final String queueName) {
- final Set<String> topics = new HashSet<String>();
- synchronized ( this.topicMapping ) {
- for(final Map.Entry<String, QueueInfo> entry : this.topicMapping.entrySet()) {
- if ( entry.getValue().queueName.equals(queueName) ) {
- topics.add(entry.getKey());
- }
+ Set<String> queueTopics = mapping.get(queueInfo);
+ if ( queueTopics == null ) {
+ queueTopics = new HashSet<String>();
+ mapping.put(queueInfo, queueTopics);
}
+ queueTopics.add(topic);
}
- logger.debug("Removing all jobs for queue {} : {}", queueName, topics);
- if ( !topics.isEmpty() ) {
-
- final ResourceResolver resolver = this.configuration.createResourceResolver();
- try {
- final Resource baseResource = resolver.getResource(this.configuration.getLocalJobsPath());
-
- // sanity check - should never be null
- if ( baseResource != null ) {
- final BatchResourceRemover brr = new BatchResourceRemover();
-
- for(final String t : topics) {
- final Resource topicResource = baseResource.getChild(t.replace('/', '.'));
- if ( topicResource != null ) {
- JobTopicTraverser.traverse(logger, topicResource, new JobTopicTraverser.JobCallback() {
-
- @Override
- public boolean handle(final JobImpl job) {
- final Resource jobResource = topicResource.getResourceResolver().getResource(job.getResourcePath());
- // sanity check
- if ( jobResource != null ) {
- try {
- brr.delete(jobResource);
- } catch ( final PersistenceException ignore) {
- logger.error("Unable to remove job " + job, ignore);
- topicResource.getResourceResolver().revert();
- topicResource.getResourceResolver().refresh();
- }
- }
- return true;
- }
- });
- }
- }
- try {
- resolver.commit();
- } catch ( final PersistenceException ignore) {
- logger.error("Unable to remove jobs", ignore);
- }
- }
- } finally {
- resolver.close();
- }
- }
+ this.logger.debug("Established new topic mapping: {}", mapping);
+ return mapping;
}
}
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsManager.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsManager.java?rev=1632809&r1=1632808&r2=1632809&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsManager.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsManager.java Sat Oct 18 15:51:05 2014
@@ -27,7 +27,7 @@ import org.apache.felix.scr.annotations.
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.Service;
-import org.apache.sling.event.impl.jobs.JobManagerConfiguration;
+import org.apache.sling.event.impl.jobs.config.JobManagerConfiguration;
import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager;
import org.apache.sling.event.impl.jobs.notifications.NotificationUtility;
import org.apache.sling.event.jobs.Job;
Copied: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/CheckTopologyTask.java (from r1632478, sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/CheckTopologyTask.java)
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/CheckTopologyTask.java?p2=sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/CheckTopologyTask.java&p1=sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/CheckTopologyTask.java&r1=1632478&r2=1632809&rev=1632809&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/CheckTopologyTask.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/CheckTopologyTask.java Sat Oct 18 15:51:05 2014
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.event.impl.jobs.topology;
+package org.apache.sling.event.impl.jobs.tasks;
import java.util.Collections;
import java.util.HashMap;
@@ -30,9 +30,10 @@ import org.apache.sling.api.resource.Res
import org.apache.sling.api.resource.ValueMap;
import org.apache.sling.discovery.InstanceDescription;
import org.apache.sling.event.impl.jobs.JobImpl;
-import org.apache.sling.event.impl.jobs.JobManagerConfiguration;
import org.apache.sling.event.impl.jobs.JobTopicTraverser;
+import org.apache.sling.event.impl.jobs.config.JobManagerConfiguration;
import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager;
+import org.apache.sling.event.impl.jobs.config.TopologyCapabilities;
import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueInfo;
import org.apache.sling.event.impl.support.ResourceHelper;
import org.apache.sling.event.jobs.Job;