You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2014/10/16 08:52:39 UTC
svn commit: r1632217 - in /sling/trunk/bundles/extensions/event/src:
main/java/org/apache/sling/event/impl/jobs/
main/java/org/apache/sling/event/impl/jobs/queues/
main/java/org/apache/sling/event/impl/jobs/tasks/
main/java/org/apache/sling/event/impl/...
Author: cziegeler
Date: Thu Oct 16 06:52:39 2014
New Revision: 1632217
URL: http://svn.apache.org/r1632217
Log:
SLING-4048 : Avoid keeping jobs in memory. Refactor job traversal and implement different queue strategies (WiP)
Added:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobTopicTraverser.java
- copied, changed from r1632213, sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/JobTopicTraverser.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/CheckTopologyTask.java
- copied, changed from r1632213, sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/MaintenanceTask.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/FindUnfinishedJobsTask.java
- copied, changed from r1632213, sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/RestartTask.java
Removed:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/JobTopicTraverser.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/MaintenanceTask.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/RestartTask.java
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/HistoryCleanUpTask.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/QueueJobCache.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/TopicManager.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/TopologyHandler.java
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/OrderedQueueTest.java
Copied: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobTopicTraverser.java (from r1632213, sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/JobTopicTraverser.java)
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobTopicTraverser.java?p2=sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobTopicTraverser.java&p1=sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/JobTopicTraverser.java&r1=1632213&r2=1632217&rev=1632217&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/JobTopicTraverser.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobTopicTraverser.java Thu Oct 16 06:52:39 2014
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.event.impl.jobs.topics;
+package org.apache.sling.event.impl.jobs;
import java.util.ArrayList;
import java.util.Collections;
@@ -24,20 +24,68 @@ import java.util.Iterator;
import java.util.List;
import org.apache.sling.api.resource.Resource;
-import org.apache.sling.event.impl.jobs.JobImpl;
-import org.apache.sling.event.impl.jobs.Utility;
import org.slf4j.Logger;
+/**
+ * The job topic traverser is an utility class to traverse all jobs
+ * of a specific topic in order of creation.
+ */
public class JobTopicTraverser {
- public interface Handler {
+ /**
+ * Callback called for each found job.
+ */
+ public interface JobCallback {
+
+ /**
+ * Callback handle for a job
+ * @param job The job to handle
+ * @return <code>true</code> If processing should continue, <code>false</code> otherwise.
+ */
boolean handle(final JobImpl job);
}
+ /**
+ * Callback called for each found resource.
+ */
+ public interface ResourceCallback {
+
+ /**
+ * Callback handle for a resource
+ * @param rsrc The resource to handle
+ * @return <code>true</code> If processing should continue, <code>false</code> otherwise.
+ */
+ boolean handle(final Resource rsrc);
+ }
+
+ /**
+ * Traverse the topic and call the callback for each found job.
+ *
+ * Once the callback notifies to stop traversing by returning false, the current minute
+ * will be processed completely (to ensure correct ordering of jobs) and then the
+ * traversal stops.
+ *
+ * @param logger The logger to use for debug logging
+ * @param topicResource The topic resource
+ * @param handler The callback
+ */
+ public static void traverse(final Logger logger,
+ final Resource topicResource,
+ final JobCallback handler) {
+ traverse(logger, topicResource, handler, null);
+ }
+
public static void traverse(final Logger logger,
final Resource topicResource,
- final Handler handler) {
- logger.debug("Processing topic {}", topicResource.getName());
+ final ResourceCallback handler) {
+ traverse(logger, topicResource, null, handler);
+ }
+
+ private static void traverse(final Logger logger,
+ final Resource topicResource,
+ final JobCallback jobHandler,
+ final ResourceCallback resourceHandler) {
+ logger.debug("Processing topic {}", topicResource.getName().replace('.', '/'));
// now years
for(final Resource yearResource: Utility.getSortedChildren(logger, "year", topicResource)) {
final int year = Integer.valueOf(yearResource.getName());
@@ -69,23 +117,31 @@ public class JobTopicTraverser {
while ( jobIter.hasNext() ) {
final Resource jobResource = jobIter.next();
- final JobImpl job = Utility.readJob(logger, jobResource);
- if ( job != null ) {
- logger.debug("Found job {}", jobResource.getName());
- jobs.add(job);
+ if ( resourceHandler != null ) {
+ if ( !resourceHandler.handle(jobResource) ) {
+ return;
+ }
+ } else {
+ final JobImpl job = Utility.readJob(logger, jobResource);
+ if ( job != null ) {
+ logger.debug("Found job {}", jobResource.getName());
+ jobs.add(job);
+ }
}
}
- Collections.sort(jobs);
+ if ( jobHandler != null ) {
+ Collections.sort(jobs);
- boolean stop = false;
- for(final JobImpl job : jobs) {
- if ( !handler.handle(job) ) {
- stop = true;
+ boolean stop = false;
+ for(final JobImpl job : jobs) {
+ if ( !jobHandler.handle(job) ) {
+ stop = true;
+ }
+ }
+ if ( stop ) {
+ return;
}
- }
- if ( stop ) {
- return;
}
}
}
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java?rev=1632217&r1=1632216&r2=1632217&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java Thu Oct 16 06:52:39 2014
@@ -677,9 +677,12 @@ public abstract class AbstractJobQueue
// we keep cancelled jobs and succeeded jobs if the queue is configured like this.
final boolean keepJobs = resultState != Job.JobState.SUCCEEDED || this.configuration.isKeepJobs();
handler.finished(resultState, keepJobs, rescheduleInfo.processingTime);
+ } else {
+ this.services.topicManager.reschedule(handler.getJob());
}
this.notifyFinished(rescheduleInfo.reschedule);
+
return rescheduleInfo.reschedule;
}
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/HistoryCleanUpTask.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/HistoryCleanUpTask.java?rev=1632217&r1=1632216&r2=1632217&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/HistoryCleanUpTask.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/HistoryCleanUpTask.java Thu Oct 16 06:52:39 2014
@@ -28,11 +28,9 @@ import org.apache.felix.scr.annotations.
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.Service;
-import org.apache.sling.api.resource.LoginException;
import org.apache.sling.api.resource.PersistenceException;
import org.apache.sling.api.resource.Resource;
import org.apache.sling.api.resource.ResourceResolver;
-import org.apache.sling.api.resource.ResourceResolverFactory;
import org.apache.sling.api.resource.ResourceUtil;
import org.apache.sling.api.resource.ValueMap;
import org.apache.sling.event.impl.jobs.JobImpl;
@@ -70,9 +68,6 @@ public class HistoryCleanUpTask implemen
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Reference
- private ResourceResolverFactory resourceResolverFactory;
-
- @Reference
private JobManagerConfiguration configuration;
@Override
@@ -100,10 +95,8 @@ public class HistoryCleanUpTask implemen
} else {
stateList = null;
}
- ResourceResolver resolver = null;
+ final ResourceResolver resolver = this.configuration.createResourceResolver();
try {
- resolver = this.resourceResolverFactory.getAdministrativeResourceResolver(null);
-
if ( stateList == null || stateList.contains(Job.JobState.SUCCEEDED.name()) ) {
this.cleanup(removeDate, resolver, context, configuration.getStoredSuccessfulJobsPath(), topics, null);
}
@@ -117,12 +110,8 @@ public class HistoryCleanUpTask implemen
} catch (final PersistenceException pe) {
// in the case of an error, we just log this as a warning
this.logger.warn("Exception during job resource tree cleanup.", pe);
- } catch (final LoginException ignore) {
- this.ignoreException(ignore);
} finally {
- if ( resolver != null ) {
- resolver.close();
- }
+ resolver.close();
}
return context.result().succeeded();
}
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/QueueJobCache.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/QueueJobCache.java?rev=1632217&r1=1632216&r2=1632217&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/QueueJobCache.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/QueueJobCache.java Thu Oct 16 06:52:39 2014
@@ -21,6 +21,7 @@ package org.apache.sling.event.impl.jobs
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -29,13 +30,18 @@ import org.apache.sling.api.resource.Res
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.event.impl.jobs.JobImpl;
import org.apache.sling.event.impl.jobs.JobManagerConfiguration;
+import org.apache.sling.event.impl.jobs.JobTopicTraverser;
import org.apache.sling.event.impl.jobs.TestLogger;
import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueInfo;
+import org.apache.sling.event.jobs.QueueConfiguration.Type;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * TODO - note last scan time and not all new observation events to avoid unnecessary rescan
+ * The queue job cache caches jobs per queue based on the topics the queue is actively
+ * processing.
+ *
+ * TODO cache needs to be synchronized!
*/
public class QueueJobCache {
@@ -49,7 +55,9 @@ public class QueueJobCache {
private final Set<String> topics;
- private final Map<String, List<JobImpl>> cache = new HashMap<String, List<JobImpl>>();
+ private final Set<String> topicsWithNewJobs = new HashSet<String>();
+
+ private final List<JobImpl> cache = new ArrayList<JobImpl>();
private final QueueInfo info;
@@ -59,99 +67,145 @@ public class QueueJobCache {
this.configuration = configuration;
this.info = info;
this.topics = topics;
- for(final String topic : topics) {
- this.cache.put(topic, new ArrayList<JobImpl>());
- }
+ this.topicsWithNewJobs.addAll(topics);
}
+ /**
+ * Return the queue info for this queue.
+ * @return The queue info
+ */
public QueueInfo getQueueInfo() {
return this.info;
}
+ /**
+ * All topics of this queue.
+ * @return The topics.
+ */
public Set<String> getTopics() {
return this.topics;
}
/**
* Get the next job - this method is not called concurrently
- * TODO This is very expensive atm
*/
public JobImpl getNextJob() {
JobImpl result = null;
- // check state of cache
- this.loadJobs();
+ if ( this.cache.isEmpty() ) {
+ final Set<String> checkingTopics = new HashSet<String>();
+ synchronized ( this.topicsWithNewJobs ) {
+ checkingTopics.addAll(this.topicsWithNewJobs);
+ this.topicsWithNewJobs.clear();
+ }
+ if ( !checkingTopics.isEmpty() ) {
+ this.loadJobs(checkingTopics);
+ }
+ }
- final List<JobImpl> allJobs = new ArrayList<JobImpl>();
- for(final Map.Entry<String, List<JobImpl>> entry : this.cache.entrySet()) {
- allJobs.addAll(entry.getValue());
- }
- Collections.sort(allJobs);
- if ( allJobs.size() > 0 ) {
- result = allJobs.get(0);
+ if ( !this.cache.isEmpty() ) {
+ result = this.cache.remove(0);
}
+
return result;
}
/**
* Load the next N x numberOf(topics) jobs
*/
- private void loadJobs() {
+ private void loadJobs( final Set<String> checkingTopics) {
logger.debug("Starting jobs loading...");
- ResourceResolver resolver = null;
+ final Map<String, List<JobImpl>> topicCache = new HashMap<String, List<JobImpl>>();
+
+ final ResourceResolver resolver = this.configuration.createResourceResolver();
try {
- for(final String topic : this.topics) {
- final List<JobImpl> list = this.cache.get(topic);
- if ( list.size() < this.maxPreloadLimit ) {
- list.clear();
- if ( resolver == null ) {
- resolver = this.configuration.createResourceResolver();
- }
+ for(final String topic : checkingTopics) {
+ final Resource baseResource = resolver.getResource(this.configuration.getLocalJobsPath());
- final Resource baseResource = resolver.getResource(this.configuration.getLocalJobsPath());
+ final List<JobImpl> list = new ArrayList<JobImpl>();
+ topicCache.put(topic, list);
- // sanity check - should never be null
- if ( baseResource != null ) {
- final Resource topicResource = baseResource.getChild(topic.replace('/', '.'));
- if ( topicResource != null ) {
- loadJobs(topic, topicResource);
- }
+ // sanity check - should never be null
+ if ( baseResource != null ) {
+ final Resource topicResource = baseResource.getChild(topic.replace('/', '.'));
+ if ( topicResource != null ) {
+ loadJobs(topic, topicResource, list);
}
}
}
} finally {
- if ( resolver != null ) {
- resolver.close();
+ resolver.close();
+ }
+ orderTopics(topicCache);
+
+ logger.debug("Finished jobs loading {}", this.cache.size());
+ }
+
+ /**
+ * Order the topics based on the queue type and put them in the cache.
+ * @param topicCache The topic based cache
+ */
+ private void orderTopics(final Map<String, List<JobImpl>> topicCache) {
+ if ( this.info.queueConfiguration.getType() == Type.ORDERED
+ || this.info.queueConfiguration.getType() == Type.UNORDERED) {
+ for(final List<JobImpl> list : topicCache.values()) {
+ this.cache.addAll(list);
}
+ Collections.sort(this.cache);
+ } else {
+ // topic round robin
+ boolean done = true;
+ do {
+ for(final Map.Entry<String, List<JobImpl>> entry : topicCache.entrySet()) {
+ if ( !entry.getValue().isEmpty() ) {
+ this.cache.add(entry.getValue().remove(0));
+ if ( !entry.getValue().isEmpty() ) {
+ done = false;
+ }
+ }
+ }
+ } while ( !done ) ;
}
- logger.debug("Finished jobs loading");
}
/**
* Load the next N x numberOf(topics) jobs
*/
- private void loadJobs(final String topic, final Resource topicResource) {
+ private void loadJobs(final String topic, final Resource topicResource, final List<JobImpl> list) {
logger.debug("Loading jobs from topic {}", topic);
- final List<JobImpl> result = this.cache.get(topic);
- JobTopicTraverser.traverse(logger, topicResource, new JobTopicTraverser.Handler() {
+ JobTopicTraverser.traverse(logger, topicResource, new JobTopicTraverser.JobCallback() {
@Override
public boolean handle(final JobImpl job) {
if ( job.getProcessingStarted() == null && !job.hasReadErrors() ) {
- result.add(job);
+ list.add(job);
} else {
logger.debug("Discarding job because {} or {}", job.getProcessingStarted(), job.hasReadErrors());
}
- return result.size() < maxPreloadLimit;
+ return list.size() < maxPreloadLimit;
}
});
- logger.debug("Caching {} jobs for topic {}", result.size(), topic);
+ logger.debug("Caching {} jobs for topic {}", list.size(), topic);
}
+ /**
+ * Mark the topic to contain new jobs.
+ * @param topic The topic
+ */
public void handleNewJob(final String topic) {
- // TODO Auto-generated method stub
+ logger.debug("Update cache to handle new event for topic {}", topic);
+ synchronized ( this.topicsWithNewJobs ) {
+ this.topicsWithNewJobs.add(topic);
+ }
+ }
+ public void reschedule(final JobImpl job) {
+ if ( this.info.queueConfiguration.getType() == Type.ORDERED ) {
+ this.cache.add(0, job);
+ } else {
+ this.cache.add(job);
+ }
}
}
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/TopicManager.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/TopicManager.java?rev=1632217&r1=1632216&r2=1632217&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/TopicManager.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/TopicManager.java Thu Oct 16 06:52:39 2014
@@ -41,6 +41,7 @@ import org.apache.sling.event.impl.jobs.
import org.apache.sling.event.impl.jobs.JobImpl;
import org.apache.sling.event.impl.jobs.JobManagerConfiguration;
import org.apache.sling.event.impl.jobs.JobManagerImpl;
+import org.apache.sling.event.impl.jobs.JobTopicTraverser;
import org.apache.sling.event.impl.jobs.TestLogger;
import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager;
import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueInfo;
@@ -182,10 +183,7 @@ public class TopicManager implements Eve
}
final QueueInfo info = this.queueConfigMgr.getQueueInfo(topic);
if ( changed ) {
- logger.debug("Adding new topic {}", topic);
topicsChanged.set(true);
- logger.info("Starting queue {}", info.queueName);
-
this.queueManager.start(this, info);
} else {
final QueueJobCache cache = this.queueJobCaches.get(info.queueName);
@@ -240,6 +238,7 @@ public class TopicManager implements Eve
private final Map<String, Object> queueLocks = new ConcurrentHashMap<String, Object>();
public JobHandler take(final String queueName) {
+ logger.debug("Taking new job for {}", queueName);
Object lock = new Object();
this.queueLocks.put(queueName, lock);
JobImpl result = null;
@@ -249,7 +248,9 @@ public class TopicManager implements Eve
final Map<String, QueueJobCache> mapping = this.updateConfiguration();
final QueueJobCache cache = mapping.get(queueName);
if ( cache != null ) {
+ logger.debug("Getting new job from cache...");
result = cache.getNextJob();
+ logger.debug("Job from cache={}", result);
if ( result != null ) {
isWaiting = false;
}
@@ -273,6 +274,7 @@ public class TopicManager implements Eve
} finally {
this.queueLocks.remove(queueName);
}
+ logger.debug("Took new job for {} : {}", queueName, result);
return (result != null ? new JobHandler( result, (JobManagerImpl)this.jobManager) : null);
}
@@ -309,7 +311,7 @@ public class TopicManager implements Eve
for(final String t : topics) {
final Resource topicResource = baseResource.getChild(t.replace('/', '.'));
if ( topicResource != null ) {
- JobTopicTraverser.traverse(logger, topicResource, new JobTopicTraverser.Handler() {
+ JobTopicTraverser.traverse(logger, topicResource, new JobTopicTraverser.JobCallback() {
@Override
public boolean handle(final JobImpl job) {
@@ -346,11 +348,22 @@ public class TopicManager implements Eve
if ( this.isActive.get() ) {
this.initialScan();
for(final Map.Entry<String, QueueJobCache> entry : this.updateConfiguration().entrySet()) {
- logger.info("Starting queue {}", entry.getKey());
-
this.queueManager.start(this, entry.getValue().getQueueInfo());
}
}
}
+ public void reschedule(final JobImpl job) {
+ final QueueInfo info = this.queueConfigMgr.getQueueInfo(job.getTopic());
+ final QueueJobCache cache = this.queueJobCaches.get(info.queueName);
+ if ( cache != null ) {
+ cache.reschedule(job);
+ final Object lock = this.queueLocks.get(info.queueName);
+ if ( lock != null ) {
+ synchronized ( lock ) {
+ lock.notify();
+ }
+ }
+ }
+ }
}
Copied: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/CheckTopologyTask.java (from r1632213, sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/MaintenanceTask.java)
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/CheckTopologyTask.java?p2=sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/CheckTopologyTask.java&p1=sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/MaintenanceTask.java&r1=1632213&r2=1632217&rev=1632217&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/MaintenanceTask.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/CheckTopologyTask.java Thu Oct 16 06:52:39 2014
@@ -30,20 +30,22 @@ import org.apache.sling.api.resource.Val
import org.apache.sling.discovery.InstanceDescription;
import org.apache.sling.event.impl.jobs.JobImpl;
import org.apache.sling.event.impl.jobs.JobManagerConfiguration;
-import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager;
+import org.apache.sling.event.impl.jobs.JobTopicTraverser;
import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueInfo;
import org.apache.sling.event.impl.support.ResourceHelper;
import org.apache.sling.event.jobs.Job;
-import org.apache.sling.event.jobs.QueueConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Maintenance task...
- *
- * In the default configuration, this task runs every minute
+ * The check topolgoy task checks for changes in the topology and queue configuration
+ * and reassigns jobs.
+ * If the leader instance finds a dead instance it reassigns its jobs to live instances.
+ * The leader instance also checks for unassigned jobs and tries to assign them.
+ * If an instance detects jobs which it doesn't process anymore it reassigns them as
+ * well.
*/
-public class MaintenanceTask {
+public class CheckTopologyTask {
/** Logger. */
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@@ -54,13 +56,12 @@ public class MaintenanceTask {
/**
* Constructor
*/
- public MaintenanceTask(final JobManagerConfiguration config) {
+ public CheckTopologyTask(final JobManagerConfiguration config) {
this.configuration = config;
}
- private void reassignJobs(final TopologyCapabilities caps,
- final QueueConfigurationManager queueManager) {
- if ( caps != null && caps.isLeader() ) {
+ private void reassignJobsFromStoppedInstances(final TopologyCapabilities caps) {
+ if ( caps != null && caps.isLeader() && caps.isActive() ) {
this.logger.debug("Checking for stopped instances...");
final ResourceResolver resolver = this.configuration.createResourceResolver();
try {
@@ -76,7 +77,7 @@ public class MaintenanceTask {
final String instanceId = instanceResource.getName();
if ( !caps.isActive(instanceId) ) {
logger.debug("Found stopped instance {}", instanceId);
- assignJobs(caps, queueManager, instanceResource, true);
+ assignJobs(caps, instanceResource, true);
}
}
}
@@ -92,8 +93,7 @@ public class MaintenanceTask {
* - topology
* - capabilities
*/
- private void assignUnassignedJobs(final TopologyCapabilities caps,
- final QueueConfigurationManager queueManager) {
+ private void assignUnassignedJobs(final TopologyCapabilities caps) {
if ( caps != null && caps.isLeader() ) {
logger.debug("Checking unassigned jobs...");
final ResourceResolver resolver = this.configuration.createResourceResolver();
@@ -103,7 +103,7 @@ public class MaintenanceTask {
// this resource should exist, but we check anyway
if ( unassignedRoot != null ) {
- assignJobs(caps, queueManager, unassignedRoot, false);
+ assignJobs(caps, unassignedRoot, false);
}
} finally {
resolver.close();
@@ -114,9 +114,11 @@ public class MaintenanceTask {
/**
* Try to assign all jobs from the jobs root.
* The jobs are stored by topic
+ * @param caps The topology capabilities
+ * @param jobsRoot The root of the jobs
+ * @param unassign Whether to unassign the job if no instance is found.
*/
private void assignJobs(final TopologyCapabilities caps,
- final QueueConfigurationManager queueManager,
final Resource jobsRoot,
final boolean unassign) {
final ResourceResolver resolver = jobsRoot.getResourceResolver();
@@ -138,107 +140,76 @@ public class MaintenanceTask {
// first check if there is an instance for these topics
final List<InstanceDescription> potentialTargets = caps.getPotentialTargets(checkTopic, null);
if ( potentialTargets != null && potentialTargets.size() > 0 ) {
- final QueueInfo info = queueManager.getQueueInfo(topicName);
+ final QueueInfo info = caps.getQueueInfo(topicName);
logger.debug("Found queue {} for {}", info.queueConfiguration, topicName);
- // if queue is configured to drop, we drop
- if ( info.queueConfiguration.getType() == QueueConfiguration.Type.DROP) {
- final Iterator<Resource> i = topicResource.listChildren();
- while ( caps.isActive() && i.hasNext() ) {
- final Resource rsrc = i.next();
+ JobTopicTraverser.traverse(this.logger, topicResource, new JobTopicTraverser.ResourceCallback() {
+
+ @Override
+ public boolean handle(final Resource rsrc) {
try {
- resolver.delete(rsrc);
- resolver.commit();
- } catch ( final PersistenceException pe ) {
- this.ignoreException(pe);
- resolver.refresh();
- }
- }
- } else if ( info.queueConfiguration.getType() != QueueConfiguration.Type.IGNORE ) {
- // if the queue is not configured to ignore, we can reschedule
- for(final Resource yearResource : topicResource.getChildren() ) {
- for(final Resource monthResource : yearResource.getChildren() ) {
- for(final Resource dayResource : monthResource.getChildren() ) {
- for(final Resource hourResource : dayResource.getChildren() ) {
- for(final Resource minuteResource : hourResource.getChildren() ) {
- for(final Resource rsrc : minuteResource.getChildren() ) {
-
- if ( !caps.isActive() ) {
- return;
- }
-
- try {
- final ValueMap vm = ResourceHelper.getValueMap(rsrc);
- final String targetId = caps.detectTarget(topicName, vm, info);
-
- if ( targetId != null ) {
- final String newPath = this.configuration.getAssginedJobsPath() + '/' + targetId + '/' + topicResource.getName() + rsrc.getPath().substring(topicResource.getPath().length());
- final Map<String, Object> props = new HashMap<String, Object>(vm);
- props.put(Job.PROPERTY_JOB_QUEUE_NAME, info.queueName);
- props.put(Job.PROPERTY_JOB_TARGET_INSTANCE, targetId);
- props.remove(Job.PROPERTY_JOB_STARTED_TIME);
- try {
- ResourceHelper.getOrCreateResource(resolver, newPath, props);
- resolver.delete(rsrc);
- resolver.commit();
- } catch ( final PersistenceException pe ) {
- this.ignoreException(pe);
- resolver.refresh();
- }
- }
- } catch (final InstantiationException ie) {
- // something happened with the resource in the meantime
- this.ignoreException(ie);
- resolver.refresh();
- }
- }
- }
+ final ValueMap vm = ResourceHelper.getValueMap(rsrc);
+ final String targetId = caps.detectTarget(topicName, vm, info);
+
+ if ( targetId != null ) {
+ final String newPath = configuration.getAssginedJobsPath() + '/' + targetId + '/' + topicResource.getName() + rsrc.getPath().substring(topicResource.getPath().length());
+ final Map<String, Object> props = new HashMap<String, Object>(vm);
+ props.put(Job.PROPERTY_JOB_QUEUE_NAME, info.queueName);
+ props.put(Job.PROPERTY_JOB_TARGET_INSTANCE, targetId);
+ props.remove(Job.PROPERTY_JOB_STARTED_TIME);
+ try {
+ ResourceHelper.getOrCreateResource(resolver, newPath, props);
+ resolver.delete(rsrc);
+ resolver.commit();
+ } catch ( final PersistenceException pe ) {
+ ignoreException(pe);
+ resolver.refresh();
+ resolver.revert();
}
}
+ } catch (final InstantiationException ie) {
+ // something happened with the resource in the meantime
+ ignoreException(ie);
+ resolver.refresh();
+ resolver.revert();
}
+ return caps.isActive();
}
- }
+ });
}
+ // now unassign if there are still jobs
if ( caps.isActive() && unassign ) {
// we have to move everything to the unassigned area
- for(final Resource yearResource : topicResource.getChildren() ) {
- for(final Resource monthResource : yearResource.getChildren() ) {
- for(final Resource dayResource : monthResource.getChildren() ) {
- for(final Resource hourResource : dayResource.getChildren() ) {
- for(final Resource minuteResource : hourResource.getChildren() ) {
- for(final Resource rsrc : minuteResource.getChildren() ) {
-
- if ( !caps.isActive() ) {
- return;
- }
-
- try {
- final ValueMap vm = ResourceHelper.getValueMap(rsrc);
- final String newPath = this.configuration.getUnassignedJobsPath() + '/' + topicResource.getName() + rsrc.getPath().substring(topicResource.getPath().length());
- final Map<String, Object> props = new HashMap<String, Object>(vm);
- props.remove(Job.PROPERTY_JOB_QUEUE_NAME);
- props.remove(Job.PROPERTY_JOB_TARGET_INSTANCE);
- props.remove(Job.PROPERTY_JOB_STARTED_TIME);
-
- try {
- ResourceHelper.getOrCreateResource(resolver, newPath, props);
- resolver.delete(rsrc);
- resolver.commit();
- } catch ( final PersistenceException pe ) {
- this.ignoreException(pe);
- resolver.refresh();
- }
- } catch (final InstantiationException ie) {
- // something happened with the resource in the meantime
- this.ignoreException(ie);
- resolver.refresh();
- }
- }
- }
+ JobTopicTraverser.traverse(this.logger, topicResource, new JobTopicTraverser.ResourceCallback() {
+
+ @Override
+ public boolean handle(final Resource rsrc) {
+ try {
+ final ValueMap vm = ResourceHelper.getValueMap(rsrc);
+ final String newPath = configuration.getUnassignedJobsPath() + '/' + topicResource.getName() + rsrc.getPath().substring(topicResource.getPath().length());
+ final Map<String, Object> props = new HashMap<String, Object>(vm);
+ props.remove(Job.PROPERTY_JOB_QUEUE_NAME);
+ props.remove(Job.PROPERTY_JOB_TARGET_INSTANCE);
+ props.remove(Job.PROPERTY_JOB_STARTED_TIME);
+
+ try {
+ ResourceHelper.getOrCreateResource(resolver, newPath, props);
+ resolver.delete(rsrc);
+ resolver.commit();
+ } catch ( final PersistenceException pe ) {
+ ignoreException(pe);
+ resolver.refresh();
+ resolver.revert();
}
+ } catch (final InstantiationException ie) {
+ // something happened with the resource in the meantime
+ ignoreException(ie);
+ resolver.refresh();
+ resolver.revert();
}
+ return caps.isActive();
}
- }
+ });
}
}
}
@@ -247,16 +218,15 @@ public class MaintenanceTask {
* One maintenance run
*/
public void run(final TopologyCapabilities topologyCapabilities,
- final QueueConfigurationManager queueManager,
final boolean topologyChanged,
final boolean configChanged) {
// if topology changed, reschedule assigned jobs for stopped instances
if ( topologyChanged ) {
- this.reassignJobs(topologyCapabilities, queueManager);
+ this.reassignJobsFromStoppedInstances(topologyCapabilities);
}
// try to assign unassigned jobs
if ( topologyChanged || configChanged ) {
- this.assignUnassignedJobs(topologyCapabilities, queueManager);
+ this.assignUnassignedJobs(topologyCapabilities);
}
}
Copied: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/FindUnfinishedJobsTask.java (from r1632213, sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/RestartTask.java)
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/FindUnfinishedJobsTask.java?p2=sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/FindUnfinishedJobsTask.java&p1=sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/RestartTask.java&r1=1632213&r2=1632217&rev=1632217&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/RestartTask.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/FindUnfinishedJobsTask.java Thu Oct 16 06:52:39 2014
@@ -26,12 +26,16 @@ import org.apache.sling.api.resource.Res
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.event.impl.jobs.JobImpl;
import org.apache.sling.event.impl.jobs.JobManagerConfiguration;
-import org.apache.sling.event.impl.jobs.topics.JobTopicTraverser;
+import org.apache.sling.event.impl.jobs.JobTopicTraverser;
import org.apache.sling.event.jobs.Job;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class RestartTask {
+/**
+ * This task is executed when the job handling starts.
+ * It checks for unfinished jobs from a previous start and corrects their state.
+ */
+public class FindUnfinishedJobsTask {
/** Logger. */
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@@ -73,7 +77,7 @@ public class RestartTask {
private void initTopic(final Resource topicResource) {
logger.debug("Initializing topic {}...", topicResource.getName());
- JobTopicTraverser.traverse(logger, topicResource, new JobTopicTraverser.Handler() {
+ JobTopicTraverser.traverse(logger, topicResource, new JobTopicTraverser.JobCallback() {
@Override
public boolean handle(final JobImpl job) {
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/TopologyHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/TopologyHandler.java?rev=1632217&r1=1632216&r2=1632217&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/TopologyHandler.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/TopologyHandler.java Thu Oct 16 06:52:39 2014
@@ -108,12 +108,12 @@ public class TopologyHandler
final UpgradeTask task = new UpgradeTask();
task.run(this.configuration, this.topologyCapabilities, queueManager);
- final RestartTask rt = new RestartTask();
+ final FindUnfinishedJobsTask rt = new FindUnfinishedJobsTask();
rt.run(this.configuration);
}
- final MaintenanceTask mt = new MaintenanceTask(this.configuration);
- mt.run(topologyCapabilities, queueManager, !isConfigChange, isConfigChange);
+ final CheckTopologyTask mt = new CheckTopologyTask(this.configuration);
+ mt.run(topologyCapabilities, !isConfigChange, isConfigChange);
if ( !isConfigChange ) {
// start listeners
@@ -160,6 +160,10 @@ public class TopologyHandler
}
}
+ /**
+ * Add a topology aware listener
+ * @param service Listener to notify about changes.
+ */
public void addListener(final TopologyAware service) {
synchronized ( this.listeners ) {
this.listeners.add(service);
@@ -167,6 +171,10 @@ public class TopologyHandler
}
}
+ /**
+ * Remove a topology aware listener
+ * @param service Listener to notify about changes.
+ */
public void removeListener(final TopologyAware service) {
synchronized ( this.listeners ) {
this.listeners.remove(service);
Modified: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/OrderedQueueTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/OrderedQueueTest.java?rev=1632217&r1=1632216&r2=1632217&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/OrderedQueueTest.java (original)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/OrderedQueueTest.java Thu Oct 16 06:52:39 2014
@@ -103,7 +103,7 @@ public class OrderedQueueTest extends Ab
final int counter = job.getProperty("counter", -10);
assertNotEquals("Counter property is missing", -10, counter);
assertTrue("Counter should only increment by max of 1 " + counter + " - " + lastCounter,
- counter == lastCounter || counter == lastCounter +1);
+ counter == lastCounter || counter == lastCounter +1);
lastCounter = counter;
if ("sling/orderedtest/start".equals(job.getTopic()) ) {
cb.block();