You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2013/09/30 14:12:59 UTC

svn commit: r1527530 - in /sling/trunk/bundles/extensions/event: ./ src/main/java/org/apache/sling/event/impl/jobs/ src/main/java/org/apache/sling/event/impl/jobs/config/ src/main/java/org/apache/sling/event/impl/jobs/queues/ src/main/java/org/apache/sling/event/jobs/ src/main/resources/OSGI-INF/metatype/ src/test/java/org/apache/sling/event/it/

Author: cziegeler
Date: Mon Sep 30 12:12:58 2013
New Revision: 1527530

URL: http://svn.apache.org/r1527530
Log:
SLING-3028 :  Support for progress tracking of jobs 

Added:
    sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/HistoryTest.java   (with props)
Modified:
    sling/trunk/bundles/extensions/event/pom.xml
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/ConfigurationConstants.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/Job.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobManager.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobProcessor.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobsIterator.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/Queue.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/QueueConfiguration.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/Statistics.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/TopicStatistics.java
    sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties
    sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/JobHandlingTest.java

Modified: sling/trunk/bundles/extensions/event/pom.xml
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/pom.xml?rev=1527530&r1=1527529&r2=1527530&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/pom.xml (original)
+++ sling/trunk/bundles/extensions/event/pom.xml Mon Sep 30 12:12:58 2013
@@ -47,7 +47,7 @@
         <sling.java.version>6</sling.java.version>
         <exam.version>3.0.3</exam.version>
         <url.version>1.5.2</url.version>
