You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2013/09/30 14:12:59 UTC
svn commit: r1527530 - in /sling/trunk/bundles/extensions/event: ./
src/main/java/org/apache/sling/event/impl/jobs/
src/main/java/org/apache/sling/event/impl/jobs/config/
src/main/java/org/apache/sling/event/impl/jobs/queues/
src/main/java/org/apache/s...
Author: cziegeler
Date: Mon Sep 30 12:12:58 2013
New Revision: 1527530
URL: http://svn.apache.org/r1527530
Log:
SLING-3028 : Support for progress tracking of jobs
Added:
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/HistoryTest.java (with props)
Modified:
sling/trunk/bundles/extensions/event/pom.xml
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/ConfigurationConstants.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/Job.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobManager.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobProcessor.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobsIterator.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/Queue.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/QueueConfiguration.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/Statistics.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/TopicStatistics.java
sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/JobHandlingTest.java
Modified: sling/trunk/bundles/extensions/event/pom.xml
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/pom.xml?rev=1527530&r1=1527529&r2=1527530&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/pom.xml (original)
+++ sling/trunk/bundles/extensions/event/pom.xml Mon Sep 30 12:12:58 2013
@@ -47,7 +47,7 @@
<sling.java.version>6</sling.java.version>
<exam.version>3.0.3</exam.version>
<url.version>1.5.2</url.version>
- <bundle.build.name>${basedir}/target</bundle.build.name>
+ <bundle.build.name>${basedir}/target</bundle.build.name>
<bundle.file.name>${bundle.build.name}/${project.build.finalName}.jar</bundle.file.name>
</properties>
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java?rev=1527530&r1=1527529&r2=1527530&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java Mon Sep 30 12:12:58 2013
@@ -51,9 +51,9 @@ public class JobHandler {
* Finish the processing of the job
* @param state The state of processing
*/
- public void finished(final JobState state) {
+ public void finished(final JobState state, final boolean keepJobInHistory) {
// for now we just keep cancelled jobs
- this.jobManager.finished(this, state, state != JobState.SUCCEEDED);
+ this.jobManager.finishJob(this.job, state, keepJobInHistory);
}
/**
@@ -61,15 +61,15 @@ public class JobHandler {
* @return <code>true</code> if rescheduling was successful, <code>false</code> otherwise.
*/
public boolean reschedule() {
- return this.jobManager.reschedule(this);
+ return this.jobManager.reschedule(this.job);
}
- public boolean remove() {
- return this.jobManager.remove(this.job);
+ public void cancel() {
+ this.jobManager.finishJob(this.job, JobState.CANCELLED, true);
}
public void reassign() {
- this.jobManager.reassign(this);
+ this.jobManager.reassign(this.job);
}
@Override
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java?rev=1527530&r1=1527529&r2=1527530&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java Mon Sep 30 12:12:58 2013
@@ -62,7 +62,10 @@ public class JobImpl implements Job {
public static final String PROPERTY_MESSAGE = "slingevent:message";
/** Property for finished jobs. */
- public static final String PROPERTY_FINISHED = "slingevent:finished";
+ public static final String PROPERTY_FINISHED_STATE = "slingevent:finishedState";
+
+ /** Property holding the date at which a job was finished. */
+ public static final String PROPERTY_FINISHED_DATE = "slingevent:finishedDate";
private final ValueMap properties;
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java?rev=1527530&r1=1527529&r2=1527530&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java Mon Sep 30 12:12:58 2013
@@ -18,6 +18,7 @@
*/
package org.apache.sling.event.impl.jobs;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collection;
@@ -296,7 +297,7 @@ public class JobManagerImpl
if ( logger.isDebugEnabled() ) {
logger.debug("Dropping job due to configuration of queue {} : {}", queueInfo.queueName, Utility.toString(job));
}
- this.remove(job);
+ this.finishJob(job, JobState.CANCELLED, false);
} else if ( config.getType() == QueueConfiguration.Type.IGNORE ) {
if ( !reassign ) {
if ( logger.isDebugEnabled() ) {
@@ -332,7 +333,7 @@ public class JobManagerImpl
if ( queue == null ) {
// this is just a sanity check, actually we can never get here
logger.warn("Ignoring event due to unknown queue type of queue {} : {}", queueInfo.queueName, Utility.toString(job));
- this.remove(job);
+ this.finishJob(job, JobState.CANCELLED, false);
} else {
queues.put(queueInfo.queueName, queue);
((QueuesMBeanImpl)queuesMBean).sendEvent(new QueueStatusEvent(queue, null));
@@ -729,39 +730,51 @@ public class JobManagerImpl
*/
@Override
public boolean removeJob(final String jobId) {
+ return this.internalRemoveJobJobById(jobId, false);
+ }
+
+ private boolean internalRemoveJobJobById(final String jobId, final boolean forceRemove) {
logger.debug("Trying to remove job {}", jobId);
boolean result = true;
- final Job job = this.getJobById(jobId);
- logger.debug("Found removal job: {}", job);
+ final JobImpl job = (JobImpl)this.getJobById(jobId);
if ( job != null ) {
+ logger.debug("Found removal job: {}", job);
// currently running?
- if ( job.getProcessingStarted() != null ) {
+ if ( !forceRemove && job.getProcessingStarted() != null ) {
logger.debug("Unable to remove job - job is started: {}", job);
result = false;
} else {
- ResourceResolver resolver = null;
- try {
- resolver = this.resourceResolverFactory.getAdministrativeResourceResolver(null);
- final Resource jobResource = resolver.getResource(((JobImpl)job).getResourcePath());
- if ( jobResource != null ) {
- resolver.delete(jobResource);
- resolver.commit();
- logger.debug("Removed job with id: {}", jobId);
- } else {
- logger.debug("Unable to remove job with id - resource already removed: {}", jobId);
- }
- } catch ( final LoginException le ) {
- this.ignoreException(le);
- result = false;
- } catch ( final PersistenceException pe) {
- this.ignoreException(pe);
- result = false;
- } finally {
- if ( resolver != null ) {
- resolver.close();
+ final boolean isHistoryJob = this.configuration.isStoragePath(job.getResourcePath());
+ // if history job, simply remove - otherwise move to history!
+ if ( isHistoryJob ) {
+ ResourceResolver resolver = null;
+ try {
+ resolver = this.resourceResolverFactory.getAdministrativeResourceResolver(null);
+ final Resource jobResource = resolver.getResource(job.getResourcePath());
+ if ( jobResource != null ) {
+ resolver.delete(jobResource);
+ resolver.commit();
+ logger.debug("Removed job with id: {}", jobId);
+ } else {
+ logger.debug("Unable to remove job with id - resource already removed: {}", jobId);
+ }
+ } catch ( final LoginException le ) {
+ this.ignoreException(le);
+ result = false;
+ } catch ( final PersistenceException pe) {
+ this.ignoreException(pe);
+ result = false;
+ } finally {
+ if ( resolver != null ) {
+ resolver.close();
+ }
}
+ } else {
+ this.finishJob(job, JobState.CANCELLED, true);
}
}
+ } else {
+ logger.debug("Job for removal does not exist (anymore): {}", jobId);
}
return result;
}
@@ -771,7 +784,7 @@ public class JobManagerImpl
*/
@Override
public void forceRemoveJob(final String jobId) {
- this.removeJobById(jobId);
+ this.internalRemoveJobJobById(jobId, true);
}
/**
@@ -792,6 +805,15 @@ public class JobManagerImpl
logger.warn("{}", errorMessage);
return null;
}
+ if ( properties != null ) {
+ for(final Object val : properties.values()) {
+ if ( val != null && !(val instanceof Serializable) ) {
+ logger.warn("Discarding job - properties must be serializable: {} {} : {}",
+ new Object[] {topic, name, properties});
+ return null;
+ }
+ }
+ }
Job result = this.addJobInteral(topic, name, properties);
if ( result == null && name != null ) {
result = this.getJobByName(name);
@@ -908,31 +930,8 @@ public class JobManagerImpl
* @see org.apache.sling.event.jobs.JobManager#removeJobById(java.lang.String)
*/
@Override
- public boolean removeJobById(String jobId) {
- boolean result = true;
- final Job job = this.getJobById(jobId);
- if ( job != null ) {
- ResourceResolver resolver = null;
- try {
- resolver = this.resourceResolverFactory.getAdministrativeResourceResolver(null);
- final Resource jobResource = resolver.getResource(((JobImpl)job).getResourcePath());
- if ( jobResource != null ) {
- resolver.delete(jobResource);
- resolver.commit();
- }
- } catch ( final LoginException le ) {
- this.ignoreException(le);
- result = false;
- } catch ( final PersistenceException pe) {
- this.ignoreException(pe);
- result = false;
- } finally {
- if ( resolver != null ) {
- resolver.close();
- }
- }
- }
- return result;
+ public boolean removeJobById(final String jobId) {
+ return this.internalRemoveJobJobById(jobId, true);
}
/**
@@ -943,6 +942,7 @@ public class JobManagerImpl
final String topic,
final long limit,
final Map<String, Object>... templates) {
+ final boolean isHistoryQuery = type == QueryType.HISTORY || type == QueryType.SUCCEEDED || type == QueryType.CANCELLED;
final List<Job> result = new ArrayList<Job>();
ResourceResolver resolver = null;
try {
@@ -957,17 +957,34 @@ public class JobManagerImpl
buf.append(" = '");
buf.append(topic);
buf.append("'");
- if ( type == QueryType.ACTIVE ) {
+
+ // restricting on the type - history or unfinished
+ if ( isHistoryQuery ) {
buf.append(" and @");
- buf.append(ISO9075.encode(Job.PROPERTY_JOB_STARTED_TIME));
- } else if ( type == QueryType.QUEUED ) {
+ buf.append(ISO9075.encode(JobImpl.PROPERTY_FINISHED_STATE));
+ if ( type == QueryType.SUCCEEDED ) {
+ buf.append(" = '");
+ buf.append(JobState.SUCCEEDED.name());
+ buf.append("'");
+ } else if ( type == QueryType.CANCELLED ) {
+ buf.append(" = '");
+ buf.append(JobState.CANCELLED.name());
+ buf.append("'");
+ }
+ } else {
buf.append(" and not(@");
- buf.append(ISO9075.encode(Job.PROPERTY_JOB_STARTED_TIME));
+ buf.append(ISO9075.encode(JobImpl.PROPERTY_FINISHED_STATE));
buf.append(")");
+ if ( type == QueryType.ACTIVE ) {
+ buf.append(" and @");
+ buf.append(ISO9075.encode(Job.PROPERTY_JOB_STARTED_TIME));
+ } else if ( type == QueryType.QUEUED ) {
+ buf.append(" and not(@");
+ buf.append(ISO9075.encode(Job.PROPERTY_JOB_STARTED_TIME));
+ buf.append(")");
+ }
}
- buf.append(" and not(@");
- buf.append(ISO9075.encode(JobImpl.PROPERTY_FINISHED));
- buf.append(")");
+
if ( templates != null && templates.length > 0 ) {
buf.append(" and (");
int index = 0;
@@ -998,8 +1015,13 @@ public class JobManagerImpl
buf.append(')');
}
buf.append("] order by @");
- buf.append(Job.PROPERTY_JOB_CREATED);
- buf.append(" ascending");
+ if ( isHistoryQuery ) {
+ buf.append(JobImpl.PROPERTY_FINISHED_DATE);
+ buf.append(" descending");
+ } else {
+ buf.append(Job.PROPERTY_JOB_CREATED);
+ buf.append(" ascending");
+ }
final Iterator<Resource> iter = resolver.findResources(buf.toString(), "xpath");
long count = 0;
@@ -1031,20 +1053,21 @@ public class JobManagerImpl
* @param info The job handler
* @param state The state of the processing
*/
- public void finished(final JobHandler handler, final JobState state, final boolean keepJobInHistory) {
+ public void finishJob(final JobImpl job, final JobState state, final boolean keepJobInHistory) {
final boolean isSuccess = (state == JobState.SUCCEEDED);
ResourceResolver resolver = null;
try {
resolver = this.resourceResolverFactory.getAdministrativeResourceResolver(null);
- final Resource jobResource = resolver.getResource(handler.getJob().getResourcePath());
+ final Resource jobResource = resolver.getResource(job.getResourcePath());
if ( jobResource != null ) {
try {
String newPath = null;
if ( keepJobInHistory ) {
final ValueMap vm = ResourceHelper.getValueMap(jobResource);
- newPath = this.configuration.getStoragePath(handler.getJob(), isSuccess);
+ newPath = this.configuration.getStoragePath(job, isSuccess);
final Map<String, Object> props = new HashMap<String, Object>(vm);
- props.put(JobImpl.PROPERTY_FINISHED, isSuccess);
+ props.put(JobImpl.PROPERTY_FINISHED_STATE, isSuccess ? JobState.SUCCEEDED.name() : JobState.CANCELLED.name());
+ props.put(JobImpl.PROPERTY_FINISHED_DATE, Calendar.getInstance());
ResourceHelper.getOrCreateResource(resolver, newPath, props);
}
@@ -1053,9 +1076,9 @@ public class JobManagerImpl
if ( keepJobInHistory && logger.isDebugEnabled() ) {
if ( isSuccess ) {
- logger.debug("Kept successful job {} at {}", Utility.toString(handler.getJob()), newPath);
+ logger.debug("Kept successful job {} at {}", Utility.toString(job), newPath);
} else {
- logger.debug("Moved cancelled job {} to {}", Utility.toString(handler.getJob()), newPath);
+ logger.debug("Moved cancelled job {} to {}", Utility.toString(job), newPath);
}
}
} catch ( final PersistenceException pe ) {
@@ -1078,17 +1101,17 @@ public class JobManagerImpl
* Reschedule a job.
*
* Update the retry count and remove the started time.
- * @param info The job info
+ * @param job The job
* @return true if the job could be updated.
*/
- public boolean reschedule(final JobHandler info) {
+ public boolean reschedule(final JobImpl job) {
ResourceResolver resolver = null;
try {
resolver = this.resourceResolverFactory.getAdministrativeResourceResolver(null);
- final Resource jobResource = resolver.getResource(info.getJob().getResourcePath());
+ final Resource jobResource = resolver.getResource(job.getResourcePath());
if ( jobResource != null ) {
final ModifiableValueMap mvm = jobResource.adaptTo(ModifiableValueMap.class);
- mvm.put(Job.PROPERTY_JOB_RETRY_COUNT, info.getJob().getProperty(Job.PROPERTY_JOB_RETRY_COUNT));
+ mvm.put(Job.PROPERTY_JOB_RETRY_COUNT, job.getProperty(Job.PROPERTY_JOB_RETRY_COUNT));
mvm.remove(Job.PROPERTY_JOB_STARTED_TIME);
try {
resolver.commit();
@@ -1109,36 +1132,6 @@ public class JobManagerImpl
}
/**
- * Remove the job.
- * @param info
- * @return
- */
- public boolean remove(final JobImpl job) {
- ResourceResolver resolver = null;
- try {
- resolver = this.resourceResolverFactory.getAdministrativeResourceResolver(null);
- final Resource jobResource = resolver.getResource(job.getResourcePath());
- if ( jobResource != null ) {
- Utility.sendNotification(this.eventAdmin, JobUtil.TOPIC_JOB_CANCELLED, job, null);
- try {
- resolver.delete(jobResource);
- resolver.commit();
- } catch ( final PersistenceException pe ) {
- // ignore
- }
- }
- } catch ( final LoginException ignore ) {
- // ignore
- } finally {
- if ( resolver != null ) {
- resolver.close();
- }
- }
-
- return true;
- }
-
- /**
* Try to start the job
*/
public boolean start(final JobHandler info) {
@@ -1245,7 +1238,6 @@ public class JobManagerImpl
* @param jobProperties The optional job properties
* @return The persisted job or <code>null</code>.
*/
-
private Job addJobInteral(final String jobTopic, final String jobName, final Map<String, Object> jobProperties) {
final QueueInfo info = this.queueConfigManager.getQueueInfo(jobTopic);
if ( info.queueConfiguration.getType() == QueueConfiguration.Type.DROP ) {
@@ -1365,24 +1357,23 @@ public class JobManagerImpl
return new JobImpl(jobTopic, jobName, jobId, properties);
}
- public void reassign(final JobHandler handler) {
- final JobImpl job = handler.getJob();
+ public void reassign(final JobImpl job) {
final QueueInfo queueInfo = queueConfigManager.getQueueInfo(job.getTopic());
final InternalQueueConfiguration config = queueInfo.queueConfiguration;
// Sanity check if queue configuration has changed
if ( config.getType() == QueueConfiguration.Type.DROP ) {
if ( logger.isDebugEnabled() ) {
- logger.debug("Dropping job due to configuration of queue {} : {}", queueInfo.queueName, Utility.toString(handler.getJob()));
+ logger.debug("Dropping job due to configuration of queue {} : {}", queueInfo.queueName, Utility.toString(job));
}
- handler.remove();
+ this.finishJob(job, JobState.CANCELLED, false); // DROP means complete removal
} else {
String targetId = null;
if ( config.getType() != QueueConfiguration.Type.IGNORE ) {
final TopologyCapabilities caps = this.topologyCapabilities;
targetId = (caps == null ? null : caps.detectTarget(job.getTopic(), job.getProperties(), queueInfo));
}
- this.maintenanceTask.reassignJob(handler.getJob(), targetId);
+ this.maintenanceTask.reassignJob(job, targetId);
}
}
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/ConfigurationConstants.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/ConfigurationConstants.java?rev=1527530&r1=1527529&r2=1527530&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/ConfigurationConstants.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/ConfigurationConstants.java Mon Sep 30 12:12:58 2013
@@ -30,6 +30,7 @@ public abstract class ConfigurationConst
public static final int DEFAULT_RETRIES = 10;
public static final long DEFAULT_RETRY_DELAY = 2000;
public static final int DEFAULT_MAX_PARALLEL = 15;
+ public static final boolean DEFAULT_KEEP_JOBS = false;
public static final String PROP_NAME = "queue.name";
public static final String PROP_TYPE = "queue.type";
@@ -38,4 +39,5 @@ public abstract class ConfigurationConst
public static final String PROP_RETRIES = "queue.retries";
public static final String PROP_RETRY_DELAY = "queue.retrydelay";
public static final String PROP_PRIORITY = "queue.priority";
+ public static final String PROP_KEEP_JOBS = "queue.keepJobs";
}
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java?rev=1527530&r1=1527529&r2=1527530&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java Mon Sep 30 12:12:58 2013
@@ -62,6 +62,8 @@ import org.osgi.framework.Constants;
options={@PropertyOption(name="NORM",value="Norm"),
@PropertyOption(name="MIN",value="Min"),
@PropertyOption(name="MAX",value="Max")}),
+ @Property(name=ConfigurationConstants.PROP_KEEP_JOBS,
+ boolValue=ConfigurationConstants.DEFAULT_KEEP_JOBS),
@Property(name=Constants.SERVICE_RANKING, intValue=0, propertyPrivate=false,
label="%queue.ranking.name", description="%queue.ranking.description")
})
@@ -95,6 +97,9 @@ public class InternalQueueConfiguration
/** The configured topics. */
private String[] topics;
+ /** Keep jobs. */
+ private boolean keepJobs;
+
/** Valid flag. */
private boolean valid = false;
@@ -132,6 +137,7 @@ public class InternalQueueConfiguration
} else {
this.topics = topicsParam;
}
+ this.keepJobs = PropertiesUtil.toBoolean(params.get(ConfigurationConstants.PROP_KEEP_JOBS), ConfigurationConstants.DEFAULT_KEEP_JOBS);
this.serviceRanking = PropertiesUtil.toInteger(params.get(Constants.SERVICE_RANKING), 0);
this.pid = (String)params.get(Constants.SERVICE_PID);
this.valid = this.checkIsValid();
@@ -271,6 +277,11 @@ public class InternalQueueConfiguration
}
@Override
+ public boolean isKeepJobs() {
+ return this.keepJobs;
+ }
+
+ @Override
public String toString() {
return "Queue-Configuration(" + this.hashCode() + ") : {" +
"name=" + this.name +
@@ -278,7 +289,8 @@ public class InternalQueueConfiguration
", topics=" + (this.matchers == null ? "[]" : Arrays.toString(this.matchers)) +
", maxParallelProcesses=" + this.maxParallelProcesses +
", retries=" + this.retries +
- ", retryDelayInMs= " + this.retryDelay +
+ ", retryDelayInMs=" + this.retryDelay +
+ ", keepJobs=" + this.keepJobs +
", serviceRanking=" + this.serviceRanking +
", pid=" + this.pid +
", isValid=" + this.isValid() + "}";
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java?rev=1527530&r1=1527529&r2=1527530&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java Mon Sep 30 12:12:58 2013
@@ -392,7 +392,9 @@ public abstract class AbstractJobQueue
final boolean finishSuccessful;
if ( !rescheduleInfo.reschedule ) {
- handler.finished(result.getState());
+ // jobs that did not succeed are always kept; succeeded jobs are kept only if the queue is configured to keep them.
+ final boolean keepJobs = result.getState() != JobState.SUCCEEDED || this.configuration.isKeepJobs();
+ handler.finished(result.getState(), keepJobs);
finishSuccessful = true;
if ( result.getState() == JobState.SUCCEEDED ) {
Utility.sendNotification(this.eventAdmin, JobUtil.TOPIC_JOB_FINISHED, handler.getJob(), rescheduleInfo.processingTime);
@@ -760,10 +762,11 @@ public abstract class AbstractJobQueue
@Override
public void run() {
for(final JobHandler job : events) {
- job.remove();
+ job.cancel();
+ Utility.sendNotification(eventAdmin, JobUtil.TOPIC_JOB_CANCELLED, job.getJob(), null);
}
}
- }, "Queue RemoveAll Thread for " + this.queueName);
+ }, "Apache Sling Queue RemoveAll Thread for " + this.queueName);
t.setDaemon(true);
t.start();
// start queue again
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java?rev=1527530&r1=1527529&r2=1527530&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java Mon Sep 30 12:12:58 2013
@@ -184,7 +184,7 @@ public final class OrderedJobQueue exten
this.sleepLock.jobHandler = null;
if ( result == null ) {
- handler.remove();
+ handler.cancel();
}
return result;
}
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/Job.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/Job.java?rev=1527530&r1=1527529&r2=1527530&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/Job.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/Job.java Mon Sep 30 12:12:58 2013
@@ -34,7 +34,7 @@ import aQute.bnd.annotation.ProviderType
* In general all scalar types and all serializable classes are supported as
* property types. However, in order for deseralizing classes these must be
* exported. Serializable classes are not searchable in the query either.
- * Due to the above to potential problems, it is advisable to not use
+ * Due to the above mentioned potential problems, it is advisable to not use
* custom classes as job properties, but rather use out of the box supported
* types in combination with collections.
*
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobManager.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobManager.java?rev=1527530&r1=1527529&r2=1527530&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobManager.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobManager.java Mon Sep 30 12:12:58 2013
@@ -58,12 +58,15 @@ public interface JobManager {
/**
* The requested job types for the query.
- * This can either be all jobs, all activated (started) or all queued jobs.
+ * This can either be all (unfinished) jobs, all activated (started) or all queued jobs.
*/
enum QueryType {
- ALL,
+ ALL, // all means all active and all queued
ACTIVE,
- QUEUED
+ QUEUED,
+ HISTORY, // returns the complete history of cancelled and succeeded jobs (if available)
+ CANCELLED,// history of cancelled jobs
+ SUCCEEDED // history of succeeded jobs
}
/**
@@ -77,6 +80,8 @@ public interface JobManager {
* Add a new job
*
* If the topic is <code>null</code> or illegal, no job is created and <code>null</code> is returned.
+ * If properties are provided, all of them must be serializable. If there are non serializable
+ * objects in the properties, no job is created and <code>null</code> is returned.
* A job topic is a hierarchical name separated by dashes, each part has to start with a letter,
* allowed characters are letters, numbers and the underscore.
*
@@ -91,6 +96,8 @@ public interface JobManager {
* Add a new job
*
* If the topic is <code>null</code> or illegal, no job is created and <code>null</code> is returned.
+ * If properties are provided, all of them must be serializable. If there are non serializable
+ * objects in the properties, no job is created and <code>null</code> is returned.
* A job topic is a hierarchical name separated by dashes, each part has to start with a letter,
* allowed characters are letters, numbers and the underscore.
*
@@ -147,9 +154,16 @@ public interface JobManager {
Job getJob(String topic, Map<String, Object> template);
/**
- * Return all jobs either running or scheduled.
+ * Return all jobs of a given type.
*
- * @param type Required parameter for the type: either all jobs, only queued or only started can be returned.
+ * Based on the type parameter, either the history of jobs can be returned or unfinished jobs. The type
+ * parameter can further specify which category of jobs should be returned: for the history either
+ * succeeded jobs, cancelled jobs or both in combination can be returned. For unfinished jobs, either
+ * queued jobs, started jobs or the combination can be returned.
+ * If the history is returned, the result set is sorted in descending order, listing the newest entry
+ * first. For unfinished jobs, the result set is sorted in ascending order.
+ *
+ * @param type Required parameter for the type. See above.
* @param topic Topic can be used as a filter, if it is non-null, only jobs with this topic will be returned.
* @param limit A positive number indicating the maximum number of jobs returned by the iterator. A value
* of zero or less indicates that all jobs should be returned.
@@ -176,7 +190,7 @@ public interface JobManager {
* must match the template (AND query). By providing several maps, different filters
* are possible (OR query).
* @return A non null collection.
- * @deprecated
+ * @deprecated Use {@link #findJobs(QueryType, String, long, Map...)}
*/
@Deprecated
JobsIterator queryJobs(QueryType type, String topic, Map<String, Object>... templates);
@@ -192,7 +206,7 @@ public interface JobManager {
* are possible (OR query).
* @return A non null collection.
* @since 1.1
- * @deprecated
+ * @deprecated Use {@link #findJobs(QueryType, String, long, Map...)}
*/
@Deprecated
JobsIterator queryJobs(QueryType type, String topic, long limit, Map<String, Object>... templates);
@@ -206,7 +220,7 @@ public interface JobManager {
* @param template The map acts like a template. The searched job
* must match the template (AND query).
* @return An event or <code>null</code>
- * @deprecated
+ * @deprecated Use {@link #getJob(String, Map)}
*/
@Deprecated
Event findJob(String topic, Map<String, Object> template);
@@ -217,7 +231,7 @@ public interface JobManager {
* @param jobId The unique identifier as found in the property {@link JobUtil#JOB_ID}.
* @return <code>true</code> if the job could be cancelled or does not exist anymore.
* <code>false</code> otherwise.
- * @deprecated
+ * @deprecated Use {@link #removeJobById(String)}
*/
@Deprecated
boolean removeJob(String jobId);
@@ -228,7 +242,7 @@ public interface JobManager {
* for a job to finish. The job will be removed when this method returns - however
* this method blocks until the job is finished!
* @param jobId The unique identifier as found in the property {@link JobUtil#JOB_ID}.
- * @deprecated
+ * @deprecated Use {@link #removeJobById(String)}
*/
@Deprecated
void forceRemoveJob(String jobId);
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobProcessor.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobProcessor.java?rev=1527530&r1=1527529&r2=1527530&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobProcessor.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobProcessor.java Mon Sep 30 12:12:58 2013
@@ -21,6 +21,8 @@ package org.apache.sling.event.jobs;
import org.apache.sling.event.jobs.consumer.JobConsumer;
import org.osgi.service.event.Event;
+import aQute.bnd.annotation.ConsumerType;
+
/**
* A job processor processes a job in the background.
* It is used by {@link JobUtil#processJob(Event, JobProcessor)}.
@@ -28,6 +30,7 @@ import org.osgi.service.event.Event;
* @deprecated - Use the new {@link JobConsumer} interface instead.
*/
@Deprecated
+@ConsumerType
public interface JobProcessor {
/**
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobsIterator.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobsIterator.java?rev=1527530&r1=1527529&r2=1527530&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobsIterator.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobsIterator.java Mon Sep 30 12:12:58 2013
@@ -22,6 +22,8 @@ import java.util.Iterator;
import org.osgi.service.event.Event;
+import aQute.bnd.annotation.ProviderType;
+
/**
* This <code>Iterator</code> allows to iterate over {@link Event}s.
* In addition to an iterator it might return the number of elements
@@ -30,6 +32,7 @@ import org.osgi.service.event.Event;
* @deprecated - Use the new {@link JobManager#findJobs} methods instead.
*/
@Deprecated
+@ProviderType
public interface JobsIterator extends Iterator<Event>, Iterable<Event> {
/**
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/Queue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/Queue.java?rev=1527530&r1=1527529&r2=1527530&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/Queue.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/Queue.java Mon Sep 30 12:12:58 2013
@@ -18,11 +18,14 @@
*/
package org.apache.sling.event.jobs;
+import aQute.bnd.annotation.ProviderType;
+
/**
* This is a job queue processing job events.
* @since 3.0
*/
+@ProviderType
public interface Queue {
/**
@@ -75,7 +78,7 @@ public interface Queue {
/**
* Remove all outstanding jobs and delete them. This actually cancels
- * all outstanding jobs (but no notifications are send).
+ * all outstanding jobs.
*/
void removeAll();
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/QueueConfiguration.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/QueueConfiguration.java?rev=1527530&r1=1527529&r2=1527530&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/QueueConfiguration.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/QueueConfiguration.java Mon Sep 30 12:12:58 2013
@@ -18,11 +18,14 @@
*/
package org.apache.sling.event.jobs;
+import aQute.bnd.annotation.ProviderType;
+
/**
* The configuration of a queue.
* @since 3.0
*/
+@ProviderType
public interface QueueConfiguration {
/** The queue type. */
@@ -77,6 +80,13 @@ public interface QueueConfiguration {
String[] getTopics();
/**
+ * Whether successfully finished jobs are kept to provide a complete history.
+ * @return <code>true</code> if successful jobs are kept.
+ * @since 1.3
+ */
+ boolean isKeepJobs();
+
+ /**
* Get the ranking of this configuration.
*/
int getRanking();
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/Statistics.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/Statistics.java?rev=1527530&r1=1527529&r2=1527530&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/Statistics.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/Statistics.java Mon Sep 30 12:12:58 2013
@@ -18,12 +18,15 @@
*/
package org.apache.sling.event.jobs;
+import aQute.bnd.annotation.ProviderType;
+
/**
* Statistic information.
* This information is not preserved between restarts of the service.
* Once a service is restarted, the counters start at zero!
* @since 3.0
*/
+@ProviderType
public interface Statistics {
/**
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/TopicStatistics.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/TopicStatistics.java?rev=1527530&r1=1527529&r2=1527530&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/TopicStatistics.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/TopicStatistics.java Mon Sep 30 12:12:58 2013
@@ -18,12 +18,15 @@
*/
package org.apache.sling.event.jobs;
+import aQute.bnd.annotation.ProviderType;
+
/**
* Statistic information about a topic.
* This information is not preserved between restarts of the service.
* Once a service is restarted, the counters start at zero!
* @since 3.0
*/
+@ProviderType
public interface TopicStatistics {
/**
Modified: sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties?rev=1527530&r1=1527529&r2=1527530&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties (original)
+++ sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties Mon Sep 30 12:12:58 2013
@@ -61,14 +61,14 @@ queue.maxparallel.name = Maximum Paralle
queue.maxparallel.description = The maximum number of parallel jobs started for this queue. \
A value of -1 is substituted with the number of available processors.
+queue.keepJobs.name = Keep Jobs
+queue.keepJobs.description = If this option is enabled, successfully finished jobs are kept \
+ to provide a complete history.
+
queue.ranking.name = Ranking
queue.ranking.description = Integer value defining the ranking of this queue configuration. \
If more than one queue matches a job topic, the one with the highest ranking is used.
-queue.waitforasync.name = Wait For Async
-queue.waitforasync.description = If a job consumer is processing a job asynchronously, this \
- flag controls whether the queue waits for the consumer to finish before starting new jobs.
-
#
# Job Event Handler
job.events.name = Apache Sling Job Default Queue
Added: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/HistoryTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/HistoryTest.java?rev=1527530&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/HistoryTest.java (added)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/HistoryTest.java Mon Sep 30 12:12:58 2013
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.it;
+
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Dictionary;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.Map;
+
+import org.apache.sling.event.impl.jobs.config.ConfigurationConstants;
+import org.apache.sling.event.jobs.Job;
+import org.apache.sling.event.jobs.JobManager;
+import org.apache.sling.event.jobs.QueueConfiguration;
+import org.apache.sling.event.jobs.consumer.JobConsumer;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.ops4j.pax.exam.junit.PaxExam;
+import org.ops4j.pax.exam.spi.reactors.ExamReactorStrategy;
+import org.ops4j.pax.exam.spi.reactors.PerMethod;
+import org.osgi.framework.ServiceRegistration;
+
+@RunWith(PaxExam.class)
+@ExamReactorStrategy(PerMethod.class)
+public class HistoryTest extends AbstractJobHandlingTest {
+ // Topic of the test jobs; setup() registers it as the only topic of the "test" queue.
+ private static final String TOPIC = "sling/test/history";
+ // Name of the job property in which addJob(long) stores the job's counter.
+ private static final String PROP_COUNTER = "counter";
+ // PID of the configuration created in setup(); passed to removeConfiguration() in cleanUp().
+ private String queueConfPid;
+ // Runs the inherited setup, then creates the queue configuration for TOPIC.
+ @Override
+ @Before
+ public void setup() throws IOException {
+ super.setup();
+
+ // Create the test queue: an ordered queue is used to get a stable processing order,
+ // and PROP_KEEP_JOBS is enabled so that the jobs are kept in the history.
+ final org.osgi.service.cm.Configuration config = this.configAdmin.createFactoryConfiguration("org.apache.sling.event.jobs.QueueConfiguration", null);
+ Dictionary<String, Object> props = new Hashtable<String, Object>();
+ props.put(ConfigurationConstants.PROP_NAME, "test");
+ props.put(ConfigurationConstants.PROP_TYPE, QueueConfiguration.Type.ORDERED.name());
+ props.put(ConfigurationConstants.PROP_TOPICS, new String[] {TOPIC});
+ props.put(ConfigurationConstants.PROP_RETRIES, 2);
+ props.put(ConfigurationConstants.PROP_RETRY_DELAY, 2L);
+ props.put(ConfigurationConstants.PROP_KEEP_JOBS, true);
+ config.update(props);
+ // Remember the PID for cleanUp(); the sleep presumably lets the configuration become active - TODO confirm.
+ this.queueConfPid = config.getPid();
+ this.sleep(1000L);
+ }
+ // Removes the queue configuration created in setup().
+ @After
+ public void cleanUp() throws IOException {
+ this.removeConfiguration(this.queueConfPid);
+ }
+ // Adds a job for TOPIC whose only property is the given counter and returns the result of JobManager.addJob.
+ private Job addJob(final long counter) {
+ final Map<String, Object> props = new HashMap<String, Object>();
+ props.put(PROP_COUNTER, counter);
+ return this.getJobManager().addJob(TOPIC, props );
+ }
+
+ /**
+ * Tests the job history of a queue that keeps its jobs.
+ * Adds 10 jobs (counters 0 to 9); the consumer cancels the jobs with counter 2, 5 and 7 and succeeds for all others. Afterwards the HISTORY, CANCELLED and SUCCEEDED queries and the order of the history are checked.
+ */
+ @Test(timeout = DEFAULT_TEST_TIMEOUT)
+ public void testHistory() throws Exception {
+ final ServiceRegistration reg = this.registerJobConsumer(TOPIC,
+ new JobConsumer() {
+
+ @Override
+ public JobResult process(final Job job) {
+ sleep(5L);
+ final long count = job.getProperty(PROP_COUNTER, Long.class);
+ if ( count == 2 || count == 5 || count == 7 ) {
+ return JobResult.CANCEL;
+ }
+ return JobResult.OK;
+ }
+
+ });
+ Collection<Job> col = null;
+ try {
+ for(int i = 0; i< 10; i++) {
+ this.addJob(i);
+ }
+ this.sleep(200L);
+ while ( this.getJobManager().findJobs(JobManager.QueryType.HISTORY, TOPIC, -1, (Map<String, Object>[])null).size() < 10 ) {
+ this.sleep(20L);
+ }
+ col = this.getJobManager().findJobs(JobManager.QueryType.HISTORY, TOPIC, -1, (Map<String, Object>[])null);
+ assertEquals(10, col.size());
+ assertEquals(3, this.getJobManager().findJobs(JobManager.QueryType.CANCELLED, TOPIC, -1, (Map<String, Object>[])null).size());
+ assertEquals(7, this.getJobManager().findJobs(JobManager.QueryType.SUCCEEDED, TOPIC, -1, (Map<String, Object>[])null).size());
+ // verify order: the history is expected to list the jobs with descending counter (9 down to 0)
+ long last = 9;
+ for(final Job j : col) {
+ final long count = j.getProperty(PROP_COUNTER, Long.class);
+ assertEquals(last, count);
+ last--;
+ }
+ } finally {
+ if ( col != null ) {
+ for(final Job j : col) {
+ this.getJobManager().removeJobById(j.getId());
+ }
+ }
+ reg.unregister();
+ }
+ }
+}
\ No newline at end of file
Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/HistoryTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/HistoryTest.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/HistoryTest.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/JobHandlingTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/JobHandlingTest.java?rev=1527530&r1=1527529&r2=1527530&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/JobHandlingTest.java (original)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/JobHandlingTest.java Mon Sep 30 12:12:58 2013
@@ -26,6 +26,7 @@ import static org.junit.Assert.assertTru
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.Dictionary;
import java.util.Hashtable;
@@ -242,6 +243,14 @@ public class JobHandlingTest extends Abs
assertNotNull(e2);
assertTrue(jobManager.removeJob((String)e2.getProperty(JobUtil.JOB_ID)));
assertEquals(0, jobManager.findJobs(JobManager.QueryType.ALL, "sling/test", -1, (Map<String, Object>[])null).size());
+ final Collection<Job> col = jobManager.findJobs(JobManager.QueryType.HISTORY, "sling/test", -1, (Map<String, Object>[])null);
+ try {
+ assertEquals(1, col.size());
+ } finally {
+ for(final Job j : col) {
+ jobManager.removeJobById(j.getId());
+ }
+ }
} finally {
jcReg.unregister();
}
@@ -276,6 +285,14 @@ public class JobHandlingTest extends Abs
jobManager.forceRemoveJob((String)e.getProperty(JobUtil.JOB_ID));
// the job is now removed
assertEquals(0, jobManager.findJobs(JobManager.QueryType.ALL, "sling/test", -1, (Map<String, Object>[])null).size());
+ final Collection<Job> col = jobManager.findJobs(JobManager.QueryType.HISTORY, "sling/test", -1, (Map<String, Object>[])null);
+ try {
+ assertEquals(1, col.size());
+ } finally {
+ for(final Job j : col) {
+ jobManager.removeJobById(j.getId());
+ }
+ }
} finally {
jcReg.unregister();
}
@@ -309,7 +326,7 @@ public class JobHandlingTest extends Abs
});
try {
final JobManager jobManager = this.getJobManager();
- jobManager.addJob(TOPIC, null, null);
+ final Job job = jobManager.addJob(TOPIC, null, null);
assertTrue("No event received in the given time.", cb.block(5));
cb.reset();
@@ -322,6 +339,8 @@ public class JobHandlingTest extends Abs
cb.reset();
assertFalse("Unexpected event received in the given time.", cb.block(5));
assertEquals("Unexpected number of retries", 3, retryCountList.size());
+
+ jobManager.removeJobById(job.getId());
} finally {
jcReg.unregister();
}
@@ -417,8 +436,8 @@ public class JobHandlingTest extends Abs
}
});
+ final JobManager jobManager = this.getJobManager();
try {
- final JobManager jobManager = this.getJobManager();
jobManager.addJob(TOPIC, "1", null);
jobManager.addJob(TOPIC, "2", null);
jobManager.addJob(TOPIC, "3", null);
@@ -439,6 +458,10 @@ public class JobHandlingTest extends Abs
assertEquals("Started count", 10, started.size());
assertEquals("Failed count", 5, failed.size());
} finally {
+ final Collection<Job> col = jobManager.findJobs(JobManager.QueryType.HISTORY, "sling/test", -1, (Map<String, Object>[])null);
+ for(final Job j : col) {
+ jobManager.removeJobById(j.getId());
+ }
jcReg.unregister();
eh1Reg.unregister();
eh2Reg.unregister();