You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2013/09/30 18:03:13 UTC

svn commit: r1527622 - in /sling/trunk/bundles/extensions/event/src: main/java/org/apache/sling/event/impl/jobs/ main/java/org/apache/sling/event/impl/jobs/queues/ main/java/org/apache/sling/event/impl/support/ main/java/org/apache/sling/event/jobs/ te...

Author: cziegeler
Date: Mon Sep 30 16:03:13 2013
New Revision: 1527622

URL: http://svn.apache.org/r1527622
Log:
SLING-3028 :  Support for progress tracking of jobs 

Modified:
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ResourceHelper.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/Job.java
    sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java
    sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/HistoryTest.java

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java?rev=1527622&r1=1527621&r2=1527622&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java Mon Sep 30 16:03:13 2013
@@ -18,6 +18,8 @@
  */
 package org.apache.sling.event.impl.jobs;
 
+
+import org.apache.sling.event.jobs.Queue;
 import org.apache.sling.event.jobs.consumer.JobState;
 
 
@@ -42,18 +44,17 @@ public class JobHandler {
         return this.job;
     }
 
-    public boolean start() {
-        this.job.prepare();
-        return this.jobManager.start(this);
+    public boolean startProcessing(final Queue queue) {
+        return this.jobManager.persistJobProperties(this.job, this.job.prepare(queue));
     }
 
     /**
      * Finish the processing of the job
      * @param state The state of processing
      */
-    public void finished(final JobState state, final boolean keepJobInHistory) {
+    public void finished(final JobState state, final boolean keepJobInHistory, final long duration) {
         // for now we just keep cancelled jobs
-        this.jobManager.finishJob(this.job, state, keepJobInHistory);
+        this.jobManager.finishJob(this.job, state, keepJobInHistory, duration);
     }
 
     /**
@@ -65,7 +66,7 @@ public class JobHandler {
     }
 
     public void cancel() {
-        this.jobManager.finishJob(this.job, JobState.CANCELLED, true);
+        this.jobManager.finishJob(this.job, JobState.CANCELLED, true, -1);
     }
 
     public void reassign() {

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java?rev=1527622&r1=1527621&r2=1527622&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java Mon Sep 30 16:03:13 2013
@@ -20,6 +20,7 @@ package org.apache.sling.event.impl.jobs
 
 import java.text.MessageFormat;
 import java.util.Calendar;
+import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -237,13 +238,18 @@ public class JobImpl implements Job {
     /**
      * Prepare a new job execution
      */
-    public void prepare() {
+    public String[] prepare(final Queue queue) {
+        this.updateQueueInfo(queue);
         this.properties.remove(JobImpl.PROPERTY_DELAY_OVERRIDE);
-        this.properties.remove(Job.PROPERTY_JOB_LOG);
+        this.properties.remove(Job.PROPERTY_JOB_PROGRESS_LOG);
         this.properties.remove(Job.PROPERTY_JOB_PROGRESS_ETA);
         this.properties.remove(Job.PROPERTY_JOB_PROGRESS_STEPS);
         this.properties.remove(Job.PROPERTY_JOB_PROGRESS_STEP);
         this.properties.remove(Job.PROPERTY_RESULT_MESSAGE);
+        this.properties.put(Job.PROPERTY_JOB_STARTED_TIME, Calendar.getInstance());
+        return new String[] {Job.PROPERTY_JOB_QUEUE_NAME, Job.PROPERTY_JOB_RETRIES, Job.PROPERTY_JOB_PRIORITY,
+                Job.PROPERTY_JOB_PROGRESS_LOG, Job.PROPERTY_JOB_PROGRESS_ETA, PROPERTY_JOB_PROGRESS_STEPS,
+                PROPERTY_JOB_PROGRESS_STEP, Job.PROPERTY_RESULT_MESSAGE, Job.PROPERTY_JOB_STARTED_TIME};
     }
 
     public String[] startProgress(final int steps, final long eta) {
@@ -251,7 +257,10 @@ public class JobImpl implements Job {
             this.setProperty(Job.PROPERTY_JOB_PROGRESS_STEPS, steps);
         }
         if ( eta > 0 ) {
-            this.setProperty(Job.PROPERTY_JOB_PROGRESS_ETA, eta);
+            final Date finishDate = new Date(System.currentTimeMillis() + eta * 1000);
+            final Calendar finishCal = Calendar.getInstance();
+            finishCal.setTime(finishDate);
+            this.setProperty(Job.PROPERTY_JOB_PROGRESS_ETA, finishCal);
         }
         return new String[] {Job.PROPERTY_JOB_PROGRESS_ETA, PROPERTY_JOB_PROGRESS_STEPS};
     }
@@ -273,22 +282,29 @@ public class JobImpl implements Job {
     }
 
     public String update(final long eta) {
-        this.setProperty(Job.PROPERTY_JOB_PROGRESS_ETA, eta);
+        if ( eta > 0 ) {
+            final Date finishDate = new Date(System.currentTimeMillis() + eta * 1000);
+            final Calendar finishCal = Calendar.getInstance();
+            finishCal.setTime(finishDate);
+            this.setProperty(Job.PROPERTY_JOB_PROGRESS_ETA, eta);
+        } else {
+            this.properties.remove(Job.PROPERTY_JOB_PROGRESS_ETA);
+        }
         return Job.PROPERTY_JOB_PROGRESS_ETA;
     }
 
     public String log(final String message, final Object... args) {
         final String logEntry = MessageFormat.format(message, args);
-        final String[] entries = this.getProperty(Job.PROPERTY_JOB_LOG, String[].class);
+        final String[] entries = this.getProperty(Job.PROPERTY_JOB_PROGRESS_LOG, String[].class);
         if ( entries == null ) {
-            this.setProperty(Job.PROPERTY_JOB_LOG, new String[] {logEntry});
+            this.setProperty(Job.PROPERTY_JOB_PROGRESS_LOG, new String[] {logEntry});
         } else {
             final String[] newEntries = new String[entries.length + 1];
             System.arraycopy(entries, 0, newEntries, 0, entries.length);
             newEntries[entries.length] = logEntry;
-            this.setProperty(Job.PROPERTY_JOB_LOG, newEntries);
+            this.setProperty(Job.PROPERTY_JOB_PROGRESS_LOG, newEntries);
         }
-        return Job.PROPERTY_JOB_LOG;
+        return Job.PROPERTY_JOB_PROGRESS_LOG;
     }
 
     @Override

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java?rev=1527622&r1=1527621&r2=1527622&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java Mon Sep 30 16:03:13 2013
@@ -22,6 +22,7 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Calendar;
 import java.util.Collection;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -297,7 +298,7 @@ public class JobManagerImpl
             if ( logger.isDebugEnabled() ) {
                 logger.debug("Dropping job due to configuration of queue {} : {}", queueInfo.queueName, Utility.toString(job));
             }
-            this.finishJob(job, JobState.CANCELLED, false);
+            this.finishJob(job, JobState.CANCELLED, false, -1);
         } else if ( config.getType() == QueueConfiguration.Type.IGNORE ) {
             if ( !reassign ) {
                 if ( logger.isDebugEnabled() ) {
@@ -333,7 +334,7 @@ public class JobManagerImpl
                         if ( queue == null ) {
                             // this is just a sanity check, actually we can never get here
                             logger.warn("Ignoring event due to unknown queue type of queue {} : {}", queueInfo.queueName, Utility.toString(job));
-                            this.finishJob(job, JobState.CANCELLED, false);
+                            this.finishJob(job, JobState.CANCELLED, false, -1);
                         } else {
                             queues.put(queueInfo.queueName, queue);
                             ((QueuesMBeanImpl)queuesMBean).sendEvent(new QueueStatusEvent(queue, null));
@@ -775,7 +776,7 @@ public class JobManagerImpl
                         }
                     }
                 } else {
-                    this.finishJob(job, JobState.CANCELLED, true);
+                    this.finishJob(job, JobState.CANCELLED, true, -1);
                 }
             }
         } else {
@@ -1058,7 +1059,10 @@ public class JobManagerImpl
      * @param info  The job handler
      * @param state The state of the processing
      */
-    public void finishJob(final JobImpl job, final JobState state, final boolean keepJobInHistory) {
+    public void finishJob(final JobImpl job,
+                          final JobState state,
+                          final boolean keepJobInHistory,
+                          final long duration) {
         final boolean isSuccess = (state == JobState.SUCCEEDED);
         ResourceResolver resolver = null;
         try {
@@ -1072,7 +1076,17 @@ public class JobManagerImpl
                         newPath = this.configuration.getStoragePath(job, isSuccess);
                         final Map<String, Object> props = new HashMap<String, Object>(vm);
                         props.put(JobImpl.PROPERTY_FINISHED_STATE, isSuccess ? JobState.SUCCEEDED.name() : JobState.CANCELLED.name());
-                        props.put(JobImpl.PROPERTY_FINISHED_DATE, Calendar.getInstance());
+                        if ( isSuccess ) {
+                            // we set the finish date to start date + duration
+                            final Date finishDate = new Date();
+                            finishDate.setTime(job.getProcessingStarted().getTime().getTime() + duration);
+                            final Calendar finishCal = Calendar.getInstance();
+                            finishCal.setTime(finishDate);
+                            props.put(JobImpl.PROPERTY_FINISHED_DATE, finishCal);
+                        } else {
+                            // current time is good enough
+                            props.put(JobImpl.PROPERTY_FINISHED_DATE, Calendar.getInstance());
+                        }
                         if ( job.getProperty(Job.PROPERTY_RESULT_MESSAGE) != null ) {
                             props.put(Job.PROPERTY_RESULT_MESSAGE, job.getProperty(Job.PROPERTY_RESULT_MESSAGE));
                         }
@@ -1142,42 +1156,6 @@ public class JobManagerImpl
     }
 
     /**
-     * Try to start the job
-     */
-    public boolean start(final JobHandler info) {
-        ResourceResolver resolver = null;
-        try {
-            resolver = this.resourceResolverFactory.getAdministrativeResourceResolver(null);
-            final Resource jobResource = resolver.getResource(info.getJob().getResourcePath());
-            if ( jobResource != null ) {
-                final ModifiableValueMap mvm = jobResource.adaptTo(ModifiableValueMap.class);
-                mvm.put(Job.PROPERTY_JOB_STARTED_TIME, Calendar.getInstance());
-                mvm.put(Job.PROPERTY_JOB_QUEUE_NAME, info.getJob().getQueueName());
-                mvm.put(Job.PROPERTY_JOB_RETRIES, info.getJob().getNumberOfRetries());
-                mvm.put(Job.PROPERTY_JOB_PRIORITY, info.getJob().getJobPriority().name());
-                mvm.remove(Job.PROPERTY_JOB_PROGRESS_ETA);
-                mvm.remove(Job.PROPERTY_JOB_PROGRESS_STEPS);
-                mvm.remove(Job.PROPERTY_JOB_PROGRESS_STEP);
-                mvm.remove(Job.PROPERTY_JOB_LOG);
-                mvm.remove(Job.PROPERTY_RESULT_MESSAGE);
-                resolver.commit();
-
-                return true;
-            }
-        } catch ( final PersistenceException ignore ) {
-            this.ignoreException(ignore);
-        } catch ( final LoginException ignore ) {
-            this.ignoreException(ignore);
-        } finally {
-            if ( resolver != null ) {
-                resolver.close();
-            }
-        }
-
-        return false;
-    }
-
-    /**
      * Try to get a "lock" for a resource
      */
     private boolean lock(final String id) {
@@ -1376,7 +1354,7 @@ public class JobManagerImpl
             if ( logger.isDebugEnabled() ) {
                 logger.debug("Dropping job due to configuration of queue {} : {}", queueInfo.queueName, Utility.toString(job));
             }
-            this.finishJob(job, JobState.CANCELLED, false); // DROP means complete removal
+            this.finishJob(job, JobState.CANCELLED, false, -1); // DROP means complete removal
         } else {
             String targetId = null;
             if ( config.getType() != QueueConfiguration.Type.IGNORE ) {
@@ -1397,7 +1375,7 @@ public class JobManagerImpl
     /**
      * Update the property of a job in the resource tree
      */
-    public void persistJobProperties(final JobImpl job, final String... propNames) {
+    public boolean persistJobProperties(final JobImpl job, final String... propNames) {
         ResourceResolver resolver = null;
         try {
             resolver = this.resourceResolverFactory.getAdministrativeResourceResolver(null);
@@ -1407,10 +1385,18 @@ public class JobManagerImpl
                 for(final String propName : propNames) {
                     final Object val = job.getProperty(propName);
                     if ( val != null ) {
-                        mvm.put(propName, job.getProperty(propName));
+                        if ( val.getClass().isEnum() ) {
+                            mvm.put(propName, val.toString());
+                        } else {
+                            mvm.put(propName, val);
+                        }
+                    } else {
+                        mvm.remove(propName);
                     }
                 }
                 resolver.commit();
+
+                return true;
             }
         } catch ( final PersistenceException ignore ) {
             this.ignoreException(ignore);
@@ -1421,6 +1407,7 @@ public class JobManagerImpl
                 resolver.close();
             }
         }
+        return false;
     }
 
     /**

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java?rev=1527622&r1=1527621&r2=1527622&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java Mon Sep 30 16:03:13 2013
@@ -354,6 +354,9 @@ public abstract class AbstractJobQueue
         return this.finishedJob(location, shouldReschedule ? JobStatus.FAILED : JobStatus.SUCCEEDED, false);
     }
 
+    /**
+     * Handle job finish and determine whether to reschedule or cancel the job
+     */
     private boolean finishedJob(final String jobId,
                                 final JobStatus result,
                                 final boolean isAsync) {
@@ -394,7 +397,7 @@ public abstract class AbstractJobQueue
         if ( !rescheduleInfo.reschedule ) {
             // we keep cancelled jobs and succeeded jobs if the queue is configured like this.
             final boolean keepJobs = result.getState() != JobState.SUCCEEDED || this.configuration.isKeepJobs();
-            handler.finished(result.getState(), keepJobs);
+            handler.finished(result.getState(), keepJobs, rescheduleInfo.processingTime);
             finishSuccessful = true;
             if ( result.getState() == JobState.SUCCEEDED ) {
                 Utility.sendNotification(this.eventAdmin, JobUtil.TOPIC_JOB_FINISHED, handler.getJob(), rescheduleInfo.processingTime);
@@ -497,7 +500,7 @@ public abstract class AbstractJobQueue
         final JobExecutor consumer = this.jobConsumerManager.getExecutor(job.getTopic());
 
         if ( (consumer != null || (job.isBridgedEvent() && this.jobConsumerManager.supportsBridgedEvents())) ) {
-            if ( handler.start() ) {
+            if ( handler.startProcessing(this) ) {
                 if ( logger.isDebugEnabled() ) {
                     logger.debug("Starting job {}", Utility.toString(job));
                 }

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ResourceHelper.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ResourceHelper.java?rev=1527622&r1=1527621&r2=1527622&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ResourceHelper.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ResourceHelper.java Mon Sep 30 16:03:13 2013
@@ -67,7 +67,7 @@ public abstract class ResourceHelper {
         JobStatusNotifier.CONTEXT_PROPERTY_NAME,
         JobImpl.PROPERTY_DELAY_OVERRIDE,
         JobConsumer.PROPERTY_JOB_ASYNC_HANDLER,
-        Job.PROPERTY_JOB_LOG,
+        Job.PROPERTY_JOB_PROGRESS_LOG,
         Job.PROPERTY_JOB_PROGRESS_ETA,
         Job.PROPERTY_JOB_PROGRESS_STEP,
         Job.PROPERTY_JOB_PROGRESS_STEPS,

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/Job.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/Job.java?rev=1527622&r1=1527621&r2=1527622&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/Job.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/Job.java Mon Sep 30 16:03:13 2013
@@ -119,13 +119,15 @@ public interface Job {
      * This property contains the optional output log of a job consumer.
      * The value of this property is a string array.
      * This property is read-only and can't be specified when the job is created.
+     * @since 1.3
      */
-    String PROPERTY_JOB_LOG = "slingevent:log";
+    String PROPERTY_JOB_PROGRESS_LOG = "slingevent:progressLog";
 
     /**
      * This property contains the optional ETA for a job.
      * The value of this property is a {@link Calendar} object.
      * This property is read-only and can't be specified when the job is created.
+     * @since 1.3
      */
     String PROPERTY_JOB_PROGRESS_ETA = "slingevent:progressETA";
 
@@ -135,6 +137,7 @@ public interface Job {
      * assumed to consume roughly the same amount if time.
      * The value of this property is an integer.
      * This property is read-only and can't be specified when the job is created.
+     * @since 1.3
      */
     String PROPERTY_JOB_PROGRESS_STEPS = "slingevent:progressSteps";
 
@@ -143,6 +146,7 @@ public interface Job {
      * the number of completed steps.
      * The value of this property is an integer.
      * This property is read-only and can't be specified when the job is created.
+     * @since 1.3
      */
     String PROPERTY_JOB_PROGRESS_STEP = "slingevent:progressStep";
 
@@ -150,6 +154,7 @@ public interface Job {
      * This property contains the optional result message of a job consumer.
      * The value of this property is a string.
      * This property is read-only and can't be specified when the job is created.
+     * @since 1.3
      */
     String PROPERTY_RESULT_MESSAGE = "slingevent:resultMessage";
 
@@ -157,6 +162,7 @@ public interface Job {
      * This property contains the finished state of a job once it's marked as finished.
      * TODO - DOCUMENT
      * This property is read-only and can't be specified when the job is created.
+     * @since 1.3
      */
     String PROPERTY_FINISHED_STATE = "slingevent:finishedState";
 
@@ -164,6 +170,7 @@ public interface Job {
      * This property contains the finished date once a job is marked as finished.
      * The value of this property is a {@link Calendar} object.
      * This property is read-only and can't be specified when the job is created.
+     * @since 1.3
      */
     String PROPERTY_FINISHED_DATE = "slingevent:finishedDate";
 

Modified: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java?rev=1527622&r1=1527621&r2=1527622&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java (original)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java Mon Sep 30 16:03:13 2013
@@ -35,6 +35,7 @@ import javax.inject.Inject;
 import org.apache.sling.event.impl.jobs.JobManagerConfiguration;
 import org.apache.sling.event.jobs.JobManager;
 import org.apache.sling.event.jobs.consumer.JobConsumer;
+import org.apache.sling.event.jobs.consumer.JobExecutor;
 import org.apache.sling.launchpad.api.StartupHandler;
 import org.apache.sling.launchpad.api.StartupMode;
 import org.ops4j.pax.exam.Configuration;
@@ -213,6 +214,18 @@ public abstract class AbstractJobHandlin
     }
 
     /**
+     * Helper method to register a job executor
+     */
+    protected ServiceRegistration registerJobExecutor(final String topic,
+            final JobExecutor handler) {
+        final Dictionary<String, Object> props = new Hashtable<String, Object>();
+        props.put(JobConsumer.PROPERTY_TOPICS, topic);
+        final ServiceRegistration reg = this.bc.registerService(JobExecutor.class.getName(),
+                handler, props);
+        return reg;
+    }
+
+    /**
      * Helper method to remove a configuration
      */
     protected void removeConfiguration(final String pid) throws IOException {

Modified: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/HistoryTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/HistoryTest.java?rev=1527622&r1=1527621&r2=1527622&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/HistoryTest.java (original)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/HistoryTest.java Mon Sep 30 16:03:13 2013
@@ -32,7 +32,9 @@ import org.apache.sling.event.impl.jobs.
 import org.apache.sling.event.jobs.Job;
 import org.apache.sling.event.jobs.JobManager;
 import org.apache.sling.event.jobs.QueueConfiguration;
-import org.apache.sling.event.jobs.consumer.JobConsumer;
+import org.apache.sling.event.jobs.consumer.JobExecutionContext;
+import org.apache.sling.event.jobs.consumer.JobExecutor;
+import org.apache.sling.event.jobs.consumer.JobStatus;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -90,17 +92,17 @@ public class HistoryTest extends Abstrac
      */
     @Test(timeout = DEFAULT_TEST_TIMEOUT)
     public void testHistory() throws Exception {
-        final ServiceRegistration reg = this.registerJobConsumer(TOPIC,
-                new JobConsumer() {
+        final ServiceRegistration reg = this.registerJobExecutor(TOPIC,
+                new JobExecutor() {
 
                     @Override
-                    public JobResult process(final Job job) {
+                    public JobStatus process(Job job, JobExecutionContext context) {
                         sleep(5L);
                         final long count = job.getProperty(PROP_COUNTER, Long.class);
                         if ( count == 2 || count == 5 || count == 7 ) {
-                            return JobResult.CANCEL;
+                            return JobStatus.CANCELLED;
                         }
-                        return JobResult.OK;
+                        return JobStatus.SUCCEEDED;
                     }
 
                 });