You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2013/09/30 18:03:13 UTC
svn commit: r1527622 - in /sling/trunk/bundles/extensions/event/src:
main/java/org/apache/sling/event/impl/jobs/
main/java/org/apache/sling/event/impl/jobs/queues/
main/java/org/apache/sling/event/impl/support/
main/java/org/apache/sling/event/jobs/ te...
Author: cziegeler
Date: Mon Sep 30 16:03:13 2013
New Revision: 1527622
URL: http://svn.apache.org/r1527622
Log:
SLING-3028 : Support for progress tracking of jobs
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ResourceHelper.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/Job.java
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/HistoryTest.java
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java?rev=1527622&r1=1527621&r2=1527622&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java Mon Sep 30 16:03:13 2013
@@ -18,6 +18,8 @@
*/
package org.apache.sling.event.impl.jobs;
+
+import org.apache.sling.event.jobs.Queue;
import org.apache.sling.event.jobs.consumer.JobState;
@@ -42,18 +44,17 @@ public class JobHandler {
return this.job;
}
- public boolean start() {
- this.job.prepare();
- return this.jobManager.start(this);
+ public boolean startProcessing(final Queue queue) {
+ return this.jobManager.persistJobProperties(this.job, this.job.prepare(queue));
}
/**
* Finish the processing of the job
* @param state The state of processing
*/
- public void finished(final JobState state, final boolean keepJobInHistory) {
+ public void finished(final JobState state, final boolean keepJobInHistory, final long duration) {
// for now we just keep cancelled jobs
- this.jobManager.finishJob(this.job, state, keepJobInHistory);
+ this.jobManager.finishJob(this.job, state, keepJobInHistory, duration);
}
/**
@@ -65,7 +66,7 @@ public class JobHandler {
}
public void cancel() {
- this.jobManager.finishJob(this.job, JobState.CANCELLED, true);
+ this.jobManager.finishJob(this.job, JobState.CANCELLED, true, -1);
}
public void reassign() {
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java?rev=1527622&r1=1527621&r2=1527622&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java Mon Sep 30 16:03:13 2013
@@ -20,6 +20,7 @@ package org.apache.sling.event.impl.jobs
import java.text.MessageFormat;
import java.util.Calendar;
+import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -237,13 +238,18 @@ public class JobImpl implements Job {
/**
* Prepare a new job execution
*/
- public void prepare() {
+ public String[] prepare(final Queue queue) {
+ this.updateQueueInfo(queue);
this.properties.remove(JobImpl.PROPERTY_DELAY_OVERRIDE);
- this.properties.remove(Job.PROPERTY_JOB_LOG);
+ this.properties.remove(Job.PROPERTY_JOB_PROGRESS_LOG);
this.properties.remove(Job.PROPERTY_JOB_PROGRESS_ETA);
this.properties.remove(Job.PROPERTY_JOB_PROGRESS_STEPS);
this.properties.remove(Job.PROPERTY_JOB_PROGRESS_STEP);
this.properties.remove(Job.PROPERTY_RESULT_MESSAGE);
+ this.properties.put(Job.PROPERTY_JOB_STARTED_TIME, Calendar.getInstance());
+ return new String[] {Job.PROPERTY_JOB_QUEUE_NAME, Job.PROPERTY_JOB_RETRIES, Job.PROPERTY_JOB_PRIORITY,
+ Job.PROPERTY_JOB_PROGRESS_LOG, Job.PROPERTY_JOB_PROGRESS_ETA, PROPERTY_JOB_PROGRESS_STEPS,
+ PROPERTY_JOB_PROGRESS_STEP, Job.PROPERTY_RESULT_MESSAGE, Job.PROPERTY_JOB_STARTED_TIME};
}
public String[] startProgress(final int steps, final long eta) {
@@ -251,7 +257,10 @@ public class JobImpl implements Job {
this.setProperty(Job.PROPERTY_JOB_PROGRESS_STEPS, steps);
}
if ( eta > 0 ) {
- this.setProperty(Job.PROPERTY_JOB_PROGRESS_ETA, eta);
+ final Date finishDate = new Date(System.currentTimeMillis() + eta * 1000);
+ final Calendar finishCal = Calendar.getInstance();
+ finishCal.setTime(finishDate);
+ this.setProperty(Job.PROPERTY_JOB_PROGRESS_ETA, finishCal);
}
return new String[] {Job.PROPERTY_JOB_PROGRESS_ETA, PROPERTY_JOB_PROGRESS_STEPS};
}
@@ -273,22 +282,29 @@ public class JobImpl implements Job {
}
public String update(final long eta) {
- this.setProperty(Job.PROPERTY_JOB_PROGRESS_ETA, eta);
+ if ( eta > 0 ) {
+ // eta is given in seconds from now; persist it as an absolute finish time
+ final Date finishDate = new Date(System.currentTimeMillis() + eta * 1000);
+ final Calendar finishCal = Calendar.getInstance();
+ finishCal.setTime(finishDate);
+ // store the Calendar (not the raw seconds) so the value has the same type
+ // as the one written by startProgress() and documented on PROPERTY_JOB_PROGRESS_ETA
+ this.setProperty(Job.PROPERTY_JOB_PROGRESS_ETA, finishCal);
+ } else {
+ // no usable estimate: clear any previously stored ETA
+ this.properties.remove(Job.PROPERTY_JOB_PROGRESS_ETA);
+ }
return Job.PROPERTY_JOB_PROGRESS_ETA;
}
public String log(final String message, final Object... args) {
final String logEntry = MessageFormat.format(message, args);
- final String[] entries = this.getProperty(Job.PROPERTY_JOB_LOG, String[].class);
+ final String[] entries = this.getProperty(Job.PROPERTY_JOB_PROGRESS_LOG, String[].class);
if ( entries == null ) {
- this.setProperty(Job.PROPERTY_JOB_LOG, new String[] {logEntry});
+ this.setProperty(Job.PROPERTY_JOB_PROGRESS_LOG, new String[] {logEntry});
} else {
final String[] newEntries = new String[entries.length + 1];
System.arraycopy(entries, 0, newEntries, 0, entries.length);
newEntries[entries.length] = logEntry;
- this.setProperty(Job.PROPERTY_JOB_LOG, newEntries);
+ this.setProperty(Job.PROPERTY_JOB_PROGRESS_LOG, newEntries);
}
- return Job.PROPERTY_JOB_LOG;
+ return Job.PROPERTY_JOB_PROGRESS_LOG;
}
@Override
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java?rev=1527622&r1=1527621&r2=1527622&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java Mon Sep 30 16:03:13 2013
@@ -22,6 +22,7 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collection;
+import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -297,7 +298,7 @@ public class JobManagerImpl
if ( logger.isDebugEnabled() ) {
logger.debug("Dropping job due to configuration of queue {} : {}", queueInfo.queueName, Utility.toString(job));
}
- this.finishJob(job, JobState.CANCELLED, false);
+ this.finishJob(job, JobState.CANCELLED, false, -1);
} else if ( config.getType() == QueueConfiguration.Type.IGNORE ) {
if ( !reassign ) {
if ( logger.isDebugEnabled() ) {
@@ -333,7 +334,7 @@ public class JobManagerImpl
if ( queue == null ) {
// this is just a sanity check, actually we can never get here
logger.warn("Ignoring event due to unknown queue type of queue {} : {}", queueInfo.queueName, Utility.toString(job));
- this.finishJob(job, JobState.CANCELLED, false);
+ this.finishJob(job, JobState.CANCELLED, false, -1);
} else {
queues.put(queueInfo.queueName, queue);
((QueuesMBeanImpl)queuesMBean).sendEvent(new QueueStatusEvent(queue, null));
@@ -775,7 +776,7 @@ public class JobManagerImpl
}
}
} else {
- this.finishJob(job, JobState.CANCELLED, true);
+ this.finishJob(job, JobState.CANCELLED, true, -1);
}
}
} else {
@@ -1058,7 +1059,10 @@ public class JobManagerImpl
* @param info The job handler
* @param state The state of the processing
*/
- public void finishJob(final JobImpl job, final JobState state, final boolean keepJobInHistory) {
+ public void finishJob(final JobImpl job,
+ final JobState state,
+ final boolean keepJobInHistory,
+ final long duration) {
final boolean isSuccess = (state == JobState.SUCCEEDED);
ResourceResolver resolver = null;
try {
@@ -1072,7 +1076,17 @@ public class JobManagerImpl
newPath = this.configuration.getStoragePath(job, isSuccess);
final Map<String, Object> props = new HashMap<String, Object>(vm);
props.put(JobImpl.PROPERTY_FINISHED_STATE, isSuccess ? JobState.SUCCEEDED.name() : JobState.CANCELLED.name());
- props.put(JobImpl.PROPERTY_FINISHED_DATE, Calendar.getInstance());
+ if ( isSuccess ) {
+ // we set the finish date to start date + duration
+ final Date finishDate = new Date();
+ finishDate.setTime(job.getProcessingStarted().getTime().getTime() + duration);
+ final Calendar finishCal = Calendar.getInstance();
+ finishCal.setTime(finishDate);
+ props.put(JobImpl.PROPERTY_FINISHED_DATE, finishCal);
+ } else {
+ // current time is good enough
+ props.put(JobImpl.PROPERTY_FINISHED_DATE, Calendar.getInstance());
+ }
if ( job.getProperty(Job.PROPERTY_RESULT_MESSAGE) != null ) {
props.put(Job.PROPERTY_RESULT_MESSAGE, job.getProperty(Job.PROPERTY_RESULT_MESSAGE));
}
@@ -1142,42 +1156,6 @@ public class JobManagerImpl
}
/**
- * Try to start the job
- */
- public boolean start(final JobHandler info) {
- ResourceResolver resolver = null;
- try {
- resolver = this.resourceResolverFactory.getAdministrativeResourceResolver(null);
- final Resource jobResource = resolver.getResource(info.getJob().getResourcePath());
- if ( jobResource != null ) {
- final ModifiableValueMap mvm = jobResource.adaptTo(ModifiableValueMap.class);
- mvm.put(Job.PROPERTY_JOB_STARTED_TIME, Calendar.getInstance());
- mvm.put(Job.PROPERTY_JOB_QUEUE_NAME, info.getJob().getQueueName());
- mvm.put(Job.PROPERTY_JOB_RETRIES, info.getJob().getNumberOfRetries());
- mvm.put(Job.PROPERTY_JOB_PRIORITY, info.getJob().getJobPriority().name());
- mvm.remove(Job.PROPERTY_JOB_PROGRESS_ETA);
- mvm.remove(Job.PROPERTY_JOB_PROGRESS_STEPS);
- mvm.remove(Job.PROPERTY_JOB_PROGRESS_STEP);
- mvm.remove(Job.PROPERTY_JOB_LOG);
- mvm.remove(Job.PROPERTY_RESULT_MESSAGE);
- resolver.commit();
-
- return true;
- }
- } catch ( final PersistenceException ignore ) {
- this.ignoreException(ignore);
- } catch ( final LoginException ignore ) {
- this.ignoreException(ignore);
- } finally {
- if ( resolver != null ) {
- resolver.close();
- }
- }
-
- return false;
- }
-
- /**
* Try to get a "lock" for a resource
*/
private boolean lock(final String id) {
@@ -1376,7 +1354,7 @@ public class JobManagerImpl
if ( logger.isDebugEnabled() ) {
logger.debug("Dropping job due to configuration of queue {} : {}", queueInfo.queueName, Utility.toString(job));
}
- this.finishJob(job, JobState.CANCELLED, false); // DROP means complete removal
+ this.finishJob(job, JobState.CANCELLED, false, -1); // DROP means complete removal
} else {
String targetId = null;
if ( config.getType() != QueueConfiguration.Type.IGNORE ) {
@@ -1397,7 +1375,7 @@ public class JobManagerImpl
/**
* Update the property of a job in the resource tree
*/
- public void persistJobProperties(final JobImpl job, final String... propNames) {
+ public boolean persistJobProperties(final JobImpl job, final String... propNames) {
ResourceResolver resolver = null;
try {
resolver = this.resourceResolverFactory.getAdministrativeResourceResolver(null);
@@ -1407,10 +1385,18 @@ public class JobManagerImpl
for(final String propName : propNames) {
final Object val = job.getProperty(propName);
if ( val != null ) {
- mvm.put(propName, job.getProperty(propName));
+ if ( val.getClass().isEnum() ) {
+ mvm.put(propName, val.toString());
+ } else {
+ mvm.put(propName, val);
+ }
+ } else {
+ mvm.remove(propName);
}
}
resolver.commit();
+
+ return true;
}
} catch ( final PersistenceException ignore ) {
this.ignoreException(ignore);
@@ -1421,6 +1407,7 @@ public class JobManagerImpl
resolver.close();
}
}
+ return false;
}
/**
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java?rev=1527622&r1=1527621&r2=1527622&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java Mon Sep 30 16:03:13 2013
@@ -354,6 +354,9 @@ public abstract class AbstractJobQueue
return this.finishedJob(location, shouldReschedule ? JobStatus.FAILED : JobStatus.SUCCEEDED, false);
}
+ /**
+ * Handle job finish and determine whether to reschedule or cancel the job
+ */
private boolean finishedJob(final String jobId,
final JobStatus result,
final boolean isAsync) {
@@ -394,7 +397,7 @@ public abstract class AbstractJobQueue
if ( !rescheduleInfo.reschedule ) {
// we keep cancelled jobs and succeeded jobs if the queue is configured like this.
final boolean keepJobs = result.getState() != JobState.SUCCEEDED || this.configuration.isKeepJobs();
- handler.finished(result.getState(), keepJobs);
+ handler.finished(result.getState(), keepJobs, rescheduleInfo.processingTime);
finishSuccessful = true;
if ( result.getState() == JobState.SUCCEEDED ) {
Utility.sendNotification(this.eventAdmin, JobUtil.TOPIC_JOB_FINISHED, handler.getJob(), rescheduleInfo.processingTime);
@@ -497,7 +500,7 @@ public abstract class AbstractJobQueue
final JobExecutor consumer = this.jobConsumerManager.getExecutor(job.getTopic());
if ( (consumer != null || (job.isBridgedEvent() && this.jobConsumerManager.supportsBridgedEvents())) ) {
- if ( handler.start() ) {
+ if ( handler.startProcessing(this) ) {
if ( logger.isDebugEnabled() ) {
logger.debug("Starting job {}", Utility.toString(job));
}
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ResourceHelper.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ResourceHelper.java?rev=1527622&r1=1527621&r2=1527622&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ResourceHelper.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ResourceHelper.java Mon Sep 30 16:03:13 2013
@@ -67,7 +67,7 @@ public abstract class ResourceHelper {
JobStatusNotifier.CONTEXT_PROPERTY_NAME,
JobImpl.PROPERTY_DELAY_OVERRIDE,
JobConsumer.PROPERTY_JOB_ASYNC_HANDLER,
- Job.PROPERTY_JOB_LOG,
+ Job.PROPERTY_JOB_PROGRESS_LOG,
Job.PROPERTY_JOB_PROGRESS_ETA,
Job.PROPERTY_JOB_PROGRESS_STEP,
Job.PROPERTY_JOB_PROGRESS_STEPS,
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/Job.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/Job.java?rev=1527622&r1=1527621&r2=1527622&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/Job.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/Job.java Mon Sep 30 16:03:13 2013
@@ -119,13 +119,15 @@ public interface Job {
* This property contains the optional output log of a job consumer.
* The value of this property is a string array.
* This property is read-only and can't be specified when the job is created.
+ * @since 1.3
*/
- String PROPERTY_JOB_LOG = "slingevent:log";
+ String PROPERTY_JOB_PROGRESS_LOG = "slingevent:progressLog";
/**
* This property contains the optional ETA for a job.
* The value of this property is a {@link Calendar} object.
* This property is read-only and can't be specified when the job is created.
+ * @since 1.3
*/
String PROPERTY_JOB_PROGRESS_ETA = "slingevent:progressETA";
@@ -135,6 +137,7 @@ public interface Job {
* assumed to consume roughly the same amount of time.
* The value of this property is an integer.
* This property is read-only and can't be specified when the job is created.
+ * @since 1.3
*/
String PROPERTY_JOB_PROGRESS_STEPS = "slingevent:progressSteps";
@@ -143,6 +146,7 @@ public interface Job {
* the number of completed steps.
* The value of this property is an integer.
* This property is read-only and can't be specified when the job is created.
+ * @since 1.3
*/
String PROPERTY_JOB_PROGRESS_STEP = "slingevent:progressStep";
@@ -150,6 +154,7 @@ public interface Job {
* This property contains the optional result message of a job consumer.
* The value of this property is a string.
* This property is read-only and can't be specified when the job is created.
+ * @since 1.3
*/
String PROPERTY_RESULT_MESSAGE = "slingevent:resultMessage";
@@ -157,6 +162,7 @@ public interface Job {
* This property contains the finished state of a job once it's marked as finished.
* TODO - DOCUMENT
* This property is read-only and can't be specified when the job is created.
+ * @since 1.3
*/
String PROPERTY_FINISHED_STATE = "slingevent:finishedState";
@@ -164,6 +170,7 @@ public interface Job {
* This property contains the finished date once a job is marked as finished.
* The value of this property is a {@link Calendar} object.
* This property is read-only and can't be specified when the job is created.
+ * @since 1.3
*/
String PROPERTY_FINISHED_DATE = "slingevent:finishedDate";
Modified: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java?rev=1527622&r1=1527621&r2=1527622&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java (original)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java Mon Sep 30 16:03:13 2013
@@ -35,6 +35,7 @@ import javax.inject.Inject;
import org.apache.sling.event.impl.jobs.JobManagerConfiguration;
import org.apache.sling.event.jobs.JobManager;
import org.apache.sling.event.jobs.consumer.JobConsumer;
+import org.apache.sling.event.jobs.consumer.JobExecutor;
import org.apache.sling.launchpad.api.StartupHandler;
import org.apache.sling.launchpad.api.StartupMode;
import org.ops4j.pax.exam.Configuration;
@@ -213,6 +214,18 @@ public abstract class AbstractJobHandlin
}
/**
+ * Helper method to register a job executor
+ */
+ protected ServiceRegistration registerJobExecutor(final String topic,
+ final JobExecutor handler) {
+ final Dictionary<String, Object> props = new Hashtable<String, Object>();
+ props.put(JobConsumer.PROPERTY_TOPICS, topic);
+ final ServiceRegistration reg = this.bc.registerService(JobExecutor.class.getName(),
+ handler, props);
+ return reg;
+ }
+
+ /**
* Helper method to remove a configuration
*/
protected void removeConfiguration(final String pid) throws IOException {
Modified: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/HistoryTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/HistoryTest.java?rev=1527622&r1=1527621&r2=1527622&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/HistoryTest.java (original)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/HistoryTest.java Mon Sep 30 16:03:13 2013
@@ -32,7 +32,9 @@ import org.apache.sling.event.impl.jobs.
import org.apache.sling.event.jobs.Job;
import org.apache.sling.event.jobs.JobManager;
import org.apache.sling.event.jobs.QueueConfiguration;
-import org.apache.sling.event.jobs.consumer.JobConsumer;
+import org.apache.sling.event.jobs.consumer.JobExecutionContext;
+import org.apache.sling.event.jobs.consumer.JobExecutor;
+import org.apache.sling.event.jobs.consumer.JobStatus;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -90,17 +92,17 @@ public class HistoryTest extends Abstrac
*/
@Test(timeout = DEFAULT_TEST_TIMEOUT)
public void testHistory() throws Exception {
- final ServiceRegistration reg = this.registerJobConsumer(TOPIC,
- new JobConsumer() {
+ final ServiceRegistration reg = this.registerJobExecutor(TOPIC,
+ new JobExecutor() {
@Override
- public JobResult process(final Job job) {
+ public JobStatus process(Job job, JobExecutionContext context) {
sleep(5L);
final long count = job.getProperty(PROP_COUNTER, Long.class);
if ( count == 2 || count == 5 || count == 7 ) {
- return JobResult.CANCEL;
+ return JobStatus.CANCELLED;
}
- return JobResult.OK;
+ return JobStatus.SUCCEEDED;
}
});