You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2013/10/16 17:24:53 UTC

svn commit: r1532793 - in /sling/trunk/bundles/extensions/event/src: main/java/org/apache/sling/event/impl/jobs/ main/java/org/apache/sling/event/impl/jobs/queues/ main/java/org/apache/sling/event/jobs/ main/java/org/apache/sling/event/jobs/consumer/ t...

Author: cziegeler
Date: Wed Oct 16 15:24:52 2013
New Revision: 1532793

URL: http://svn.apache.org/r1532793
Log:
SLING-3169 : Job state and related enumerations

Modified:
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/Job.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobManager.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobExecutionContext.java
    sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ClassloadingTest.java
    sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/HistoryTest.java

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java?rev=1532793&r1=1532792&r2=1532793&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java Wed Oct 16 15:24:52 2013
@@ -19,6 +19,7 @@
 package org.apache.sling.event.impl.jobs;
 
 
+import org.apache.sling.event.jobs.Job;
 import org.apache.sling.event.jobs.Queue;
 
 
@@ -54,7 +55,7 @@ public class JobHandler {
      * Finish the processing of the job
      * @param state The state of processing
      */
-    public void finished(final InternalJobState state, final boolean keepJobInHistory, final long duration) {
+    public void finished(final Job.JobState state, final boolean keepJobInHistory, final long duration) {
         // for now we just keep cancelled jobs
         this.jobManager.finishJob(this.job, state, keepJobInHistory, duration);
     }
@@ -68,7 +69,7 @@ public class JobHandler {
     }
 
     public void cancel() {
-        this.jobManager.finishJob(this.job, InternalJobState.CANCELLED, true, -1);
+        this.jobManager.finishJob(this.job, Job.JobState.DROPPED, true, -1);
     }
 
     public void reassign() {

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java?rev=1532793&r1=1532792&r2=1532793&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java Wed Oct 16 15:24:52 2013
@@ -307,7 +307,7 @@ public class JobManagerImpl
             if ( logger.isDebugEnabled() ) {
                 logger.debug("Dropping job due to configuration of queue {} : {}", queueInfo.queueName, Utility.toString(job));
             }
-            this.finishJob(job, InternalJobState.CANCELLED, false, -1);
+            this.finishJob(job, Job.JobState.DROPPED, false, -1);
         } else if ( config.getType() == QueueConfiguration.Type.IGNORE ) {
             if ( !reassign ) {
                 if ( logger.isDebugEnabled() ) {
@@ -343,7 +343,7 @@ public class JobManagerImpl
                         if ( queue == null ) {
                             // this is just a sanity check, actually we can never get here
                             logger.warn("Ignoring event due to unknown queue type of queue {} : {}", queueInfo.queueName, Utility.toString(job));
-                            this.finishJob(job, InternalJobState.CANCELLED, false, -1);
+                            this.finishJob(job, Job.JobState.DROPPED, false, -1);
                         } else {
                             queues.put(queueInfo.queueName, queue);
                             ((QueuesMBeanImpl)queuesMBean).sendEvent(new QueueStatusEvent(queue, null));
@@ -801,7 +801,7 @@ public class JobManagerImpl
                         }
                     }
                 } else {
-                    this.finishJob(job, InternalJobState.CANCELLED, true, -1);
+                    this.finishJob(job, Job.JobState.DROPPED, true, -1);
                 }
             }
         } else {
@@ -931,6 +931,7 @@ public class JobManagerImpl
      */
     @Override
     public Job getJob(final String topic, final Map<String, Object> template) {
+        @SuppressWarnings("unchecked")
         final Iterable<Job> iter = this.findJobs(QueryType.ALL, topic, 1, template);
         final Iterator<Job> i = iter.iterator();
         if ( i.hasNext() ) {
@@ -955,7 +956,13 @@ public class JobManagerImpl
             final String topic,
             final long limit,
             final Map<String, Object>... templates) {
-        final boolean isHistoryQuery = type == QueryType.HISTORY || type == QueryType.SUCCEEDED || type == QueryType.CANCELLED;
+        final boolean isHistoryQuery = type == QueryType.HISTORY
+                                       || type == QueryType.SUCCEEDED
+                                       || type == QueryType.CANCELLED
+                                       || type == QueryType.DROPPED
+                                       || type == QueryType.ERROR
+                                       || type == QueryType.GIVEN_UP
+                                       || type == QueryType.STOPPED;
         final List<Job> result = new ArrayList<Job>();
         ResourceResolver resolver = null;
         try {
@@ -975,14 +982,28 @@ public class JobManagerImpl
             if ( isHistoryQuery ) {
                 buf.append(" and @");
                 buf.append(ISO9075.encode(JobImpl.PROPERTY_FINISHED_STATE));
-                if ( type == QueryType.SUCCEEDED ) {
+                if ( type == QueryType.SUCCEEDED || type == QueryType.DROPPED || type == QueryType.ERROR || type == QueryType.GIVEN_UP || type == QueryType.STOPPED ) {
                     buf.append(" = '");
-                    buf.append(InternalJobState.SUCCEEDED.name());
+                    buf.append(type.name());
                     buf.append("'");
                 } else if ( type == QueryType.CANCELLED ) {
+                    buf.append(" and (@");
+                    buf.append(ISO9075.encode(JobImpl.PROPERTY_FINISHED_STATE));
                     buf.append(" = '");
-                    buf.append(InternalJobState.CANCELLED.name());
-                    buf.append("'");
+                    buf.append(QueryType.DROPPED.name());
+                    buf.append("' or @");
+                    buf.append(ISO9075.encode(JobImpl.PROPERTY_FINISHED_STATE));
+                    buf.append(" = '");
+                    buf.append(QueryType.ERROR.name());
+                    buf.append("' or @");
+                    buf.append(ISO9075.encode(JobImpl.PROPERTY_FINISHED_STATE));
+                    buf.append(" = '");
+                    buf.append(QueryType.GIVEN_UP.name());
+                    buf.append("' or @");
+                    buf.append(ISO9075.encode(JobImpl.PROPERTY_FINISHED_STATE));
+                    buf.append(" = '");
+                    buf.append(QueryType.STOPPED.name());
+                    buf.append("')");
                 }
             } else {
                 buf.append(" and not(@");
@@ -1067,10 +1088,10 @@ public class JobManagerImpl
      * @param state The state of the processing
      */
     public void finishJob(final JobImpl job,
-                          final InternalJobState state,
+                          final Job.JobState state,
                           final boolean keepJobInHistory,
                           final long duration) {
-        final boolean isSuccess = (state == InternalJobState.SUCCEEDED);
+        final boolean isSuccess = (state == Job.JobState.SUCCEEDED);
         ResourceResolver resolver = null;
         try {
             resolver = this.resourceResolverFactory.getAdministrativeResourceResolver(null);
@@ -1082,7 +1103,7 @@ public class JobManagerImpl
                         final ValueMap vm = ResourceHelper.getValueMap(jobResource);
                         newPath = this.configuration.getStoragePath(job, isSuccess);
                         final Map<String, Object> props = new HashMap<String, Object>(vm);
-                        props.put(JobImpl.PROPERTY_FINISHED_STATE, isSuccess ? InternalJobState.SUCCEEDED.name() : InternalJobState.CANCELLED.name());
+                        props.put(JobImpl.PROPERTY_FINISHED_STATE, state.name());
                         if ( isSuccess ) {
                             // we set the finish date to start date + duration
                             final Date finishDate = new Date();
@@ -1365,7 +1386,7 @@ public class JobManagerImpl
             if ( logger.isDebugEnabled() ) {
                 logger.debug("Dropping job due to configuration of queue {} : {}", queueInfo.queueName, Utility.toString(job));
             }
-            this.finishJob(job, InternalJobState.CANCELLED, false, -1); // DROP means complete removal
+            this.finishJob(job, Job.JobState.DROPPED, false, -1); // DROP means complete removal
         } else {
             String targetId = null;
             if ( config.getType() != QueueConfiguration.Type.IGNORE ) {

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java?rev=1532793&r1=1532792&r2=1532793&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java Wed Oct 16 15:24:52 2013
@@ -305,9 +305,9 @@ public abstract class AbstractJobQueue
         public long    processingTime;
     }
 
-    private RescheduleInfo handleReschedule(final JobHandler jobEvent, final JobExecutionResultImpl result) {
+    private RescheduleInfo handleReschedule(final JobHandler jobEvent, final Job.JobState resultState) {
         final RescheduleInfo info = new RescheduleInfo();
-        switch ( result.getState() ) {
+        switch ( resultState ) {
             case SUCCEEDED : // job is finished
                 if ( this.logger.isDebugEnabled() ) {
                     this.logger.debug("Finished job {}", Utility.toString(jobEvent.getJob()));
@@ -315,7 +315,7 @@ public abstract class AbstractJobQueue
                 info.processingTime = System.currentTimeMillis() - jobEvent.started;
                 this.finishedJob(info.processingTime);
                 break;
-            case FAILED : // check if we exceeded the number of retries
+            case QUEUED : // check if we exceeded the number of retries
                 int retries = (Integer) jobEvent.getJob().getProperty(Job.PROPERTY_JOB_RETRIES);
                 int retryCount = (Integer)jobEvent.getJob().getProperty(Job.PROPERTY_JOB_RETRY_COUNT);
 
@@ -336,7 +336,7 @@ public abstract class AbstractJobQueue
                     jobEvent.queued = System.currentTimeMillis();
                 }
                 break;
-            case CANCELLED : // consumer cancelled the job
+            default : // consumer cancelled the job (STOPPED, GIVEN_UP, ERROR)
                 if ( this.logger.isDebugEnabled() ) {
                     this.logger.debug("Cancelled job {}", Utility.toString(jobEvent.getJob()));
                 }
@@ -353,17 +353,17 @@ public abstract class AbstractJobQueue
     @Override
     public boolean finishedJob(final Event job, final boolean shouldReschedule) {
         final String location = (String)job.getProperty(ResourceHelper.PROPERTY_JOB_ID);
-        return this.finishedJob(location, shouldReschedule ? JobExecutionResultImpl.FAILED : JobExecutionResultImpl.SUCCEEDED, false);
+        return this.finishedJob(location, shouldReschedule ? Job.JobState.QUEUED : Job.JobState.SUCCEEDED, false);
     }
 
     /**
      * Handle job finish and determine whether to reschedule or cancel the job
      */
     private boolean finishedJob(final String jobId,
-                                final JobExecutionResultImpl result,
+                                Job.JobState resultState,
                                 final boolean isAsync) {
         if ( this.logger.isDebugEnabled() ) {
-            this.logger.debug("Received finish for job {}, result={}", jobId, result);
+            this.logger.debug("Received finish for job {}, resultState={}", jobId, resultState);
         }
         // let's remove the event from our processing list
         // this is just a sanity check, as usually the job should have been
@@ -391,17 +391,19 @@ public abstract class AbstractJobQueue
         }
 
         // handle the reschedule, a new job might be returned with updated reschedule info!
-        final RescheduleInfo rescheduleInfo = this.handleReschedule(handler, result);
-
+        final RescheduleInfo rescheduleInfo = this.handleReschedule(handler, resultState);
+        if ( resultState == Job.JobState.QUEUED && !rescheduleInfo.reschedule ) {
+            resultState = Job.JobState.GIVEN_UP;
+        }
         // if this is set after the synchronized block we have an error
         final boolean finishSuccessful;
 
         if ( !rescheduleInfo.reschedule ) {
             // we keep cancelled jobs and succeeded jobs if the queue is configured like this.
-            final boolean keepJobs = result.getState() != InternalJobState.SUCCEEDED || this.configuration.isKeepJobs();
-            handler.finished(result.getState(), keepJobs, rescheduleInfo.processingTime);
+            final boolean keepJobs = resultState != Job.JobState.SUCCEEDED || this.configuration.isKeepJobs();
+            handler.finished(resultState, keepJobs, rescheduleInfo.processingTime);
             finishSuccessful = true;
-            if ( result.getState() == InternalJobState.SUCCEEDED ) {
+            if ( resultState == Job.JobState.SUCCEEDED ) {
                 Utility.sendNotification(this.eventAdmin, NotificationConstants.TOPIC_JOB_FINISHED, handler.getJob(), rescheduleInfo.processingTime);
             } else {
                 Utility.sendNotification(this.eventAdmin, NotificationConstants.TOPIC_JOB_CANCELLED, handler.getJob(), null);
@@ -542,6 +544,7 @@ public abstract class AbstractJobQueue
                                     }
                                 }
                                 JobExecutionResultImpl result = JobExecutionResultImpl.CANCELLED;
+                                Job.JobState resultState = Job.JobState.ERROR;
                                 final AtomicBoolean isAsync = new AtomicBoolean(false);
 
                                 try {
@@ -584,11 +587,23 @@ public abstract class AbstractJobQueue
                                             }
 
                                             @Override
-                                            public void asyncProcessingFinished(final JobExecutionResult status) {
+                                            public void asyncProcessingFinished(final JobExecutionResult result) {
                                                 synchronized ( lock ) {
                                                     if ( isAsync.compareAndSet(true, false) ) {
                                                         jobConsumerManager.unregisterListener(job.getId());
-                                                        finishedJob(job.getId(), (JobExecutionResultImpl)status, true);
+                                                        Job.JobState state = null;
+                                                        if ( result.succeeded() ) {
+                                                            state = Job.JobState.SUCCEEDED;
+                                                        } else if ( result.failed() ) {
+                                                            state = Job.JobState.QUEUED;
+                                                        } else if ( result.cancelled() ) {
+                                                            if ( handler.isStopped() ) {
+                                                                state = Job.JobState.STOPPED;
+                                                            } else {
+                                                                state = Job.JobState.ERROR;
+                                                            }
+                                                        }
+                                                        finishedJob(job.getId(), state, true);
                                                         asyncCounter.decrementAndGet();
                                                     } else {
                                                         throw new IllegalStateException("Job is not processed async " + job.getId());
@@ -639,12 +654,25 @@ public abstract class AbstractJobQueue
                                             asyncCounter.incrementAndGet();
                                             notifyFinished(null);
                                             isAsync.set(true);
+                                        } else {
+                                            if ( result.succeeded() ) {
+                                                resultState = Job.JobState.SUCCEEDED;
+                                            } else if ( result.failed() ) {
+                                                resultState = Job.JobState.QUEUED;
+                                            } else if ( result.cancelled() ) {
+                                                if ( handler.isStopped() ) {
+                                                    resultState = Job.JobState.STOPPED;
+                                                } else {
+                                                    resultState = Job.JobState.ERROR;
+                                                }
+                                            }
                                         }
                                     }
                                 } catch (final Throwable t) { //NOSONAR
                                     logger.error("Unhandled error occured in job processor " + t.getMessage() + " while processing job " + Utility.toString(job), t);
                                     // we don't reschedule if an exception occurs
                                     result = JobExecutionResultImpl.CANCELLED;
+                                    resultState = Job.JobState.ERROR;
                                 } finally {
                                     currentThread.setPriority(oldPriority);
                                     currentThread.setName(oldName);
@@ -655,7 +683,7 @@ public abstract class AbstractJobQueue
                                         if ( result.getMessage() != null ) {
                                            job.setProperty(Job.PROPERTY_RESULT_MESSAGE, result.getMessage());
                                         }
-                                        finishedJob(job.getId(), result, false);
+                                        finishedJob(job.getId(), resultState, false);
                                     }
                                 }
                             }

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/Job.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/Job.java?rev=1532793&r1=1532792&r2=1532793&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/Job.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/Job.java Wed Oct 16 15:24:52 2013
@@ -182,7 +182,10 @@ public interface Job {
         QUEUED,     // waiting in queue after adding or for restart after failing
         ACTIVE,     // job is currently in processing
         SUCCEEDED,  // processing finished successfully
-        CANCELLED,  // processing failed permanently
+        STOPPED,    // processing was stopped by a user
+        GIVEN_UP,   // number of retries reached
+        ERROR,      // processing signaled CANCELLED or throw an exception
+        DROPPED     // dropped jobs
     };
 
     /**

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobManager.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobManager.java?rev=1532793&r1=1532792&r2=1532793&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobManager.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobManager.java Wed Oct 16 15:24:52 2013
@@ -64,9 +64,13 @@ public interface JobManager {
         ALL,      // all means all active and all queued
         ACTIVE,
         QUEUED,
-        HISTORY,  // returns the complete history of cancelled and succeeded jobs (if available)
-        CANCELLED,// history of cancelled jobs
-        SUCCEEDED // history of succeeded jobs
+        HISTORY,    // returns the complete history of cancelled and succeeded jobs (if available)
+        CANCELLED,  // history of cancelled jobs (STOPPED, GIVEN_UP, ERROR, DROPPED)
+        SUCCEEDED,  // history of succeeded jobs
+        STOPPED,    // history of stopped jobs
+        GIVEN_UP,   // history of given up jobs
+        ERROR,      // history of jobs signaled CANCELLED or throw an exception
+        DROPPED     // history of dropped jobs
     }
 
     /**
@@ -167,7 +171,10 @@ public interface JobManager {
     Collection<Job> findJobs(QueryType type, String topic, long limit, Map<String, Object>... templates);
 
     /**
-     * Stop a job
+     * Stop a job.
+     * When a job is stopped and the job consumer supports stopping the job processing, it is up
+     * to the job consumer how the stopping is handled. The job can be marked as finished successful,
+     * permanently failed or being retried.
      * @since 1.3
      */
     void stopJobById(String jobId);
@@ -181,6 +188,7 @@ public interface JobManager {
     JobBuilder createJob(final String topic);
 
     /**
+     * Return all available job schedules.
      * @since 1.3
      */
     Collection<ScheduledJobInfo> getScheduledJobs();

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobExecutionContext.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobExecutionContext.java?rev=1532793&r1=1532792&r2=1532793&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobExecutionContext.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobExecutionContext.java Wed Oct 16 15:24:52 2013
@@ -39,9 +39,9 @@ public interface JobExecutionContext {
      * and stop processing if the method return <code>true</code>.
      * If a job is stopped and the job executor detects this, its up
      * to the implementation to decide the result of such a state.
-     * There might be use cases where the job returns {@link JobStatus#SUCCEEDED}
-     * although it didn't process everything, or {@link JobStatus#FAILED}
-     * to retry later on or {@link JobStatus#CANCELLED}.
+     * There might be use cases where the job returns {@link JobExecutionResult#succeeded()}
+     * although it didn't process everything, or {@link JobExecutionResult#failed()}
+     * to retry later on or {@link JobExecutionResult#cancelled()}.
      * @return Whether this job has been stopped from the outside.
      */
     boolean isStopped();

Modified: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ClassloadingTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ClassloadingTest.java?rev=1532793&r1=1532792&r2=1532793&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ClassloadingTest.java (original)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ClassloadingTest.java Wed Oct 16 15:24:52 2013
@@ -36,7 +36,6 @@ import org.apache.sling.event.EventPrope
 import org.apache.sling.event.impl.jobs.config.ConfigurationConstants;
 import org.apache.sling.event.jobs.Job;
 import org.apache.sling.event.jobs.JobManager;
-import org.apache.sling.event.jobs.JobUtil;
 import org.apache.sling.event.jobs.NotificationConstants;
 import org.apache.sling.event.jobs.QueueConfiguration;
 import org.apache.sling.event.jobs.consumer.JobConsumer;
@@ -127,7 +126,7 @@ public class ClassloadingTest extends Ab
             props.put("list", list);
             props.put("map", map);
 
-            jobManager.addJob(TOPIC, null, props);
+            final String jobId = jobManager.addJob(TOPIC, props).getId();
 
             new RetryLoop(Conditions.collectionIsNotEmptyCondition(finishedEvents,
                     "Waiting for finishedEvents to have at least one element"), 5, 50);
@@ -156,6 +155,8 @@ public class ClassloadingTest extends Ab
             assertEquals(new Long(7), Long.valueOf(finishedEvents.get(0).getProperty("long").toString()));
             assertEquals(list, finishedEvents.get(0).getProperty("list"));
             assertEquals(map, finishedEvents.get(0).getProperty("map"));
+
+            jobManager.removeJobById(jobId);
         } finally {
             jcReg.unregister();
             ehReg.unregister();
@@ -171,11 +172,11 @@ public class ClassloadingTest extends Ab
 
                     @Override
                     public JobResult process(Job job) {
-                failedJobsCount.incrementAndGet();
+                        failedJobsCount.incrementAndGet();
                         return JobResult.OK;
                     }
                 });
-        final ServiceRegistration ehReg = this.registerEventHandler(JobUtil.TOPIC_JOB_FINISHED,
+        final ServiceRegistration ehReg = this.registerEventHandler(NotificationConstants.TOPIC_JOB_FINISHED,
                 new EventHandler() {
 
                     @Override
@@ -194,7 +195,7 @@ public class ClassloadingTest extends Ab
             final Map<String, Object> props = new HashMap<String, Object>();
             props.put("dao", dao);
 
-            final String id = jobManager.addJob(TOPIC + "/failed", null, props).getId();
+            final String id = jobManager.addJob(TOPIC + "/failed", props).getId();
 
             // wait until the conditions are met
             new RetryLoop(new RetryLoop.Condition() {
@@ -215,8 +216,10 @@ public class ClassloadingTest extends Ab
                 }
             }, CONDITION_TIMEOUT_SECONDS, CONDITION_INTERVAL_MILLIS);
 
-            jobManager.removeJobById(id);
+            jobManager.removeJobById(id); // moves the job to the history section
             assertEquals(0, jobManager.findJobs(JobManager.QueryType.ALL, TOPIC + "/failed", -1, (Map<String, Object>[])null).size());
+
+            jobManager.removeJobById(id); // removes the job permanently
         } finally {
             jcReg.unregister();
             ehReg.unregister();

Modified: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/HistoryTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/HistoryTest.java?rev=1532793&r1=1532792&r2=1532793&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/HistoryTest.java (original)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/HistoryTest.java Wed Oct 16 15:24:52 2013
@@ -101,7 +101,7 @@ public class HistoryTest extends Abstrac
                         sleep(5L);
                         final long count = job.getProperty(PROP_COUNTER, Long.class);
                         if ( count == 2 || count == 5 || count == 7 ) {
-                            return context.result().message(Job.JobState.CANCELLED.name()).cancelled();
+                            return context.result().message(Job.JobState.ERROR.name()).cancelled();
                         }
                         return context.result().message(Job.JobState.SUCCEEDED.name()).succeeded();
                     }
@@ -118,7 +118,14 @@ public class HistoryTest extends Abstrac
             }
             col = this.getJobManager().findJobs(JobManager.QueryType.HISTORY, TOPIC, -1, (Map<String, Object>[])null);
             assertEquals(10, col.size());
+            assertEquals(0, this.getJobManager().findJobs(JobManager.QueryType.ACTIVE, TOPIC, -1, (Map<String, Object>[])null).size());
+            assertEquals(0, this.getJobManager().findJobs(JobManager.QueryType.QUEUED, TOPIC, -1, (Map<String, Object>[])null).size());
+            assertEquals(0, this.getJobManager().findJobs(JobManager.QueryType.ALL, TOPIC, -1, (Map<String, Object>[])null).size());
             assertEquals(3, this.getJobManager().findJobs(JobManager.QueryType.CANCELLED, TOPIC, -1, (Map<String, Object>[])null).size());
+            assertEquals(0, this.getJobManager().findJobs(JobManager.QueryType.DROPPED, TOPIC, -1, (Map<String, Object>[])null).size());
+            assertEquals(3, this.getJobManager().findJobs(JobManager.QueryType.ERROR, TOPIC, -1, (Map<String, Object>[])null).size());
+            assertEquals(0, this.getJobManager().findJobs(JobManager.QueryType.GIVEN_UP, TOPIC, -1, (Map<String, Object>[])null).size());
+            assertEquals(0, this.getJobManager().findJobs(JobManager.QueryType.STOPPED, TOPIC, -1, (Map<String, Object>[])null).size());
             assertEquals(7, this.getJobManager().findJobs(JobManager.QueryType.SUCCEEDED, TOPIC, -1, (Map<String, Object>[])null).size());
             // verify order, message and state
             long last = 9;
@@ -127,7 +134,7 @@ public class HistoryTest extends Abstrac
                 final long count = j.getProperty(PROP_COUNTER, Long.class);
                 assertEquals(last, count);
                 if ( count == 2 || count == 5 || count == 7 ) {
-                    assertEquals(Job.JobState.CANCELLED, j.getJobState());
+                    assertEquals(Job.JobState.ERROR, j.getJobState());
                 } else {
                     assertEquals(Job.JobState.SUCCEEDED, j.getJobState());
                 }