You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@sling.apache.org by cz...@apache.org on 2014/10/16 15:00:28 UTC
svn commit: r1632292 - in /sling/trunk/bundles/extensions/event/src:
main/java/org/apache/sling/event/impl/jobs/
main/java/org/apache/sling/event/impl/jobs/queues/
main/java/org/apache/sling/event/impl/jobs/topics/
test/java/org/apache/sling/event/it/
Author: cziegeler
Date: Thu Oct 16 13:00:28 2014
New Revision: 1632292
URL: http://svn.apache.org/r1632292
Log:
SLING-4048 : Avoid keeping jobs in memory. Fix job removal (WiP)
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/MaintenanceTask.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/TopicManager.java
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/JobHandlingTest.java
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/RoundRobinQueueTest.java
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java?rev=1632292&r1=1632291&r2=1632292&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java Thu Oct 16 13:00:28 2014
@@ -107,4 +107,13 @@ public class JobHandler {
public String toString() {
return "JobHandler(" + this.job.getId() + ")";
}
+
+ public void addToRetryList() {
+ this.jobManager.addJobToRetryList(this.job);
+
+ }
+
+ public void removeFromRetryList() {
+ this.jobManager.removeJobFromRetryList(this.job);
+ }
}
\ No newline at end of file
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java?rev=1632292&r1=1632291&r2=1632292&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java Thu Oct 16 13:00:28 2014
@@ -46,7 +46,6 @@ import org.apache.sling.api.resource.Val
import org.apache.sling.commons.scheduler.Scheduler;
import org.apache.sling.commons.threads.ThreadPoolManager;
import org.apache.sling.event.EventUtil;
-import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager;
import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueInfo;
import org.apache.sling.event.impl.jobs.notifications.NotificationUtility;
@@ -379,12 +378,20 @@ public class JobManagerImpl
private boolean internalRemoveJobById(final String jobId, final boolean forceRemove) {
logger.debug("Trying to remove job {}", jobId);
boolean result = true;
- final JobImpl job = (JobImpl)this.getJobById(jobId);
+ JobImpl job = (JobImpl)this.getJobById(jobId);
if ( job != null ) {
- logger.debug("Found removal job: {}", job);
+ if ( logger.isDebugEnabled() ) {
+ logger.debug("Found removal job: {}", Utility.toString(job));
+ }
+ final JobImpl retryJob = this.getJobFromRetryList(jobId);
+ if ( retryJob != null ) {
+ job = retryJob;
+ }
// currently running?
if ( !forceRemove && job.getProcessingStarted() != null ) {
- logger.debug("Unable to remove job - job is started: {}", job);
+ if ( logger.isDebugEnabled() ) {
+ logger.debug("Unable to remove job - job is started: {}", Utility.toString(job));
+ }
result = false;
} else {
final boolean isHistoryJob = this.configuration.isStoragePath(job.getResourcePath());
@@ -506,7 +513,7 @@ public class JobManagerImpl
final JobImpl job = Utility.readJob(logger, jobResource);
if ( job != null ) {
if ( logger.isDebugEnabled() ) {
- logger.debug("Found job with id {} = {}", id, job);
+ logger.debug("Found job with id {} = {}", id, Utility.toString(job));
}
return job;
}
@@ -987,15 +994,49 @@ public class JobManagerImpl
return new JobImpl(jobTopic, jobName, jobId, properties);
}
+ /**
+ * Reassign a job to a new instance
+ * @param job The job to reassign.
+ */
public void reassign(final JobImpl job) {
final QueueInfo queueInfo = queueManager.getQueueInfo(job.getTopic());
- final InternalQueueConfiguration config = queueInfo.queueConfiguration;
// Sanity check if queue configuration has changed
- String targetId = null;
final TopologyCapabilities caps = this.topologyCapabilities;
- targetId = (caps == null ? null : caps.detectTarget(job.getTopic(), job.getProperties(), queueInfo));
- this.maintenanceTask.reassignJob(job, targetId);
+ final String targetId = (caps == null ? null : caps.detectTarget(job.getTopic(), job.getProperties(), queueInfo));
+
+ final ResourceResolver resolver = this.configuration.createResourceResolver();
+ try {
+ final Resource jobResource = resolver.getResource(job.getResourcePath());
+ if ( jobResource != null ) {
+ try {
+ final ValueMap vm = ResourceHelper.getValueMap(jobResource);
+ final String newPath = this.configuration.getUniquePath(targetId, job.getTopic(), job.getId(), job.getProperties());
+
+ final Map<String, Object> props = new HashMap<String, Object>(vm);
+ props.remove(Job.PROPERTY_JOB_QUEUE_NAME);
+ if ( targetId == null ) {
+ props.remove(Job.PROPERTY_JOB_TARGET_INSTANCE);
+ } else {
+ props.put(Job.PROPERTY_JOB_TARGET_INSTANCE, targetId);
+ }
+ props.remove(Job.PROPERTY_JOB_STARTED_TIME);
+
+ try {
+ ResourceHelper.getOrCreateResource(resolver, newPath, props);
+ resolver.delete(jobResource);
+ resolver.commit();
+ } catch ( final PersistenceException pe ) {
+ this.ignoreException(pe);
+ }
+ } catch (final InstantiationException ie) {
+ // something happened with the resource in the meantime
+ this.ignoreException(ie);
+ }
+ }
+ } finally {
+ resolver.close();
+ }
}
/**
@@ -1175,4 +1216,29 @@ public class JobManagerImpl
}
return null;
}
+
+ private final List<JobImpl> retryList = new ArrayList<JobImpl>();
+
+ public void addJobToRetryList(final JobImpl job) {
+ synchronized ( retryList ) {
+ retryList.add(job);
+ }
+ }
+
+ public void removeJobFromRetryList(final JobImpl job) {
+ synchronized ( retryList ) {
+ retryList.remove(job);
+ }
+ }
+
+ private JobImpl getJobFromRetryList(final String jobId) {
+ synchronized ( retryList ) {
+ for(final JobImpl j : retryList) {
+ if ( jobId.equals(j.getId())) {
+ return j;
+ }
+ }
+ }
+ return null;
+ }
}
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/MaintenanceTask.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/MaintenanceTask.java?rev=1632292&r1=1632291&r2=1632292&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/MaintenanceTask.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/MaintenanceTask.java Thu Oct 16 13:00:28 2014
@@ -20,10 +20,8 @@ package org.apache.sling.event.impl.jobs
import java.util.ArrayList;
import java.util.Calendar;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
-import java.util.Map;
import org.apache.sling.api.resource.PersistenceException;
import org.apache.sling.api.resource.Resource;
@@ -32,8 +30,6 @@ import org.apache.sling.api.resource.Res
import org.apache.sling.api.resource.ValueMap;
import org.apache.sling.event.impl.jobs.topology.TopologyCapabilities;
import org.apache.sling.event.impl.support.BatchResourceRemover;
-import org.apache.sling.event.impl.support.ResourceHelper;
-import org.apache.sling.event.jobs.Job;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -129,37 +125,6 @@ public class MaintenanceTask {
this.ignoreException(pe);
resolver.refresh();
}
-
-/* Old implementation using a query
- final StringBuilder buf = new StringBuilder(64);
-
- buf.append("//element(*)[@");
- buf.append(ISO9075.encode(ResourceResolver.PROPERTY_RESOURCE_TYPE));
- buf.append(" = '");
- buf.append(Utility.RESOURCE_TYPE_LOCK);
- buf.append("' and @");
- buf.append(ISO9075.encode(Utility.PROPERTY_LOCK_CREATED));
- buf.append(" < xs:dateTime('");
- buf.append(ISO8601.format(startDate));
- buf.append("')]");
- final Iterator<Resource> result = resolver.findResources(buf.toString(), "xpath");
-
- while ( caps.isActive() && result.hasNext() ) {
- final Resource lockResource = result.next();
- // sanity check for the path
- if ( this.configuration.isLock(lockResource.getPath()) ) {
- try {
- resolver.delete(lockResource);
- resolver.commit();
- } catch ( final PersistenceException pe) {
- this.ignoreException(pe);
- resolver.refresh();
- }
- }
- }
- } catch (final QuerySyntaxException qse) {
- this.ignoreException(qse);
-*/
} finally {
resolver.close();
}
@@ -348,46 +313,6 @@ public class MaintenanceTask {
}
/**
- * Reassign a job to a different target
- * @param job The job
- * @param targetId New target or <code>null</code> if unknown
- */
- public void reassignJob(final JobImpl job, final String targetId) {
- final ResourceResolver resolver = this.configuration.createResourceResolver();
- try {
- final Resource jobResource = resolver.getResource(job.getResourcePath());
- if ( jobResource != null ) {
- try {
- final ValueMap vm = ResourceHelper.getValueMap(jobResource);
- final String newPath = this.configuration.getUniquePath(targetId, job.getTopic(), job.getId(), job.getProperties());
-
- final Map<String, Object> props = new HashMap<String, Object>(vm);
- props.remove(Job.PROPERTY_JOB_QUEUE_NAME);
- if ( targetId == null ) {
- props.remove(Job.PROPERTY_JOB_TARGET_INSTANCE);
- } else {
- props.put(Job.PROPERTY_JOB_TARGET_INSTANCE, targetId);
- }
- props.remove(Job.PROPERTY_JOB_STARTED_TIME);
-
- try {
- ResourceHelper.getOrCreateResource(resolver, newPath, props);
- resolver.delete(jobResource);
- resolver.commit();
- } catch ( final PersistenceException pe ) {
- this.ignoreException(pe);
- }
- } catch (final InstantiationException ie) {
- // something happened with the resource in the meantime
- this.ignoreException(ie);
- }
- }
- } finally {
- resolver.close();
- }
- }
-
- /**
* Helper method which just logs the exception in debug mode.
* @param e
*/
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java?rev=1632292&r1=1632291&r2=1632292&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java Thu Oct 16 13:00:28 2014
@@ -286,7 +286,7 @@ public abstract class AbstractJobQueue
if ( process ) {
if ( handler.reschedule() ) {
this.logger.info("No acknowledge received for job {} stored at {}. Requeueing job.", Utility.toString(handler.getJob()), handler.getJob().getId());
- this.reschedule(handler);
+ handler.getJob().retry();
this.services.topicManager.reschedule(handler);
this.notifyFinished(true);
}
@@ -607,7 +607,7 @@ public abstract class AbstractJobQueue
NotificationUtility.sendNotification(this.services.eventAdmin, NotificationConstants.TOPIC_JOB_CANCELLED, handler.getJob(), null);
} else {
info.reschedule = true;
- this.reschedule(handler);
+ handler.getJob().retry();
if ( this.logger.isDebugEnabled() ) {
this.logger.debug("Failed job {}", Utility.toString(handler.getJob()));
}
@@ -680,7 +680,7 @@ public abstract class AbstractJobQueue
final boolean keepJobs = resultState != Job.JobState.SUCCEEDED || this.configuration.isKeepJobs();
handler.finished(resultState, keepJobs, rescheduleInfo.processingTime);
} else {
- this.services.topicManager.reschedule(handler);
+ this.reschedule(handler);
}
this.notifyFinished(rescheduleInfo.reschedule);
@@ -820,8 +820,7 @@ public abstract class AbstractJobQueue
}
protected void reschedule(final JobHandler handler) {
- // update event with retry count and retries
- handler.getJob().retry();
+ this.services.topicManager.reschedule(handler);
}
/**
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java?rev=1632292&r1=1632291&r2=1632292&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java Thu Oct 16 13:00:28 2014
@@ -116,6 +116,7 @@ public final class ParallelJobQueue exte
// this job again
final long delay = this.getRetryDelay(handler);
if ( delay > 0 ) {
+ handler.addToRetryList();
final Date fireDate = new Date();
fireDate.setTime(System.currentTimeMillis() + delay);
@@ -123,6 +124,7 @@ public final class ParallelJobQueue exte
final Runnable t = new Runnable() {
@Override
public void run() {
+ handler.removeFromRetryList();
ParallelJobQueue.super.reschedule(handler);
}
};
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/TopicManager.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/TopicManager.java?rev=1632292&r1=1632291&r2=1632292&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/TopicManager.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/TopicManager.java Thu Oct 16 13:00:28 2014
@@ -249,7 +249,7 @@ public class TopicManager implements Eve
this.queueLocks.remove(queueName);
}
if ( logger.isDebugEnabled() ) {
- logger.debug("Took new job for {} : {}", queueName, Utility.toString(result));
+ logger.debug("Returning job for {} : {}", queueName, Utility.toString(result));
}
return (result != null ? new JobHandler( result, (JobManagerImpl)this.jobManager) : null);
}
Modified: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/JobHandlingTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/JobHandlingTest.java?rev=1632292&r1=1632291&r2=1632292&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/JobHandlingTest.java (original)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/JobHandlingTest.java Thu Oct 16 13:00:28 2014
@@ -230,9 +230,9 @@ public class JobHandlingTest extends Abs
jobManager.addJob(TOPIC, "myid2", null);
cb.block();
- assertEquals(1, jobManager.findJobs(JobManager.QueryType.ALL, "sling/test", -1, (Map<String, Object>[])null).size());
+ assertEquals(1, jobManager.findJobs(JobManager.QueryType.ALL, TOPIC, -1, (Map<String, Object>[])null).size());
// job is currently waiting, therefore cancel fails
- final Event e1 = jobManager.findJob("sling/test", Collections.singletonMap(JobUtil.PROPERTY_JOB_NAME, (Object)"myid2"));
+ final Event e1 = jobManager.findJob(TOPIC, Collections.singletonMap(JobUtil.PROPERTY_JOB_NAME, (Object)"myid2"));
assertNotNull(e1);
assertFalse(jobManager.removeJob((String)e1.getProperty(ResourceHelper.PROPERTY_JOB_ID)));
cb2.block(); // and continue job
@@ -240,7 +240,7 @@ public class JobHandlingTest extends Abs
sleep(200);
// the job is now in the queue again
- final Event e2 = jobManager.findJob("sling/test", Collections.singletonMap(JobUtil.PROPERTY_JOB_NAME, (Object)"myid2"));
+ final Event e2 = jobManager.findJob(TOPIC, Collections.singletonMap(JobUtil.PROPERTY_JOB_NAME, (Object)"myid2"));
assertNotNull(e2);
assertTrue(jobManager.removeJob((String)e2.getProperty(ResourceHelper.PROPERTY_JOB_ID)));
assertEquals(0, jobManager.findJobs(JobManager.QueryType.ALL, "sling/test", -1, (Map<String, Object>[])null).size());
Modified: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/RoundRobinQueueTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/RoundRobinQueueTest.java?rev=1632292&r1=1632291&r2=1632292&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/RoundRobinQueueTest.java (original)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/RoundRobinQueueTest.java Thu Oct 16 13:00:28 2014
@@ -45,7 +45,6 @@ import org.ops4j.pax.exam.spi.reactors.P
import org.osgi.framework.ServiceRegistration;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventHandler;
-import org.slf4j.LoggerFactory;
@RunWith(PaxExam.class)
@ExamReactorStrategy(PerMethod.class)
@@ -123,7 +122,6 @@ public class RoundRobinQueueTest extends
@Override
public void handleEvent(final Event event) {
- LoggerFactory.getLogger("test").info("Received finished event {}", event.getProperty(NotificationConstants.NOTIFICATION_PROPERTY_JOB_ID));
count.incrementAndGet();
}
});