You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2014/10/17 17:46:02 UTC
svn commit: r1632617 - in
/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs:
./ config/ queues/ topics/ topology/
Author: cziegeler
Date: Fri Oct 17 15:46:02 2014
New Revision: 1632617
URL: http://svn.apache.org/r1632617
Log:
SLING-4048 : Avoid keeping jobs in memory. Another refactoring: move job cache to queue (WiP)
Added:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java
- copied, changed from r1632478, sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/QueueJobCache.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicManager.java
- copied, changed from r1632478, sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/TopicManager.java
Removed:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/QueueJobCache.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/TopicManager.java
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/QueueConfigurationManager.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueServices.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/TopologyAware.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/TopologyHandler.java
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java?rev=1632617&r1=1632616&r2=1632617&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java Fri Oct 17 15:46:02 2014
@@ -37,7 +37,8 @@ public class JobHandler {
private final JobManagerImpl jobManager;
- public JobHandler(final JobImpl job, final JobManagerImpl jobManager) {
+ public JobHandler(final JobImpl job,
+ final JobManagerImpl jobManager) {
this.job = job;
this.jobManager = jobManager;
}
@@ -48,7 +49,7 @@ public class JobHandler {
public boolean startProcessing(final Queue queue) {
this.isStopped = false;
- return this.jobManager.persistJobProperties(this.job, this.job.prepare(queue));
+ return this.persistJobProperties(this.job.prepare(queue));
}
/**
@@ -62,6 +63,7 @@ public class JobHandler {
/**
* Reschedule the job
+ * Update the retry count and remove the started time.
* @return <code>true</code> if rescheduling was successful, <code>false</code> otherwise.
*/
public boolean reschedule() {
@@ -72,14 +74,18 @@ public class JobHandler {
this.jobManager.finishJob(this.job, Job.JobState.DROPPED, true, -1);
}
+ /**
+ * Reassign to a new instance.
+ */
public void reassign() {
this.jobManager.reassign(this.job);
}
- public void persistJobProperties(final String... propNames) {
- if ( propNames != null ) {
- this.jobManager.persistJobProperties(this.job, propNames);
- }
+ /**
+ * Update the property of a job in the resource tree
+ */
+ public boolean persistJobProperties(final String... propNames) {
+ return this.jobManager.persistJobProperties(this.job, propNames);
}
public boolean isStopped() {
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java?rev=1632617&r1=1632616&r2=1632617&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java Fri Oct 17 15:46:02 2014
@@ -127,7 +127,8 @@ public class JobManagerImpl
@Reference
private StatisticsManager statisticsManager;
- @Reference QueueManager qManager;
+ @Reference
+ private QueueManager qManager;
private volatile TopologyCapabilities topologyCapabilities;
@@ -795,38 +796,6 @@ public class JobManagerImpl
}
/**
- * Reschedule a job.
- *
- * Update the retry count and remove the started time.
- * @param job The job
- * @return true if the job could be updated.
- */
- public boolean reschedule(final JobImpl job) {
- final ResourceResolver resolver = this.configuration.createResourceResolver();
- try {
- final Resource jobResource = resolver.getResource(job.getResourcePath());
- if ( jobResource != null ) {
- final ModifiableValueMap mvm = jobResource.adaptTo(ModifiableValueMap.class);
- mvm.put(Job.PROPERTY_JOB_RETRY_COUNT, job.getProperty(Job.PROPERTY_JOB_RETRY_COUNT));
- if ( job.getProperty(Job.PROPERTY_RESULT_MESSAGE) != null ) {
- mvm.put(Job.PROPERTY_RESULT_MESSAGE, job.getProperty(Job.PROPERTY_RESULT_MESSAGE));
- }
- mvm.remove(Job.PROPERTY_JOB_STARTED_TIME);
- try {
- resolver.commit();
- return true;
- } catch ( final PersistenceException pe ) {
- ignoreException(pe);
- }
- }
- } finally {
- resolver.close();
- }
-
- return false;
- }
-
- /**
* Try to get a "lock" for a resource
*/
private boolean lock(final String jobTopic, final String id) {
@@ -995,51 +964,6 @@ public class JobManagerImpl
}
/**
- * Reassign a job to a new instance
- * @param job The job to reassign.
- */
- public void reassign(final JobImpl job) {
- final QueueInfo queueInfo = queueManager.getQueueInfo(job.getTopic());
-
- // Sanity check if queue configuration has changed
- final TopologyCapabilities caps = this.topologyCapabilities;
- final String targetId = (caps == null ? null : caps.detectTarget(job.getTopic(), job.getProperties(), queueInfo));
-
- final ResourceResolver resolver = this.configuration.createResourceResolver();
- try {
- final Resource jobResource = resolver.getResource(job.getResourcePath());
- if ( jobResource != null ) {
- try {
- final ValueMap vm = ResourceHelper.getValueMap(jobResource);
- final String newPath = this.configuration.getUniquePath(targetId, job.getTopic(), job.getId(), job.getProperties());
-
- final Map<String, Object> props = new HashMap<String, Object>(vm);
- props.remove(Job.PROPERTY_JOB_QUEUE_NAME);
- if ( targetId == null ) {
- props.remove(Job.PROPERTY_JOB_TARGET_INSTANCE);
- } else {
- props.put(Job.PROPERTY_JOB_TARGET_INSTANCE, targetId);
- }
- props.remove(Job.PROPERTY_JOB_STARTED_TIME);
-
- try {
- ResourceHelper.getOrCreateResource(resolver, newPath, props);
- resolver.delete(jobResource);
- resolver.commit();
- } catch ( final PersistenceException pe ) {
- this.ignoreException(pe);
- }
- } catch (final InstantiationException ie) {
- // something happened with the resource in the meantime
- this.ignoreException(ie);
- }
- }
- } finally {
- resolver.close();
- }
- }
-
- /**
* Get the current capabilities
*/
public TopologyCapabilities getTopologyCapabilities() {
@@ -1047,41 +971,6 @@ public class JobManagerImpl
}
/**
- * Update the property of a job in the resource tree
- */
- public boolean persistJobProperties(final JobImpl job, final String... propNames) {
- final ResourceResolver resolver = this.configuration.createResourceResolver();
- try {
- final Resource jobResource = resolver.getResource(job.getResourcePath());
- if ( jobResource != null ) {
- final ModifiableValueMap mvm = jobResource.adaptTo(ModifiableValueMap.class);
- for(final String propName : propNames) {
- final Object val = job.getProperty(propName);
- if ( val != null ) {
- if ( val.getClass().isEnum() ) {
- mvm.put(propName, val.toString());
- } else {
- mvm.put(propName, val);
- }
- } else {
- mvm.remove(propName);
- }
- }
- resolver.commit();
-
- return true;
- } else {
- logger.debug("No job resource found at {}", job.getResourcePath());
- }
- } catch ( final PersistenceException ignore ) {
- this.ignoreException(ignore);
- } finally {
- resolver.close();
- }
- return false;
- }
-
- /**
* @see org.apache.sling.event.jobs.JobManager#stopJobById(java.lang.String)
*/
@Override
@@ -1241,4 +1130,119 @@ public class JobManagerImpl
}
return null;
}
+
+ /**
+ * Reassign a job to a new instance
+ */
+ public void reassign(final JobImpl job) {
+ final QueueInfo queueInfo = this.queueManager.getQueueInfo(job.getTopic());
+ // Sanity check if queue configuration has changed
+ final TopologyCapabilities caps = this.topologyCapabilities;
+ final String targetId = (caps == null ? null : caps.detectTarget(job.getTopic(), job.getProperties(), queueInfo));
+
+ final ResourceResolver resolver = this.configuration.createResourceResolver();
+ try {
+ final Resource jobResource = resolver.getResource(job.getResourcePath());
+ if ( jobResource != null ) {
+ try {
+ final ValueMap vm = ResourceHelper.getValueMap(jobResource);
+ final String newPath = this.configuration.getUniquePath(targetId, job.getTopic(), job.getId(), job.getProperties());
+
+ final Map<String, Object> props = new HashMap<String, Object>(vm);
+ props.remove(Job.PROPERTY_JOB_QUEUE_NAME);
+ if ( targetId == null ) {
+ props.remove(Job.PROPERTY_JOB_TARGET_INSTANCE);
+ } else {
+ props.put(Job.PROPERTY_JOB_TARGET_INSTANCE, targetId);
+ }
+ props.remove(Job.PROPERTY_JOB_STARTED_TIME);
+
+ try {
+ ResourceHelper.getOrCreateResource(resolver, newPath, props);
+ resolver.delete(jobResource);
+ resolver.commit();
+ } catch ( final PersistenceException pe ) {
+ this.ignoreException(pe);
+ }
+ } catch (final InstantiationException ie) {
+ // something happened with the resource in the meantime
+ this.ignoreException(ie);
+ }
+ }
+ } finally {
+ resolver.close();
+ }
+ }
+
+ /**
+ * Reschedule the job
+ * Update the retry count and remove the started time.
+ * @param job The job
+ * @return <code>true</code> if rescheduling was successful, <code>false</code> otherwise.
+ */
+ public boolean reschedule(final JobImpl job) {
+ final ResourceResolver resolver = this.configuration.createResourceResolver();
+ try {
+ final Resource jobResource = resolver.getResource(job.getResourcePath());
+ if ( jobResource != null ) {
+ final ModifiableValueMap mvm = jobResource.adaptTo(ModifiableValueMap.class);
+ mvm.put(Job.PROPERTY_JOB_RETRY_COUNT, job.getProperty(Job.PROPERTY_JOB_RETRY_COUNT));
+ if ( job.getProperty(Job.PROPERTY_RESULT_MESSAGE) != null ) {
+ mvm.put(Job.PROPERTY_RESULT_MESSAGE, job.getProperty(Job.PROPERTY_RESULT_MESSAGE));
+ }
+ mvm.remove(Job.PROPERTY_JOB_STARTED_TIME);
+ try {
+ resolver.commit();
+ return true;
+ } catch ( final PersistenceException pe ) {
+ this.logger.debug("Unable to update reschedule properties for job " + job.getId(), pe);
+ }
+ }
+ } finally {
+ resolver.close();
+ }
+
+ return false;
+ }
+
+ /**
+ * Update the property of a job in the resource tree
+ * @param job The job
+ * @param propNames the property names to update
+ * @return {@code true} if the update was successful.
+ */
+ public boolean persistJobProperties(final JobImpl job, final String... propNames) {
+ if ( propNames != null ) {
+ final ResourceResolver resolver = this.configuration.createResourceResolver();
+ try {
+ final Resource jobResource = resolver.getResource(job.getResourcePath());
+ if ( jobResource != null ) {
+ final ModifiableValueMap mvm = jobResource.adaptTo(ModifiableValueMap.class);
+ for(final String propName : propNames) {
+ final Object val = job.getProperty(propName);
+ if ( val != null ) {
+ if ( val.getClass().isEnum() ) {
+ mvm.put(propName, val.toString());
+ } else {
+ mvm.put(propName, val);
+ }
+ } else {
+ mvm.remove(propName);
+ }
+ }
+ resolver.commit();
+
+ return true;
+ } else {
+ logger.debug("No job resource found at {}", job.getResourcePath());
+ }
+ } catch ( final PersistenceException ignore ) {
+ logger.debug("Unable to persist properties", ignore);
+ } finally {
+ resolver.close();
+ }
+ return false;
+ }
+ return true;
+ }
}
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/QueueConfigurationManager.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/QueueConfigurationManager.java?rev=1632617&r1=1632616&r2=1632617&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/QueueConfigurationManager.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/QueueConfigurationManager.java Fri Oct 17 15:46:02 2014
@@ -141,6 +141,11 @@ public class QueueConfigurationManager {
public InternalQueueConfiguration queueConfiguration;
public String queueName;
public String targetId;
+
+ @Override
+ public String toString() {
+ return queueName;
+ }
}
/**
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java?rev=1632617&r1=1632616&r2=1632617&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java Fri Oct 17 15:46:02 2014
@@ -35,6 +35,7 @@ import org.apache.sling.event.impl.jobs.
import org.apache.sling.event.impl.jobs.JobExecutionResultImpl;
import org.apache.sling.event.impl.jobs.JobHandler;
import org.apache.sling.event.impl.jobs.JobImpl;
+import org.apache.sling.event.impl.jobs.JobManagerImpl;
import org.apache.sling.event.impl.jobs.Utility;
import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
import org.apache.sling.event.impl.jobs.deprecated.JobStatusNotifier;
@@ -150,6 +151,10 @@ public abstract class AbstractJobQueue
return this.services.statisticsManager.getQueueStatistics(this.queueName);
}
+ public QueueJobCache getCache() {
+ return this.services.cache;
+ }
+
/**
* Start the job queue.
*/
@@ -158,7 +163,7 @@ public abstract class AbstractJobQueue
@Override
public void run() {
- while ( running ) {
+ while ( running && !isOutdated()) {
logger.info("Starting job queue {}", queueName);
logger.debug("Configuration for job queue={}", configuration);
@@ -186,8 +191,7 @@ public abstract class AbstractJobQueue
* Outdate this queue.
*/
public void outdate() {
- if ( !this.isOutdated() ) {
- this.isOutdated.set(true);
+ if ( this.isOutdated.compareAndSet(false, true) ) {
final String name = this.getName() + "<outdated>(" + this.hashCode() + ")";
this.logger.info("Outdating queue {}, renaming to {}.", this.queueName, name);
this.queueName = name;
@@ -229,8 +233,8 @@ public abstract class AbstractJobQueue
this.logger.debug("Waking up waiting queue {}", this.queueName);
this.notifyFinished(false);
}
- // continue queue processing to stop the queue
- this.services.topicManager.stop(this.getName());
+ // stop the queue
+ this.stopWaitingForNextJob();
synchronized ( this.processingJobsLists ) {
this.processingJobsLists.clear();
@@ -286,7 +290,7 @@ public abstract class AbstractJobQueue
if ( handler.reschedule() ) {
this.logger.info("No acknowledge received for job {} stored at {}. Requeueing job.", Utility.toString(handler.getJob()), handler.getJob().getId());
handler.getJob().retry();
- this.services.topicManager.reschedule(handler);
+ this.requeue(handler);
this.notifyFinished(true);
}
}
@@ -298,7 +302,7 @@ public abstract class AbstractJobQueue
* Execute the queue
*/
private void runJobQueue() {
- while ( this.running ) {
+ while ( this.running && !this.isOutdated()) {
JobHandler info = null;
if ( info == null ) {
// so let's wait/get the next job from the queue
@@ -306,15 +310,81 @@ public abstract class AbstractJobQueue
}
// if we're suspended we drop the current item
- if ( this.running && info != null && !checkSuspended(info) ) {
+ if ( this.running && info != null && !this.isOutdated() && !checkSuspended(info) ) {
// if we still have a job and are running, let's go
this.start(info);
}
}
}
+ private volatile boolean isWaitingForNextJob = false;
+ private final Object nextJobLock = new Object();
+
+ /**
+ * Take a new job for this queue.
+ * This method blocks until a job is available or the queue is stopped.
+ * @param queueName The queue name
+ * @return A new job or {@code null} if {@link #stop(String)} is called.
+ */
private JobHandler take() {
- return this.services.topicManager.take(this.getName());
+ logger.debug("Taking new job for {}", queueName);
+ JobImpl result = null;
+
+ this.isWaitingForNextJob = true;
+ while ( this.isWaitingForNextJob && !this.isOutdated()) {
+ result = this.services.cache.getNextJob();
+ if ( result != null ) {
+ isWaitingForNextJob = false;
+ } else {
+ // block
+ synchronized ( nextJobLock ) {
+ if ( isWaitingForNextJob ) {
+ try {
+ nextJobLock.wait();
+ } catch ( final InterruptedException ignore ) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+ }
+ }
+ this.isWaitingForNextJob = false;
+
+ if ( logger.isDebugEnabled() ) {
+ logger.debug("Returning job for {} : {}", queueName, Utility.toString(result));
+ }
+ return (result != null ? new JobHandler( result, (JobManagerImpl)this.services.jobManager) : null);
+ }
+
+ /**
+ * Stop waiting for a job
+ * @param queueName The queue name.
+ */
+ private void stopWaitingForNextJob() {
+ synchronized ( nextJobLock ) {
+ this.isWaitingForNextJob = false;
+ nextJobLock.notify();
+ }
+ }
+
+ /**
+ * Inform the queue about a job for the topic
+ * @param topic A new topic.
+ */
+ public void wakeUpQueue(final String topic) {
+ this.services.cache.handleNewJob(topic);
+ this.stopWaitingForNextJob();
+ }
+
+ /**
+ * Put a job back in the queue
+ * @param handler The job handler
+ */
+ private void requeue(final JobHandler handler) {
+ this.services.cache.reschedule(handler);
+ synchronized ( this.nextJobLock ) {
+ this.nextJobLock.notify();
+ }
}
/**
@@ -324,7 +394,7 @@ public abstract class AbstractJobQueue
boolean wasSuspended = false;
synchronized ( this.suspendLock ) {
while ( this.suspendedSince != -1 ) {
- this.services.topicManager.reschedule(handler);
+ this.requeue(handler);
logger.debug("Sleeping as queue {} is suspended.", this.getName());
wasSuspended = true;
final long diff = System.currentTimeMillis() - this.suspendedSince;
@@ -820,7 +890,7 @@ public abstract class AbstractJobQueue
}
protected void reschedule(final JobHandler handler) {
- this.services.topicManager.reschedule(handler);
+ this.requeue(handler);
}
/**
@@ -851,5 +921,6 @@ public abstract class AbstractJobQueue
protected abstract void start(final JobHandler handler);
protected abstract void notifyFinished(boolean reschedule);
+
}
Copied: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java (from r1632478, sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/QueueJobCache.java)
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java?p2=sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java&p1=sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/QueueJobCache.java&r1=1632478&r2=1632617&rev=1632617&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/QueueJobCache.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java Fri Oct 17 15:46:02 2014
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.event.impl.jobs.topics;
+package org.apache.sling.event.impl.jobs.queues;
import java.util.ArrayList;
import java.util.Collections;
@@ -25,6 +25,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListSet;
import org.apache.sling.api.resource.Resource;
import org.apache.sling.api.resource.ResourceResolver;
@@ -41,7 +42,6 @@ import org.slf4j.LoggerFactory;
* The queue job cache caches jobs per queue based on the topics the queue is actively
* processing.
*
- * TODO cache needs to be synchronized!
*/
public class QueueJobCache {
@@ -77,7 +77,7 @@ public class QueueJobCache {
final Set<String> topics) {
this.configuration = configuration;
this.info = info;
- this.topics = topics;
+ this.topics = new ConcurrentSkipListSet<String>(topics);
this.topicsWithNewJobs.addAll(topics);
}
@@ -216,6 +216,7 @@ public class QueueJobCache {
synchronized ( this.topicsWithNewJobs ) {
this.topicsWithNewJobs.add(topic);
}
+ this.topics.add(topic);
}
public void reschedule(final JobHandler handler) {
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java?rev=1632617&r1=1632616&r2=1632617&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java Fri Oct 17 15:46:02 2014
@@ -19,9 +19,11 @@
package org.apache.sling.event.impl.jobs.queues;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.felix.scr.annotations.Activate;
@@ -41,9 +43,9 @@ import org.apache.sling.event.impl.jobs.
import org.apache.sling.event.impl.jobs.jmx.QueueStatusEvent;
import org.apache.sling.event.impl.jobs.jmx.QueuesMBeanImpl;
import org.apache.sling.event.impl.jobs.stats.StatisticsManager;
-import org.apache.sling.event.impl.jobs.topics.TopicManager;
import org.apache.sling.event.impl.support.Environment;
import org.apache.sling.event.impl.support.ResourceHelper;
+import org.apache.sling.event.jobs.JobManager;
import org.apache.sling.event.jobs.Queue;
import org.apache.sling.event.jobs.QueueConfiguration;
import org.apache.sling.event.jobs.jmx.QueuesMBean;
@@ -90,7 +92,7 @@ public class QueueManager
private StatisticsManager statisticsManager;
@Reference
- private QueueConfigurationManager queueManager;
+ private QueueConfigurationManager queueConfigurationManager;
/** Lock object for the queues map - we don't want to sync directly on the concurrent map. */
private final Object queuesLock = new Object();
@@ -175,9 +177,15 @@ public class QueueManager
* This method first searches the corresponding queue - if such a queue
* does not exist yet, it is created and started.
*
+ * @param topicManager The topic manager
+ * @param jobManager The job manager
+ * @param queueInfo The queue info
* @param topic The topic
*/
- public void start(final TopicManager topicManager, final QueueInfo queueInfo) {
+ public void start(final TopicManager topicManager,
+ final JobManager jobManager,
+ final QueueInfo queueInfo,
+ final String topic) {
final InternalQueueConfiguration config = queueInfo.queueConfiguration;
// get or create queue
AbstractJobQueue queue = null;
@@ -198,6 +206,10 @@ public class QueueManager
services.threadPoolManager = this.threadPoolManager;
services.topicManager = topicManager;
services.statisticsManager = statisticsManager;
+ services.jobManager = jobManager;
+ final Set<String> topics = new HashSet<String>();
+ topics.add(topic);
+ services.cache = new QueueJobCache(configuration, queueInfo, topics);
if ( config.getType() == QueueConfiguration.Type.ORDERED ) {
queue = new OrderedJobQueue(queueInfo.queueName, config, services);
} else if ( config.getType() == QueueConfiguration.Type.UNORDERED ) {
@@ -211,6 +223,8 @@ public class QueueManager
((QueuesMBeanImpl)queuesMBean).sendEvent(new QueueStatusEvent(queue, null));
queue.start();
}
+ } else {
+ queue.wakeUpQueue(topic);
}
}
}
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueServices.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueServices.java?rev=1632617&r1=1632616&r2=1632617&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueServices.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueServices.java Fri Oct 17 15:46:02 2014
@@ -22,7 +22,7 @@ import org.apache.sling.commons.schedule
import org.apache.sling.commons.threads.ThreadPoolManager;
import org.apache.sling.event.impl.jobs.JobConsumerManager;
import org.apache.sling.event.impl.jobs.stats.StatisticsManager;
-import org.apache.sling.event.impl.jobs.topics.TopicManager;
+import org.apache.sling.event.jobs.JobManager;
import org.osgi.service.event.EventAdmin;
public class QueueServices {
@@ -38,4 +38,8 @@ public class QueueServices {
public TopicManager topicManager;
public StatisticsManager statisticsManager;
+
+ public QueueJobCache cache;
+
+ public JobManager jobManager;
}
Copied: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicManager.java (from r1632478, sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/TopicManager.java)
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicManager.java?p2=sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicManager.java&p1=sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/TopicManager.java&r1=1632478&r2=1632617&rev=1632617&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/TopicManager.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicManager.java Fri Oct 17 15:46:02 2014
@@ -16,15 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.event.impl.jobs.topics;
+package org.apache.sling.event.impl.jobs.queues;
-import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
-import java.util.TreeSet;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.felix.scr.annotations.Activate;
@@ -36,15 +34,12 @@ import org.apache.felix.scr.annotations.
import org.apache.sling.api.resource.PersistenceException;
import org.apache.sling.api.resource.Resource;
import org.apache.sling.api.resource.ResourceResolver;
-import org.apache.sling.event.impl.jobs.JobHandler;
import org.apache.sling.event.impl.jobs.JobImpl;
import org.apache.sling.event.impl.jobs.JobManagerConfiguration;
-import org.apache.sling.event.impl.jobs.JobManagerImpl;
import org.apache.sling.event.impl.jobs.JobTopicTraverser;
-import org.apache.sling.event.impl.jobs.Utility;
+import org.apache.sling.event.impl.jobs.TestLogger;
import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager;
import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueInfo;
-import org.apache.sling.event.impl.jobs.queues.QueueManager;
import org.apache.sling.event.impl.jobs.topology.TopologyAware;
import org.apache.sling.event.impl.jobs.topology.TopologyCapabilities;
import org.apache.sling.event.impl.jobs.topology.TopologyHandler;
@@ -61,8 +56,6 @@ import org.slf4j.LoggerFactory;
/**
* Topic manager
*
- * TODO - Check syncing of take/update/stop. This might not be 100% correct yet.
- * TODO - Block take() if inactive
*/
@Component(immediate=true)
@Service(value=EventHandler.class)
@@ -70,7 +63,7 @@ import org.slf4j.LoggerFactory;
public class TopicManager implements EventHandler, TopologyAware {
/** Logger. */
- private final Logger logger = LoggerFactory.getLogger(this.getClass());
+ private final Logger logger = new TestLogger(LoggerFactory.getLogger(this.getClass()));
@Reference
private JobManagerConfiguration configuration;
@@ -87,18 +80,10 @@ public class TopicManager implements Eve
@Reference
private JobManager jobManager;
- /** Queue configuration counter to track for changes. */
- private volatile int queueConfigCount = -1;
-
- /** A set of all topics. Access needs synchronization. */
- private final Set<String> topics = new TreeSet<String>();
-
- /** Marker if a new topic has been added. */
- private final AtomicBoolean topicsChanged = new AtomicBoolean(false);
-
- /** The mapping from a queue name to a cache. */
- private Map<String, QueueJobCache> queueJobCaches = Collections.emptyMap();
+ /** The mapping from a topic to a queue info. */
+ private final Map<String, QueueInfo> topicMapping = new HashMap<String, QueueInfo>();
+ /** Flag whether the manager is active or suspended. */
private final AtomicBoolean isActive = new AtomicBoolean(false);
/**
@@ -118,10 +103,36 @@ public class TopicManager implements Eve
}
/**
+ * This method is called whenever the topology or queue configurations change.
+ * @param caps The new topology capabilities or {@code null} if currently unknown.
+ */
+ @Override
+ public void topologyChanged(final TopologyCapabilities caps) {
+ logger.debug("Topology changed {}", caps);
+ synchronized ( this.topicMapping ) {
+ if ( caps != null ) {
+ final Set<String> topics = this.initialScan();
+ this.updateTopicMapping(topics);
+ // start queues
+ for(final Map.Entry<String, QueueInfo> entry : this.topicMapping.entrySet() ) {
+ this.queueManager.start(this, this.jobManager, entry.getValue(), entry.getKey());
+ }
+ this.isActive.set(true);
+ } else {
+ this.isActive.set(false);
+ this.queueManager.restart();
+ this.topicMapping.clear();
+ }
+ }
+ }
+
+ /**
* Scan the resource tree for topics.
*/
- private void initialScan() {
+ private Set<String> initialScan() {
logger.debug("Scanning repository for existing topics...");
+ final Set<String> topics = new HashSet<String>();
+
final ResourceResolver resolver = this.configuration.createResourceResolver();
try {
final Resource baseResource = resolver.getResource(this.configuration.getLocalJobsPath());
@@ -133,15 +144,13 @@ public class TopicManager implements Eve
final Resource topicResource = topicIter.next();
final String topic = topicResource.getName().replace('.', '/');
logger.debug("Found topic {}", topic);
- synchronized ( topics ) {
- topics.add(topic);
- }
- topicsChanged.set(true);
+ topics.add(topic);
}
}
} finally {
resolver.close();
}
+ return topics;
}
/**
@@ -150,117 +159,30 @@ public class TopicManager implements Eve
@Override
public void handleEvent(final Event event) {
final String topic = (String)event.getProperty(NotificationConstants.NOTIFICATION_PROPERTY_JOB_TOPIC);
- if ( topic != null ) {
- boolean changed = false;
- synchronized ( topics ) {
- final int len = topics.size();
- topics.add(topic);
- changed = topics.size() > len;
- }
- final QueueInfo info = this.queueConfigMgr.getQueueInfo(topic);
- if ( changed ) {
- topicsChanged.set(true);
- this.queueManager.start(this, info);
- } else {
- final QueueJobCache cache = this.queueJobCaches.get(info.queueName);
- if ( cache != null ) {
- cache.handleNewJob(topic);
- final Object lock = this.queueLocks.get(info.queueName);
- if ( lock != null ) {
- synchronized ( lock ) {
- lock.notify();
- }
- }
+ synchronized ( this.topicMapping ) {
+ if ( this.isActive.get() && topic != null ) {
+ QueueInfo info = this.topicMapping.get(topic);
+ if ( info == null ) {
+ info = this.queueConfigMgr.getQueueInfo(topic);
+ this.topicMapping.put(topic, info);
}
+ this.queueManager.start(this, this.jobManager, info, topic);
}
}
}
/**
* Get the latest mapping from queue name to topics
- * @return A map from queue names to topics
*/
- private Map<String, QueueJobCache> updateConfiguration() {
- if ( this.queueConfigCount < this.queueConfigMgr.getChangeCount() || this.topicsChanged.get() ) {
+ private void updateTopicMapping(final Set<String> topics) {
+ this.topicMapping.clear();
- final Map<String, Set<String>> mapping = new HashMap<String, Set<String>>();
-
- synchronized ( this.topics ) {
- this.queueConfigCount = this.queueConfigMgr.getChangeCount();
- this.topicsChanged.set(false);
-
- for(final String topic : this.topics) {
- final QueueInfo queueInfo = this.queueConfigMgr.getQueueInfo(topic);
- Set<String> names = mapping.get(queueInfo.queueName);
- if ( names == null ) {
- names = new TreeSet<String>();
- mapping.put(queueInfo.queueName, names);
- }
- names.add(topic);
- }
- }
-
- final Map<String, QueueJobCache> cacheMap = new HashMap<String, QueueJobCache>();
- for(final Map.Entry<String, Set<String>> entry : mapping.entrySet()) {
- cacheMap.put(entry.getKey(),
- new QueueJobCache(this.configuration, this.queueConfigMgr.getQueueInfo(entry.getValue().iterator().next()), entry.getValue()));
- }
- this.queueJobCaches = cacheMap;
- this.logger.debug("Established new topic mapping: {}", mapping);
+ for(final String topic : topics) {
+ final QueueInfo queueInfo = this.queueConfigMgr.getQueueInfo(topic);
+ this.topicMapping.put(topic, queueInfo);
}
- return this.queueJobCaches;
- }
-
- private final Map<String, Object> queueLocks = new ConcurrentHashMap<String, Object>();
- public JobHandler take(final String queueName) {
- logger.debug("Taking new job for {}", queueName);
- Object lock = new Object();
- this.queueLocks.put(queueName, lock);
- JobImpl result = null;
- try {
- boolean isWaiting = true;
- while ( isWaiting ) {
- final Map<String, QueueJobCache> mapping = this.updateConfiguration();
- final QueueJobCache cache = mapping.get(queueName);
- if ( cache != null ) {
- result = cache.getNextJob();
- if ( result != null ) {
- isWaiting = false;
- }
- }
- if ( isWaiting ) {
- // block
- synchronized ( lock ) {
- try {
- lock.wait();
- // refetch lock
- lock = this.queueLocks.get(queueName);
- if ( lock == null ) {
- isWaiting = false;
- }
- } catch ( final InterruptedException ignore ) {
- Thread.currentThread().interrupt();
- }
- }
- }
- }
- } finally {
- this.queueLocks.remove(queueName);
- }
- if ( logger.isDebugEnabled() ) {
- logger.debug("Returning job for {} : {}", queueName, Utility.toString(result));
- }
- return (result != null ? new JobHandler( result, (JobManagerImpl)this.jobManager) : null);
- }
-
- public void stop(final String queueName) {
- final Object lock = queueLocks.remove(queueName);
- if ( lock != null ) {
- synchronized ( lock ) {
- lock.notify();
- }
- }
+ this.logger.debug("Established new topic mapping: {}", this.topicMapping);
}
/**
@@ -268,83 +190,57 @@ public class TopicManager implements Eve
* @param queueName The queue name
*/
public void removeAll(final String queueName) {
- final Map<String, QueueJobCache> mapping = this.updateConfiguration();
- final QueueJobCache cache = mapping.get(queueName);
- if ( cache != null ) {
- final Set<String> topics = cache.getTopics();
- logger.debug("Removing all jobs for queue {} : {}", queueName, topics);
-
- if ( topics != null ) {
-
- final ResourceResolver resolver = this.configuration.createResourceResolver();
- try {
- final Resource baseResource = resolver.getResource(this.configuration.getLocalJobsPath());
-
- // sanity check - should never be null
- if ( baseResource != null ) {
- final BatchResourceRemover brr = new BatchResourceRemover();
-
- for(final String t : topics) {
- final Resource topicResource = baseResource.getChild(t.replace('/', '.'));
- if ( topicResource != null ) {
- JobTopicTraverser.traverse(logger, topicResource, new JobTopicTraverser.JobCallback() {
-
- @Override
- public boolean handle(final JobImpl job) {
- final Resource jobResource = topicResource.getResourceResolver().getResource(job.getResourcePath());
- // sanity check
- if ( jobResource != null ) {
- try {
- brr.delete(jobResource);
- } catch ( final PersistenceException ignore) {
- logger.error("Unable to remove job " + job, ignore);
- }
+ final Set<String> topics = new HashSet<String>();
+ synchronized ( this.topicMapping ) {
+ for(final Map.Entry<String, QueueInfo> entry : this.topicMapping.entrySet()) {
+ if ( entry.getValue().queueName.equals(queueName) ) {
+ topics.add(entry.getKey());
+ }
+ }
+ }
+ logger.debug("Removing all jobs for queue {} : {}", queueName, topics);
+
+ if ( !topics.isEmpty() ) {
+
+ final ResourceResolver resolver = this.configuration.createResourceResolver();
+ try {
+ final Resource baseResource = resolver.getResource(this.configuration.getLocalJobsPath());
+
+ // sanity check - should never be null
+ if ( baseResource != null ) {
+ final BatchResourceRemover brr = new BatchResourceRemover();
+
+ for(final String t : topics) {
+ final Resource topicResource = baseResource.getChild(t.replace('/', '.'));
+ if ( topicResource != null ) {
+ JobTopicTraverser.traverse(logger, topicResource, new JobTopicTraverser.JobCallback() {
+
+ @Override
+ public boolean handle(final JobImpl job) {
+ final Resource jobResource = topicResource.getResourceResolver().getResource(job.getResourcePath());
+ // sanity check
+ if ( jobResource != null ) {
+ try {
+ brr.delete(jobResource);
+ } catch ( final PersistenceException ignore) {
+ logger.error("Unable to remove job " + job, ignore);
+ topicResource.getResourceResolver().revert();
+ topicResource.getResourceResolver().refresh();
}
- return true;
}
- });
- }
- }
- try {
- resolver.commit();
- } catch ( final PersistenceException ignore) {
- logger.error("Unable to remove jobs", ignore);
+ return true;
+ }
+ });
}
}
- } finally {
- resolver.close();
- }
- }
- }
- }
-
- @Override
- public void topologyChanged(final TopologyCapabilities caps) {
- this.isActive.set(caps != null);
- if ( this.isActive.get() ) {
- this.initialScan();
- for(final Map.Entry<String, QueueJobCache> entry : this.updateConfiguration().entrySet()) {
- this.queueManager.start(this, entry.getValue().getQueueInfo());
- }
- } else {
- this.queueManager.restart();
- }
- }
-
- /**
- * Reschedule a job
- * @param handler The job handler
- */
- public void reschedule(final JobHandler handler) {
- final QueueInfo info = this.queueConfigMgr.getQueueInfo(handler.getJob().getTopic());
- final QueueJobCache cache = this.queueJobCaches.get(info.queueName);
- if ( cache != null ) {
- cache.reschedule(handler);
- final Object lock = this.queueLocks.get(info.queueName);
- if ( lock != null ) {
- synchronized ( lock ) {
- lock.notify();
+ try {
+ resolver.commit();
+ } catch ( final PersistenceException ignore) {
+ logger.error("Unable to remove jobs", ignore);
+ }
}
+ } finally {
+ resolver.close();
}
}
}
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/TopologyAware.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/TopologyAware.java?rev=1632617&r1=1632616&r2=1632617&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/TopologyAware.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/TopologyAware.java Fri Oct 17 15:46:02 2014
@@ -18,8 +18,14 @@
*/
package org.apache.sling.event.impl.jobs.topology;
-
+/**
+ * Listener interface to get topology / queue changes.
+ */
public interface TopologyAware {
+ /**
+ * Notify about a change.
+ * @param caps The new topology capabilities or {@code null}
+ */
void topologyChanged(final TopologyCapabilities caps);
}
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/TopologyHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/TopologyHandler.java?rev=1632617&r1=1632616&r2=1632617&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/TopologyHandler.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/TopologyHandler.java Fri Oct 17 15:46:02 2014
@@ -112,10 +112,8 @@ public class TopologyHandler
final CheckTopologyTask mt = new CheckTopologyTask(this.configuration, this.queueConfigManager);
mt.run(topologyCapabilities, !isConfigChange, isConfigChange);
- if ( !isConfigChange ) {
- // start listeners
- this.notifiyListeners();
- }
+ // start listeners
+ this.notifiyListeners();
}
private void notifiyListeners() {