-         <bundle.build.name>${basedir}/target</bundle.build.name>
+        <bundle.build.name>${basedir}/target</bundle.build.name>
         <bundle.file.name>${bundle.build.name}/${project.build.finalName}.jar</bundle.file.name>
     </properties>
  

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java?rev=1527530&r1=1527529&r2=1527530&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java Mon Sep 30 12:12:58 2013
@@ -51,9 +51,9 @@ public class JobHandler {
      * Finish the processing of the job
      * @param state The state of processing
      */
-    public void finished(final JobState state) {
+    public void finished(final JobState state, final boolean keepJobInHistory) {
         // for now we just keep cancelled jobs
-        this.jobManager.finished(this, state, state != JobState.SUCCEEDED);
+        this.jobManager.finishJob(this.job, state, keepJobInHistory);
     }
 
     /**
@@ -61,15 +61,15 @@ public class JobHandler {
      * @return <code>true</code> if rescheduling was successful, <code>false</code> otherwise.
      */
     public boolean reschedule() {
-        return this.jobManager.reschedule(this);
+        return this.jobManager.reschedule(this.job);
     }
 
-    public boolean remove() {
-        return this.jobManager.remove(this.job);
+    public void cancel() {
+        this.jobManager.finishJob(this.job, JobState.CANCELLED, true);
     }
 
     public void reassign() {
-        this.jobManager.reassign(this);
+        this.jobManager.reassign(this.job);
     }
 
     @Override

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java?rev=1527530&r1=1527529&r2=1527530&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java Mon Sep 30 12:12:58 2013
@@ -62,7 +62,10 @@ public class JobImpl implements Job {
     public static final String PROPERTY_MESSAGE = "slingevent:message";
 
     /** Property for finished jobs. */
-    public static final String PROPERTY_FINISHED = "slingevent:finished";
+    public static final String PROPERTY_FINISHED_STATE = "slingevent:finishedState";
+
+    /** Property for finished jobs. */
+    public static final String PROPERTY_FINISHED_DATE = "slingevent:finishedDate";
 
     private final ValueMap properties;
 

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java?rev=1527530&r1=1527529&r2=1527530&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java Mon Sep 30 12:12:58 2013
@@ -18,6 +18,7 @@
  */
 package org.apache.sling.event.impl.jobs;
 
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Calendar;
 import java.util.Collection;
@@ -296,7 +297,7 @@ public class JobManagerImpl
             if ( logger.isDebugEnabled() ) {
                 logger.debug("Dropping job due to configuration of queue {} : {}", queueInfo.queueName, Utility.toString(job));
             }
-            this.remove(job);
+            this.finishJob(job, JobState.CANCELLED, false);
         } else if ( config.getType() == QueueConfiguration.Type.IGNORE ) {
             if ( !reassign ) {
                 if ( logger.isDebugEnabled() ) {
@@ -332,7 +333,7 @@ public class JobManagerImpl
                         if ( queue == null ) {
                             // this is just a sanity check, actually we can never get here
                             logger.warn("Ignoring event due to unknown queue type of queue {} : {}", queueInfo.queueName, Utility.toString(job));
-                            this.remove(job);
+                            this.finishJob(job, JobState.CANCELLED, false);
                         } else {
                             queues.put(queueInfo.queueName, queue);
                             ((QueuesMBeanImpl)queuesMBean).sendEvent(new QueueStatusEvent(queue, null));
@@ -729,39 +730,51 @@ public class JobManagerImpl
      */
     @Override
     public boolean removeJob(final String jobId) {
+        return this.internalRemoveJobJobById(jobId, false);
+    }
+
+    private boolean internalRemoveJobJobById(final String jobId, final boolean forceRemove) {
         logger.debug("Trying to remove job {}", jobId);
         boolean result = true;
-        final Job job = this.getJobById(jobId);
-        logger.debug("Found removal job: {}", job);
+        final JobImpl job = (JobImpl)this.getJobById(jobId);
         if ( job != null ) {
+            logger.debug("Found removal job: {}", job);
             // currently running?
-            if ( job.getProcessingStarted() != null ) {
+            if ( !forceRemove && job.getProcessingStarted() != null ) {
                 logger.debug("Unable to remove job - job is started: {}", job);
                 result = false;
             } else {
-                ResourceResolver resolver = null;
-                try {
-                    resolver = this.resourceResolverFactory.getAdministrativeResourceResolver(null);
-                    final Resource jobResource = resolver.getResource(((JobImpl)job).getResourcePath());
-                    if ( jobResource != null ) {
-                        resolver.delete(jobResource);
-                        resolver.commit();
-                        logger.debug("Removed job with id: {}", jobId);
-                    } else {
-                        logger.debug("Unable to remove job with id - resource already removed: {}", jobId);
-                    }
-                } catch ( final LoginException le ) {
-                    this.ignoreException(le);
-                    result = false;
-                } catch ( final PersistenceException pe) {
-                    this.ignoreException(pe);
-                    result = false;
-                } finally {
-                    if ( resolver != null ) {
-                        resolver.close();
+                final boolean isHistoryJob = this.configuration.isStoragePath(job.getResourcePath());
+                // if history job, simply remove - otherwise move to history!
+                if ( isHistoryJob ) {
+                    ResourceResolver resolver = null;
+                    try {
+                        resolver = this.resourceResolverFactory.getAdministrativeResourceResolver(null);
+                        final Resource jobResource = resolver.getResource(job.getResourcePath());
+                        if ( jobResource != null ) {
+                            resolver.delete(jobResource);
+                            resolver.commit();
+                            logger.debug("Removed job with id: {}", jobId);
+                        } else {
+                            logger.debug("Unable to remove job with id - resource already removed: {}", jobId);
+                        }
+                    } catch ( final LoginException le ) {
+                        this.ignoreException(le);
+                        result = false;
+                    } catch ( final PersistenceException pe) {
+                        this.ignoreException(pe);
+                        result = false;
+                    } finally {
+                        if ( resolver != null ) {
+                            resolver.close();
+                        }
                     }
+                } else {
+                    this.finishJob(job, JobState.CANCELLED, true);
                 }
             }
+        } else {
+            logger.debug("Job for removal does not exist (anymore): {}", jobId);
         }
         return result;
     }
@@ -771,7 +784,7 @@ public class JobManagerImpl
      */
     @Override
     public void forceRemoveJob(final String jobId) {
-        this.removeJobById(jobId);
+        this.internalRemoveJobJobById(jobId, true);
     }
 
     /**
@@ -792,6 +805,15 @@ public class JobManagerImpl
             logger.warn("{}", errorMessage);
             return null;
         }
+        if ( properties != null ) {
+            for(final Object val : properties.values()) {
+                if ( val != null && !(val instanceof Serializable) ) {
+                    logger.warn("Discarding job - properties must be serializable: {} {} : {}",
+                            new Object[] {topic, name, properties});
+                    return null;
+                }
+            }
+        }
         Job result = this.addJobInteral(topic, name, properties);
         if ( result == null && name != null ) {
             result = this.getJobByName(name);
@@ -908,31 +930,8 @@ public class JobManagerImpl
      * @see org.apache.sling.event.jobs.JobManager#removeJobById(java.lang.String)
      */
     @Override
-    public boolean removeJobById(String jobId) {
-        boolean result = true;
-        final Job job = this.getJobById(jobId);
-        if ( job != null ) {
-            ResourceResolver resolver = null;
-            try {
-                resolver = this.resourceResolverFactory.getAdministrativeResourceResolver(null);
-                final Resource jobResource = resolver.getResource(((JobImpl)job).getResourcePath());
-                if ( jobResource != null ) {
-                    resolver.delete(jobResource);
-                    resolver.commit();
-                }
-            } catch ( final LoginException le ) {
-                this.ignoreException(le);
-                result = false;
-            } catch ( final PersistenceException pe) {
-                this.ignoreException(pe);
-                result = false;
-            } finally {
-                if ( resolver != null ) {
-                    resolver.close();
-                }
-            }
-        }
-        return result;
+    public boolean removeJobById(final String jobId) {
+        return this.internalRemoveJobJobById(jobId, true);
     }
 
     /**
@@ -943,6 +942,7 @@ public class JobManagerImpl
             final String topic,
             final long limit,
             final Map<String, Object>... templates) {
+        final boolean isHistoryQuery = type == QueryType.HISTORY || type == QueryType.SUCCEEDED || type == QueryType.CANCELLED;
         final List<Job> result = new ArrayList<Job>();
         ResourceResolver resolver = null;
         try {
@@ -957,17 +957,34 @@ public class JobManagerImpl
             buf.append(" = '");
             buf.append(topic);
             buf.append("'");
-            if ( type == QueryType.ACTIVE ) {
+
+            // restricting on the type - history or unfinished
+            if ( isHistoryQuery ) {
                 buf.append(" and @");
-                buf.append(ISO9075.encode(Job.PROPERTY_JOB_STARTED_TIME));
-            } else if ( type == QueryType.QUEUED ) {
+                buf.append(ISO9075.encode(JobImpl.PROPERTY_FINISHED_STATE));
+                if ( type == QueryType.SUCCEEDED ) {
+                    buf.append(" = '");
+                    buf.append(JobState.SUCCEEDED.name());
+                    buf.append("'");
+                } else if ( type == QueryType.CANCELLED ) {
+                    buf.append(" = '");
+                    buf.append(JobState.CANCELLED.name());
+                    buf.append("'");
+                }
+            } else {
                 buf.append(" and not(@");
-                buf.append(ISO9075.encode(Job.PROPERTY_JOB_STARTED_TIME));
+                buf.append(ISO9075.encode(JobImpl.PROPERTY_FINISHED_STATE));
                 buf.append(")");
+                if ( type == QueryType.ACTIVE ) {
+                    buf.append(" and @");
+                    buf.append(ISO9075.encode(Job.PROPERTY_JOB_STARTED_TIME));
+                } else if ( type == QueryType.QUEUED ) {
+                    buf.append(" and not(@");
+                    buf.append(ISO9075.encode(Job.PROPERTY_JOB_STARTED_TIME));
+                    buf.append(")");
+                }
             }
-            buf.append(" and not(@");
-            buf.append(ISO9075.encode(JobImpl.PROPERTY_FINISHED));
-            buf.append(")");
+
             if ( templates != null && templates.length > 0 ) {
                 buf.append(" and (");
                 int index = 0;
@@ -998,8 +1015,13 @@ public class JobManagerImpl
                 buf.append(')');
             }
             buf.append("] order by @");
-            buf.append(Job.PROPERTY_JOB_CREATED);
-            buf.append(" ascending");
+            if ( isHistoryQuery ) {
+                buf.append(JobImpl.PROPERTY_FINISHED_DATE);
+                buf.append(" descending");
+            } else {
+                buf.append(Job.PROPERTY_JOB_CREATED);
+                buf.append(" ascending");
+            }
             final Iterator<Resource> iter = resolver.findResources(buf.toString(), "xpath");
             long count = 0;
 
@@ -1031,20 +1053,21 @@ public class JobManagerImpl
      * @param info  The job handler
      * @param state The state of the processing
      */
-    public void finished(final JobHandler handler, final JobState state, final boolean keepJobInHistory) {
+    public void finishJob(final JobImpl job, final JobState state, final boolean keepJobInHistory) {
         final boolean isSuccess = (state == JobState.SUCCEEDED);
         ResourceResolver resolver = null;
         try {
             resolver = this.resourceResolverFactory.getAdministrativeResourceResolver(null);
-            final Resource jobResource = resolver.getResource(handler.getJob().getResourcePath());
+            final Resource jobResource = resolver.getResource(job.getResourcePath());
             if ( jobResource != null ) {
                 try {
                     String newPath = null;
                     if ( keepJobInHistory ) {
                         final ValueMap vm = ResourceHelper.getValueMap(jobResource);
-                        newPath = this.configuration.getStoragePath(handler.getJob(), isSuccess);
+                        newPath = this.configuration.getStoragePath(job, isSuccess);
                         final Map<String, Object> props = new HashMap<String, Object>(vm);
-                        props.put(JobImpl.PROPERTY_FINISHED, isSuccess);
+                        props.put(JobImpl.PROPERTY_FINISHED_STATE, isSuccess ? JobState.SUCCEEDED.name() : JobState.CANCELLED.name());
+                        props.put(JobImpl.PROPERTY_FINISHED_DATE, Calendar.getInstance());
 
                         ResourceHelper.getOrCreateResource(resolver, newPath, props);
                     }
@@ -1053,9 +1076,9 @@ public class JobManagerImpl
 
                     if ( keepJobInHistory && logger.isDebugEnabled() ) {
                         if ( isSuccess ) {
-                            logger.debug("Kept successful job {} at {}", Utility.toString(handler.getJob()), newPath);
+                            logger.debug("Kept successful job {} at {}", Utility.toString(job), newPath);
                         } else {
-                            logger.debug("Moved cancelled job {} to {}", Utility.toString(handler.getJob()), newPath);
+                            logger.debug("Moved cancelled job {} to {}", Utility.toString(job), newPath);
                         }
                     }
                 } catch ( final PersistenceException pe ) {
@@ -1078,17 +1101,17 @@ public class JobManagerImpl
      * Reschedule a job.
      *
      * Update the retry count and remove the started time.
-     * @param info The job info
+     * @param job The job
      * @return true if the job could be updated.
      */
-    public boolean reschedule(final JobHandler info) {
+    public boolean reschedule(final JobImpl job) {
         ResourceResolver resolver = null;
         try {
             resolver = this.resourceResolverFactory.getAdministrativeResourceResolver(null);
-            final Resource jobResource = resolver.getResource(info.getJob().getResourcePath());
+            final Resource jobResource = resolver.getResource(job.getResourcePath());
             if ( jobResource != null ) {
                 final ModifiableValueMap mvm = jobResource.adaptTo(ModifiableValueMap.class);
-                mvm.put(Job.PROPERTY_JOB_RETRY_COUNT, info.getJob().getProperty(Job.PROPERTY_JOB_RETRY_COUNT));
+                mvm.put(Job.PROPERTY_JOB_RETRY_COUNT, job.getProperty(Job.PROPERTY_JOB_RETRY_COUNT));
                 mvm.remove(Job.PROPERTY_JOB_STARTED_TIME);
                 try {
                     resolver.commit();
@@ -1109,36 +1132,6 @@ public class JobManagerImpl
     }
 
     /**
-     * Remove the job.
-     * @param info
-     * @return
-     */
-    public boolean remove(final JobImpl job) {
-        ResourceResolver resolver = null;
-        try {
-            resolver = this.resourceResolverFactory.getAdministrativeResourceResolver(null);
-            final Resource jobResource = resolver.getResource(job.getResourcePath());
-            if ( jobResource != null ) {
-                Utility.sendNotification(this.eventAdmin, JobUtil.TOPIC_JOB_CANCELLED, job, null);
-                try {
-                    resolver.delete(jobResource);
-                    resolver.commit();
-                } catch ( final PersistenceException pe ) {
-                    // ignore
-                }
-            }
-        } catch ( final LoginException ignore ) {
-            // ignore
-        } finally {
-            if ( resolver != null ) {
-                resolver.close();
-            }
-        }
-
-        return true;
-    }
-
-    /**
      * Try to start the job
      */
     public boolean start(final JobHandler info) {
@@ -1245,7 +1238,6 @@ public class JobManagerImpl
      * @param jobProperties The optional job properties
      * @return The persisted job or <code>null</code>.
      */
-
     private Job addJobInteral(final String jobTopic, final String jobName, final Map<String, Object> jobProperties) {
         final QueueInfo info = this.queueConfigManager.getQueueInfo(jobTopic);
         if ( info.queueConfiguration.getType() == QueueConfiguration.Type.DROP ) {
@@ -1365,24 +1357,23 @@ public class JobManagerImpl
         return new JobImpl(jobTopic, jobName, jobId, properties);
     }
 
-    public void reassign(final JobHandler handler) {
-        final JobImpl job = handler.getJob();
+    public void reassign(final JobImpl job) {
         final QueueInfo queueInfo = queueConfigManager.getQueueInfo(job.getTopic());
         final InternalQueueConfiguration config = queueInfo.queueConfiguration;
 
         // Sanity check if queue configuration has changed
         if ( config.getType() == QueueConfiguration.Type.DROP ) {
             if ( logger.isDebugEnabled() ) {
-                logger.debug("Dropping job due to configuration of queue {} : {}", queueInfo.queueName, Utility.toString(handler.getJob()));
+                logger.debug("Dropping job due to configuration of queue {} : {}", queueInfo.queueName, Utility.toString(job));
             }
-            handler.remove();
+            this.finishJob(job, JobState.CANCELLED, false); // DROP means complete removal
         } else {
             String targetId = null;
             if ( config.getType() != QueueConfiguration.Type.IGNORE ) {
                 final TopologyCapabilities caps = this.topologyCapabilities;
                 targetId = (caps == null ? null : caps.detectTarget(job.getTopic(), job.getProperties(), queueInfo));
             }
-            this.maintenanceTask.reassignJob(handler.getJob(), targetId);
+            this.maintenanceTask.reassignJob(job, targetId);
         }
     }
 

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/ConfigurationConstants.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/ConfigurationConstants.java?rev=1527530&r1=1527529&r2=1527530&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/ConfigurationConstants.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/ConfigurationConstants.java Mon Sep 30 12:12:58 2013
@@ -30,6 +30,7 @@ public abstract class ConfigurationConst
     public static final int DEFAULT_RETRIES = 10;
     public static final long DEFAULT_RETRY_DELAY = 2000;
     public static final int DEFAULT_MAX_PARALLEL = 15;
+    public static final boolean DEFAULT_KEEP_JOBS = false;
 
     public static final String PROP_NAME = "queue.name";
     public static final String PROP_TYPE = "queue.type";
@@ -38,4 +39,5 @@ public abstract class ConfigurationConst
     public static final String PROP_RETRIES = "queue.retries";
     public static final String PROP_RETRY_DELAY = "queue.retrydelay";
     public static final String PROP_PRIORITY = "queue.priority";
+    public static final String PROP_KEEP_JOBS = "queue.keepJobs";
 }

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java?rev=1527530&r1=1527529&r2=1527530&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java Mon Sep 30 12:12:58 2013
@@ -62,6 +62,8 @@ import org.osgi.framework.Constants;
             options={@PropertyOption(name="NORM",value="Norm"),
                      @PropertyOption(name="MIN",value="Min"),
                      @PropertyOption(name="MAX",value="Max")}),
+    @Property(name=ConfigurationConstants.PROP_KEEP_JOBS,
+              boolValue=ConfigurationConstants.DEFAULT_KEEP_JOBS),
     @Property(name=Constants.SERVICE_RANKING, intValue=0, propertyPrivate=false,
               label="%queue.ranking.name", description="%queue.ranking.description")
 })
@@ -95,6 +97,9 @@ public class InternalQueueConfiguration
     /** The configured topics. */
     private String[] topics;
 
+    /** Keep jobs. */
+    private boolean keepJobs;
+
     /** Valid flag. */
     private boolean valid = false;
 
@@ -132,6 +137,7 @@ public class InternalQueueConfiguration
         } else {
             this.topics = topicsParam;
         }
+        this.keepJobs = PropertiesUtil.toBoolean(params.get(ConfigurationConstants.PROP_KEEP_JOBS), ConfigurationConstants.DEFAULT_KEEP_JOBS);
         this.serviceRanking = PropertiesUtil.toInteger(params.get(Constants.SERVICE_RANKING), 0);
         this.pid = (String)params.get(Constants.SERVICE_PID);
         this.valid = this.checkIsValid();
@@ -271,6 +277,11 @@ public class InternalQueueConfiguration
     }
 
     @Override
+    public boolean isKeepJobs() {
+        return this.keepJobs;
+    }
+
+    @Override
     public String toString() {
         return "Queue-Configuration(" + this.hashCode() + ") : {" +
             "name=" + this.name +
@@ -278,7 +289,8 @@ public class InternalQueueConfiguration
             ", topics=" + (this.matchers == null ? "[]" : Arrays.toString(this.matchers)) +
             ", maxParallelProcesses=" + this.maxParallelProcesses +
             ", retries=" + this.retries +
-            ", retryDelayInMs= " + this.retryDelay +
+            ", retryDelayInMs=" + this.retryDelay +
+            ", keepJobs=" + this.keepJobs +
             ", serviceRanking=" + this.serviceRanking +
             ", pid=" + this.pid +
             ", isValid=" + this.isValid() + "}";

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java?rev=1527530&r1=1527529&r2=1527530&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java Mon Sep 30 12:12:58 2013
@@ -392,7 +392,9 @@ public abstract class AbstractJobQueue
         final boolean finishSuccessful;
 
         if ( !rescheduleInfo.reschedule ) {
-            handler.finished(result.getState());
+            // we keep cancelled jobs and succeeded jobs if the queue is configured like this.
+            final boolean keepJobs = result.getState() != JobState.SUCCEEDED || this.configuration.isKeepJobs();
+            handler.finished(result.getState(), keepJobs);
             finishSuccessful = true;
             if ( result.getState() == JobState.SUCCEEDED ) {
                 Utility.sendNotification(this.eventAdmin, JobUtil.TOPIC_JOB_FINISHED, handler.getJob(), rescheduleInfo.processingTime);
@@ -760,10 +762,11 @@ public abstract class AbstractJobQueue
                 @Override
                 public void run() {
                     for(final JobHandler job : events) {
-                        job.remove();
+                        job.cancel();
+                        Utility.sendNotification(eventAdmin, JobUtil.TOPIC_JOB_CANCELLED, job.getJob(), null);
                     }
                 }
-            }, "Queue RemoveAll Thread for " + this.queueName);
+            }, "Apache Sling Queue RemoveAll Thread for " + this.queueName);
         t.setDaemon(true);
         t.start();
         // start queue again

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java?rev=1527530&r1=1527529&r2=1527530&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java Mon Sep 30 12:12:58 2013
@@ -184,7 +184,7 @@ public final class OrderedJobQueue exten
                 this.sleepLock.jobHandler = null;
 
                 if ( result == null ) {
-                    handler.remove();
+                    handler.cancel();
                 }
                 return result;
             }

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/Job.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/Job.java?rev=1527530&r1=1527529&r2=1527530&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/Job.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/Job.java Mon Sep 30 12:12:58 2013
@@ -34,7 +34,7 @@ import aQute.bnd.annotation.ProviderType
  * In general all scalar types and all serializable classes are supported as
  * property types. However, in order for deseralizing classes these must be
  * exported. Serializable classes are not searchable in the query either.
- * Due to the above to potential problems, it is advisable to not use
+ * Due to the above mentioned potential problems, it is advisable to not use
  * custom classes as job properties, but rather use out of the box supported
  * types in combination with collections.
  *

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobManager.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobManager.java?rev=1527530&r1=1527529&r2=1527530&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobManager.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobManager.java Mon Sep 30 12:12:58 2013
@@ -58,12 +58,15 @@ public interface JobManager {
 
     /**
      * The requested job types for the query.
-     * This can either be all jobs, all activated (started) or all queued jobs.
+     * This can either be all (unfinished) jobs, all activated (started) or all queued jobs.
      */
     enum QueryType {
-        ALL,
+        ALL,      // all means all active and all queued
         ACTIVE,
-        QUEUED
+        QUEUED,
+        HISTORY,  // returns the complete history of cancelled and succeeded jobs (if available)
+        CANCELLED,// history of cancelled jobs
+        SUCCEEDED // history of succeeded jobs
     }
 
     /**
@@ -77,6 +80,8 @@ public interface JobManager {
      * Add a new job
      *
      * If the topic is <code>null</code> or illegal, no job is created and <code>null</code> is returned.
+     * If properties are provided, all of them must be serializable. If there are non-serializable
+     * objects in the properties, no job is created and <code>null</code> is returned.
      * A job topic is a hierarchical name separated by dashes, each part has to start with a letter,
      * allowed characters are letters, numbers and the underscore.
      *
@@ -91,6 +96,8 @@ public interface JobManager {
      * Add a new job
      *
      * If the topic is <code>null</code> or illegal, no job is created and <code>null</code> is returned.
+     * If properties are provided, all of them must be serializable. If there are non-serializable
+     * objects in the properties, no job is created and <code>null</code> is returned.
      * A job topic is a hierarchical name separated by dashes, each part has to start with a letter,
      * allowed characters are letters, numbers and the underscore.
      *
@@ -147,9 +154,16 @@ public interface JobManager {
     Job getJob(String topic, Map<String, Object> template);
 
     /**
-     * Return all jobs either running or scheduled.
+     * Return all jobs of a given type.
      *
-     * @param type Required parameter for the type: either all jobs, only queued or only started can be returned.
+     * Based on the type parameter, either the history of jobs can be returned or unfinished jobs. The type
+     * parameter can further specify which category of jobs should be returned: for the history either
+     * succeeded jobs, cancelled jobs or both in combination can be returned. For unfinished jobs, either
+     * queued jobs, started jobs or the combination can be returned.
+     * If the history is returned, the result set is sorted in descending order, listing the newest entry
+     * first. For unfinished jobs, the result set is sorted in ascending order.
+     *
+     * @param type Required parameter for the type. See above.
      * @param topic Topic can be used as a filter, if it is non-null, only jobs with this topic will be returned.
      * @param limit A positive number indicating the maximum number of jobs returned by the iterator. A value
      *              of zero or less indicates that all jobs should be returned.
@@ -176,7 +190,7 @@ public interface JobManager {
      *                    must match the template (AND query). By providing several maps, different filters
      *                    are possible (OR query).
      * @return A non null collection.
-     * @deprecated
+     * @deprecated Use {@link #findJobs(QueryType, String, long, Map...)}
      */
     @Deprecated
     JobsIterator queryJobs(QueryType type, String topic, Map<String, Object>... templates);
@@ -192,7 +206,7 @@ public interface JobManager {
      *                    are possible (OR query).
      * @return A non null collection.
      * @since 1.1
-     * @deprecated
+     * @deprecated Use {@link #findJobs(QueryType, String, long, Map...)}
      */
     @Deprecated
     JobsIterator queryJobs(QueryType type, String topic, long limit, Map<String, Object>... templates);
@@ -206,7 +220,7 @@ public interface JobManager {
      * @param template The map acts like a template. The searched job
      *                    must match the template (AND query).
      * @return An event or <code>null</code>
-     * @deprecated
+     * @deprecated Use {@link #getJob(String, Map)}
      */
     @Deprecated
     Event findJob(String topic, Map<String, Object> template);
@@ -217,7 +231,7 @@ public interface JobManager {
      * @param jobId The unique identifier as found in the property {@link JobUtil#JOB_ID}.
      * @return <code>true</code> if the job could be cancelled or does not exist anymore.
      *         <code>false</code> otherwise.
-     * @deprecated
+     * @deprecated Use {@link #removeJobById(String)}
      */
     @Deprecated
     boolean removeJob(String jobId);
@@ -228,7 +242,7 @@ public interface JobManager {
      * for a job to finish. The job will be removed when this method returns - however
      * this method blocks until the job is finished!
      * @param jobId The unique identifier as found in the property {@link JobUtil#JOB_ID}.
-     * @deprecated
+     * @deprecated Use {@link #removeJobById(String)}
      */
     @Deprecated
     void forceRemoveJob(String jobId);

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobProcessor.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobProcessor.java?rev=1527530&r1=1527529&r2=1527530&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobProcessor.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobProcessor.java Mon Sep 30 12:12:58 2013
@@ -21,6 +21,8 @@ package org.apache.sling.event.jobs;
 import org.apache.sling.event.jobs.consumer.JobConsumer;
 import org.osgi.service.event.Event;
 
+import aQute.bnd.annotation.ConsumerType;
+
 /**
  * A job processor processes a job in the background.
  * It is used by {@link JobUtil#processJob(Event, JobProcessor)}.
@@ -28,6 +30,7 @@ import org.osgi.service.event.Event;
  * @deprecated - Use the new {@link JobConsumer} interface instead.
  */
 @Deprecated
+@ConsumerType
 public interface JobProcessor {
 
     /**

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobsIterator.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobsIterator.java?rev=1527530&r1=1527529&r2=1527530&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobsIterator.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobsIterator.java Mon Sep 30 12:12:58 2013
@@ -22,6 +22,8 @@ import java.util.Iterator;
 
 import org.osgi.service.event.Event;
 
+import aQute.bnd.annotation.ProviderType;
+
 /**
  * This <code>Iterator</code> allows to iterate over {@link Event}s.
  * In addition to an iterator it might return the number of elements
@@ -30,6 +32,7 @@ import org.osgi.service.event.Event;
  * @deprecated - Use the new {@link JobManager#findJobs} methods instead.
  */
 @Deprecated
+@ProviderType
 public interface JobsIterator extends Iterator<Event>, Iterable<Event> {
 
     /**

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/Queue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/Queue.java?rev=1527530&r1=1527529&r2=1527530&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/Queue.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/Queue.java Mon Sep 30 12:12:58 2013
@@ -18,11 +18,14 @@
  */
 package org.apache.sling.event.jobs;
 
+import aQute.bnd.annotation.ProviderType;
+
 
 /**
  * This is a job queue processing job events.
  * @since 3.0
  */
+@ProviderType
 public interface Queue {
 
     /**
@@ -75,7 +78,7 @@ public interface Queue {
 
     /**
      * Remove all outstanding jobs and delete them. This actually cancels
-     * all outstanding jobs (but no notifications are send).
+     * all outstanding jobs.
      */
     void removeAll();
 

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/QueueConfiguration.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/QueueConfiguration.java?rev=1527530&r1=1527529&r2=1527530&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/QueueConfiguration.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/QueueConfiguration.java Mon Sep 30 12:12:58 2013
@@ -18,11 +18,14 @@
  */
 package org.apache.sling.event.jobs;
 
+import aQute.bnd.annotation.ProviderType;
+
 
 /**
  * The configuration of a queue.
  * @since 3.0
  */
+@ProviderType
 public interface QueueConfiguration {
 
     /** The queue type. */
@@ -77,6 +80,13 @@ public interface QueueConfiguration {
     String[] getTopics();
 
     /**
+     * Whether successful jobs are kept for a complete history
+     * @return <code>true</code> if successful jobs are kept.
+     * @since 1.3
+     */
+    boolean isKeepJobs();
+
+    /**
      * Get the ranking of this configuration.
      */
     int getRanking();

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/Statistics.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/Statistics.java?rev=1527530&r1=1527529&r2=1527530&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/Statistics.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/Statistics.java Mon Sep 30 12:12:58 2013
@@ -18,12 +18,15 @@
  */
 package org.apache.sling.event.jobs;
 
+import aQute.bnd.annotation.ProviderType;
+
 /**
  * Statistic information.
  * This information is not preserved between restarts of the service.
  * Once a service is restarted, the counters start at zero!
  * @since 3.0
  */
+@ProviderType
 public interface Statistics {
 
     /**

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/TopicStatistics.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/TopicStatistics.java?rev=1527530&r1=1527529&r2=1527530&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/TopicStatistics.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/TopicStatistics.java Mon Sep 30 12:12:58 2013
@@ -18,12 +18,15 @@
  */
 package org.apache.sling.event.jobs;
 
+import aQute.bnd.annotation.ProviderType;
+
 /**
  * Statistic information about a topic.
  * This information is not preserved between restarts of the service.
  * Once a service is restarted, the counters start at zero!
  * @since 3.0
  */
+@ProviderType
 public interface TopicStatistics {
 
     /**

Modified: sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties?rev=1527530&r1=1527529&r2=1527530&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties (original)
+++ sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties Mon Sep 30 12:12:58 2013
@@ -61,14 +61,14 @@ queue.maxparallel.name = Maximum Paralle
 queue.maxparallel.description = The maximum number of parallel jobs started for this queue. \
  A value of -1 is substituted with the number of available processors.
 
+queue.keepJobs.name = Keep Jobs
+queue.keepJobs.description = If this option is enabled, successfully finished jobs are kept \
+ to provide a complete history.
+
 queue.ranking.name = Ranking
 queue.ranking.description = Integer value defining the ranking of this queue configuration. \
  If more than one queue matches a job topic, the one with the highest ranking is used.
 
-queue.waitforasync.name = Wait For Async
-queue.waitforasync.description = If a job consumer is processing a job asynchronously, this \
- flag controls whether the queue waits for the consumer to finish before starting new jobs.
-
 #
 # Job Event Handler
 job.events.name = Apache Sling Job Default Queue 

Added: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/HistoryTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/HistoryTest.java?rev=1527530&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/HistoryTest.java (added)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/HistoryTest.java Mon Sep 30 12:12:58 2013
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.it;
+
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Dictionary;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.Map;
+
+import org.apache.sling.event.impl.jobs.config.ConfigurationConstants;
+import org.apache.sling.event.jobs.Job;
+import org.apache.sling.event.jobs.JobManager;
+import org.apache.sling.event.jobs.QueueConfiguration;
+import org.apache.sling.event.jobs.consumer.JobConsumer;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.ops4j.pax.exam.junit.PaxExam;
+import org.ops4j.pax.exam.spi.reactors.ExamReactorStrategy;
+import org.ops4j.pax.exam.spi.reactors.PerMethod;
+import org.osgi.framework.ServiceRegistration;
+
+@RunWith(PaxExam.class)
+@ExamReactorStrategy(PerMethod.class)
+public class HistoryTest extends AbstractJobHandlingTest {
+
+    private static final String TOPIC = "sling/test/history";
+
+    private static final String PROP_COUNTER = "counter";
+
+    private String queueConfPid;
+
+    @Override
+    @Before
+    public void setup() throws IOException {
+        super.setup();
+
+        // create test queue - we use an ordered queue to have a stable processing order
+        // keep the jobs in the history
+        final org.osgi.service.cm.Configuration config = this.configAdmin.createFactoryConfiguration("org.apache.sling.event.jobs.QueueConfiguration", null);
+        Dictionary<String, Object> props = new Hashtable<String, Object>();
+        props.put(ConfigurationConstants.PROP_NAME, "test");
+        props.put(ConfigurationConstants.PROP_TYPE, QueueConfiguration.Type.ORDERED.name());
+        props.put(ConfigurationConstants.PROP_TOPICS, new String[] {TOPIC});
+        props.put(ConfigurationConstants.PROP_RETRIES, 2);
+        props.put(ConfigurationConstants.PROP_RETRY_DELAY, 2L);
+        props.put(ConfigurationConstants.PROP_KEEP_JOBS, true);
+        config.update(props);
+
+        this.queueConfPid = config.getPid();
+        this.sleep(1000L);
+    }
+
+    @After
+    public void cleanUp() throws IOException {
+        this.removeConfiguration(this.queueConfPid);
+    }
+
+    private Job addJob(final long counter) {
+        final Map<String, Object> props = new HashMap<String, Object>();
+        props.put(PROP_COUNTER, counter);
+        return this.getJobManager().addJob(TOPIC, props );
+    }
+
+    /**
+     * Test history.
+     * Start 10 jobs: some of them are cancelled, the others succeed.
+     */
+    @Test(timeout = DEFAULT_TEST_TIMEOUT)
+    public void testHistory() throws Exception {
+        final ServiceRegistration reg = this.registerJobConsumer(TOPIC,
+                new JobConsumer() {
+
+                    @Override
+                    public JobResult process(final Job job) {
+                        sleep(5L);
+                        final long count = job.getProperty(PROP_COUNTER, Long.class);
+                        if ( count == 2 || count == 5 || count == 7 ) {
+                            return JobResult.CANCEL;
+                        }
+                        return JobResult.OK;
+                    }
+
+                });
+        Collection<Job> col = null;
+        try {
+            for(int i = 0; i< 10; i++) {
+                this.addJob(i);
+            }
+            this.sleep(200L);
+            while ( this.getJobManager().findJobs(JobManager.QueryType.HISTORY, TOPIC, -1, (Map<String, Object>[])null).size() < 10 ) {
+                this.sleep(20L);
+            }
+            col = this.getJobManager().findJobs(JobManager.QueryType.HISTORY, TOPIC, -1, (Map<String, Object>[])null);
+            assertEquals(10, col.size());
+            assertEquals(3, this.getJobManager().findJobs(JobManager.QueryType.CANCELLED, TOPIC, -1, (Map<String, Object>[])null).size());
+            assertEquals(7, this.getJobManager().findJobs(JobManager.QueryType.SUCCEEDED, TOPIC, -1, (Map<String, Object>[])null).size());
+            // verify order
+            long last = 9;
+            for(final Job j : col) {
+                final long count = j.getProperty(PROP_COUNTER, Long.class);
+                assertEquals(last, count);
+                last--;
+            }
+        } finally {
+            if ( col != null ) {
+                for(final Job j : col) {
+                    this.getJobManager().removeJobById(j.getId());
+                }
+            }
+            reg.unregister();
+        }
+    }
+}
\ No newline at end of file

Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/HistoryTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/HistoryTest.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/HistoryTest.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/JobHandlingTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/JobHandlingTest.java?rev=1527530&r1=1527529&r2=1527530&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/JobHandlingTest.java (original)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/JobHandlingTest.java Mon Sep 30 12:12:58 2013
@@ -26,6 +26,7 @@ import static org.junit.Assert.assertTru
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Dictionary;
 import java.util.Hashtable;
@@ -242,6 +243,14 @@ public class JobHandlingTest extends Abs
             assertNotNull(e2);
             assertTrue(jobManager.removeJob((String)e2.getProperty(JobUtil.JOB_ID)));
             assertEquals(0, jobManager.findJobs(JobManager.QueryType.ALL, "sling/test", -1, (Map<String, Object>[])null).size());
+            final Collection<Job> col = jobManager.findJobs(JobManager.QueryType.HISTORY, "sling/test", -1, (Map<String, Object>[])null);
+            try {
+                assertEquals(1, col.size());
+            } finally {
+                for(final Job j : col) {
+                    jobManager.removeJobById(j.getId());
+                }
+            }
         } finally {
             jcReg.unregister();
         }
@@ -276,6 +285,14 @@ public class JobHandlingTest extends Abs
             jobManager.forceRemoveJob((String)e.getProperty(JobUtil.JOB_ID));
             // the job is now removed
             assertEquals(0, jobManager.findJobs(JobManager.QueryType.ALL, "sling/test", -1, (Map<String, Object>[])null).size());
+            final Collection<Job> col = jobManager.findJobs(JobManager.QueryType.HISTORY, "sling/test", -1, (Map<String, Object>[])null);
+            try {
+                assertEquals(1, col.size());
+            } finally {
+                for(final Job j : col) {
+                    jobManager.removeJobById(j.getId());
+                }
+            }
         } finally {
             jcReg.unregister();
         }
@@ -309,7 +326,7 @@ public class JobHandlingTest extends Abs
                 });
         try {
             final JobManager jobManager = this.getJobManager();
-            jobManager.addJob(TOPIC, null, null);
+            final Job job = jobManager.addJob(TOPIC, null, null);
 
             assertTrue("No event received in the given time.", cb.block(5));
             cb.reset();
@@ -322,6 +339,8 @@ public class JobHandlingTest extends Abs
             cb.reset();
             assertFalse("Unexpected event received in the given time.", cb.block(5));
             assertEquals("Unexpected number of retries", 3, retryCountList.size());
+
+            jobManager.removeJobById(job.getId());
         } finally {
             jcReg.unregister();
         }
@@ -417,8 +436,8 @@ public class JobHandlingTest extends Abs
                     }
                 });
 
+        final JobManager jobManager = this.getJobManager();
         try {
-            final JobManager jobManager = this.getJobManager();
             jobManager.addJob(TOPIC, "1", null);
             jobManager.addJob(TOPIC, "2", null);
             jobManager.addJob(TOPIC, "3", null);
@@ -439,6 +458,10 @@ public class JobHandlingTest extends Abs
             assertEquals("Started count", 10, started.size());
             assertEquals("Failed count", 5, failed.size());
         } finally {
+            final Collection<Job> col = jobManager.findJobs(JobManager.QueryType.HISTORY, "sling/test", -1, (Map<String, Object>[])null);
+            for(final Job j : col) {
+                jobManager.removeJobById(j.getId());
+            }
             jcReg.unregister();
             eh1Reg.unregister();
             eh2Reg.unregister();