You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2013/10/15 09:43:10 UTC
svn commit: r1532234 - in /sling/trunk/bundles/extensions/event/src:
main/java/org/apache/sling/event/impl/jobs/
main/java/org/apache/sling/event/impl/jobs/queues/
main/java/org/apache/sling/event/jobs/
main/java/org/apache/sling/event/jobs/consumer/ t...
Author: cziegeler
Date: Tue Oct 15 07:43:09 2013
New Revision: 1532234
URL: http://svn.apache.org/r1532234
Log:
SLING-3169 : Naming of Job related enumerations
Added:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/InternalJobState.java (with props)
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobExecutionResultImpl.java (with props)
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobExecutionResult.java (with props)
Removed:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobState.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobStatus.java
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/Job.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobManager.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobExecutionContext.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobExecutor.java
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/HistoryTest.java
Added: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/InternalJobState.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/InternalJobState.java?rev=1532234&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/InternalJobState.java (added)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/InternalJobState.java Tue Oct 15 07:43:09 2013
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs;
+
+import org.apache.sling.event.jobs.consumer.JobExecutor;
+
+/**
+ * The state of the job after it has been processed by a {@link JobExecutor}.
+ */
+public enum InternalJobState {
+
+ SUCCEEDED, // processing finished successfully
+ FAILED, // processing failed, can be retried
+ CANCELLED // processing failed permanently
+}
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/InternalJobState.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/InternalJobState.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/InternalJobState.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java?rev=1532234&r1=1532233&r2=1532234&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java Tue Oct 15 07:43:09 2013
@@ -46,8 +46,8 @@ import org.apache.sling.event.jobs.Job;
import org.apache.sling.event.jobs.consumer.JobConsumer;
import org.apache.sling.event.jobs.consumer.JobConsumer.JobResult;
import org.apache.sling.event.jobs.consumer.JobExecutionContext;
+import org.apache.sling.event.jobs.consumer.JobExecutionResult;
import org.apache.sling.event.jobs.consumer.JobExecutor;
-import org.apache.sling.event.jobs.consumer.JobStatus;
import org.osgi.framework.BundleContext;
import org.osgi.framework.Constants;
import org.osgi.framework.ServiceReference;
@@ -309,7 +309,8 @@ public class JobConsumerManager {
// notify listener
for(final Object[] listenerObjects : this.listenerMap.values()) {
if ( listenerObjects[0] == oldConsumer.executor ) {
- ((JobExecutionContext)listenerObjects[1]).asyncProcessingFinished(JobStatus.FAILED);
+ final JobExecutionContext context = (JobExecutionContext)listenerObjects[1];
+ context.asyncProcessingFinished(context.FAILED());
break;
}
}
@@ -445,14 +446,14 @@ public class JobConsumerManager {
}
@Override
- public JobStatus process(final Job job, final JobExecutionContext context) {
+ public JobExecutionResult process(final Job job, final JobExecutionContext context) {
final JobConsumer.AsyncHandler asyncHandler =
new JobConsumer.AsyncHandler() {
final Object asyncLock = new Object();
final AtomicBoolean asyncDone = new AtomicBoolean(false);
- private void check(final JobStatus result) {
+ private void check(final JobExecutionResult result) {
synchronized ( asyncLock ) {
if ( !asyncDone.get() ) {
asyncDone.set(true);
@@ -465,17 +466,17 @@ public class JobConsumerManager {
@Override
public void ok() {
- this.check(JobStatus.SUCCEEDED);
+ this.check(context.SUCCEEDED());
}
@Override
public void failed() {
- this.check(JobStatus.FAILED);
+ this.check(context.FAILED());
}
@Override
public void cancel() {
- this.check(JobStatus.CANCELLED);
+ this.check(context.CANCELLED());
}
};
((JobImpl)job).setProperty(JobConsumer.PROPERTY_JOB_ASYNC_HANDLER, asyncHandler);
@@ -483,11 +484,11 @@ public class JobConsumerManager {
if ( result == JobResult.ASYNC ) {
return null;
} else if ( result == JobResult.FAILED) {
- return JobStatus.FAILED;
+ return context.FAILED();
} else if ( result == JobResult.OK) {
- return JobStatus.SUCCEEDED;
+ return context.SUCCEEDED();
}
- return JobStatus.CANCELLED;
+ return context.CANCELLED();
}
}
}
Added: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobExecutionResultImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobExecutionResultImpl.java?rev=1532234&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobExecutionResultImpl.java (added)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobExecutionResultImpl.java Tue Oct 15 07:43:09 2013
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs;
+
+import org.apache.sling.event.jobs.consumer.JobExecutionResult;
+
+public class JobExecutionResultImpl implements JobExecutionResult {
+
+ public static final JobExecutionResultImpl SUCCEEDED = new JobExecutionResultImpl(InternalJobState.SUCCEEDED, null, null);
+ public static final JobExecutionResultImpl CANCELLED = new JobExecutionResultImpl(InternalJobState.CANCELLED, null, null);
+ public static final JobExecutionResultImpl FAILED = new JobExecutionResultImpl(InternalJobState.FAILED, null, null);
+
+ private final InternalJobState state;
+
+ private final String message;
+
+ private final Long retryDelayInMs;
+
+ public JobExecutionResultImpl(final InternalJobState state,
+ final String message,
+ final Long retryDelayInMs) {
+ this.state = state;
+ this.message = message;
+ this.retryDelayInMs = retryDelayInMs;
+ }
+
+ public InternalJobState getState() {
+ return this.state;
+ }
+
+ @Override
+ public boolean succeeded() {
+ return this.state == InternalJobState.SUCCEEDED;
+ }
+
+ @Override
+ public boolean cancelled() {
+ return this.state == InternalJobState.CANCELLED;
+ }
+
+ @Override
+ public boolean failed() {
+ return this.state == InternalJobState.FAILED;
+ }
+
+ @Override
+ public String getMessage() {
+ return this.message;
+ }
+
+ @Override
+ public Long getRetryDelayInMs() {
+ return this.retryDelayInMs;
+ }
+
+ @Override
+ public String toString() {
+ return "JobExecutionResultImpl [state=" + state + ", message="
+ + message + ", retryDelayInMs=" + retryDelayInMs + "]";
+ }
+}
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobExecutionResultImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobExecutionResultImpl.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobExecutionResultImpl.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java?rev=1532234&r1=1532233&r2=1532234&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java Tue Oct 15 07:43:09 2013
@@ -20,7 +20,6 @@ package org.apache.sling.event.impl.jobs
import org.apache.sling.event.jobs.Queue;
-import org.apache.sling.event.jobs.consumer.JobState;
/**
@@ -55,7 +54,7 @@ public class JobHandler {
* Finish the processing of the job
* @param state The state of processing
*/
- public void finished(final JobState state, final boolean keepJobInHistory, final long duration) {
+ public void finished(final InternalJobState state, final boolean keepJobInHistory, final long duration) {
// for now we just keep cancelled jobs
this.jobManager.finishJob(this.job, state, keepJobInHistory, duration);
}
@@ -69,7 +68,7 @@ public class JobHandler {
}
public void cancel() {
- this.jobManager.finishJob(this.job, JobState.CANCELLED, true, -1);
+ this.jobManager.finishJob(this.job, InternalJobState.CANCELLED, true, -1);
}
public void reassign() {
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java?rev=1532234&r1=1532233&r2=1532234&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java Tue Oct 15 07:43:09 2013
@@ -319,15 +319,15 @@ public class JobImpl implements Job {
}
@Override
- public JobType getJobType() {
+ public JobState getJobState() {
final String enumValue = this.getProperty(JobImpl.PROPERTY_FINISHED_STATE, String.class);
if ( enumValue == null ) {
if ( this.getProcessingStarted() != null ) {
- return JobType.ACTIVE;
+ return JobState.ACTIVE;
}
- return JobType.QUEUED;
+ return JobState.QUEUED;
}
- return JobType.valueOf(enumValue);
+ return JobState.valueOf(enumValue);
}
/**
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java?rev=1532234&r1=1532233&r2=1532234&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java Tue Oct 15 07:43:09 2013
@@ -81,7 +81,6 @@ import org.apache.sling.event.jobs.Sched
import org.apache.sling.event.jobs.Statistics;
import org.apache.sling.event.jobs.TopicStatistics;
import org.apache.sling.event.jobs.consumer.JobExecutor;
-import org.apache.sling.event.jobs.consumer.JobState;
import org.apache.sling.event.jobs.jmx.QueuesMBean;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
@@ -308,7 +307,7 @@ public class JobManagerImpl
if ( logger.isDebugEnabled() ) {
logger.debug("Dropping job due to configuration of queue {} : {}", queueInfo.queueName, Utility.toString(job));
}
- this.finishJob(job, JobState.CANCELLED, false, -1);
+ this.finishJob(job, InternalJobState.CANCELLED, false, -1);
} else if ( config.getType() == QueueConfiguration.Type.IGNORE ) {
if ( !reassign ) {
if ( logger.isDebugEnabled() ) {
@@ -344,7 +343,7 @@ public class JobManagerImpl
if ( queue == null ) {
// this is just a sanity check, actually we can never get here
logger.warn("Ignoring event due to unknown queue type of queue {} : {}", queueInfo.queueName, Utility.toString(job));
- this.finishJob(job, JobState.CANCELLED, false, -1);
+ this.finishJob(job, InternalJobState.CANCELLED, false, -1);
} else {
queues.put(queueInfo.queueName, queue);
((QueuesMBeanImpl)queuesMBean).sendEvent(new QueueStatusEvent(queue, null));
@@ -801,7 +800,7 @@ public class JobManagerImpl
}
}
} else {
- this.finishJob(job, JobState.CANCELLED, true, -1);
+ this.finishJob(job, InternalJobState.CANCELLED, true, -1);
}
}
} else {
@@ -977,11 +976,11 @@ public class JobManagerImpl
buf.append(ISO9075.encode(JobImpl.PROPERTY_FINISHED_STATE));
if ( type == QueryType.SUCCEEDED ) {
buf.append(" = '");
- buf.append(JobState.SUCCEEDED.name());
+ buf.append(InternalJobState.SUCCEEDED.name());
buf.append("'");
} else if ( type == QueryType.CANCELLED ) {
buf.append(" = '");
- buf.append(JobState.CANCELLED.name());
+ buf.append(InternalJobState.CANCELLED.name());
buf.append("'");
}
} else {
@@ -1067,10 +1066,10 @@ public class JobManagerImpl
* @param state The state of the processing
*/
public void finishJob(final JobImpl job,
- final JobState state,
+ final InternalJobState state,
final boolean keepJobInHistory,
final long duration) {
- final boolean isSuccess = (state == JobState.SUCCEEDED);
+ final boolean isSuccess = (state == InternalJobState.SUCCEEDED);
ResourceResolver resolver = null;
try {
resolver = this.resourceResolverFactory.getAdministrativeResourceResolver(null);
@@ -1082,7 +1081,7 @@ public class JobManagerImpl
final ValueMap vm = ResourceHelper.getValueMap(jobResource);
newPath = this.configuration.getStoragePath(job, isSuccess);
final Map<String, Object> props = new HashMap<String, Object>(vm);
- props.put(JobImpl.PROPERTY_FINISHED_STATE, isSuccess ? JobState.SUCCEEDED.name() : JobState.CANCELLED.name());
+ props.put(JobImpl.PROPERTY_FINISHED_STATE, isSuccess ? InternalJobState.SUCCEEDED.name() : InternalJobState.CANCELLED.name());
if ( isSuccess ) {
// we set the finish date to start date + duration
final Date finishDate = new Date();
@@ -1365,7 +1364,7 @@ public class JobManagerImpl
if ( logger.isDebugEnabled() ) {
logger.debug("Dropping job due to configuration of queue {} : {}", queueInfo.queueName, Utility.toString(job));
}
- this.finishJob(job, JobState.CANCELLED, false, -1); // DROP means complete removal
+ this.finishJob(job, InternalJobState.CANCELLED, false, -1); // DROP means complete removal
} else {
String targetId = null;
if ( config.getType() != QueueConfiguration.Type.IGNORE ) {
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java?rev=1532234&r1=1532233&r2=1532234&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java Tue Oct 15 07:43:09 2013
@@ -31,7 +31,9 @@ import java.util.concurrent.atomic.Atomi
import org.apache.sling.commons.threads.ThreadPool;
import org.apache.sling.event.EventUtil;
+import org.apache.sling.event.impl.jobs.InternalJobState;
import org.apache.sling.event.impl.jobs.JobConsumerManager;
+import org.apache.sling.event.impl.jobs.JobExecutionResultImpl;
import org.apache.sling.event.impl.jobs.JobHandler;
import org.apache.sling.event.impl.jobs.JobImpl;
import org.apache.sling.event.impl.jobs.Utility;
@@ -45,9 +47,8 @@ import org.apache.sling.event.jobs.Notif
import org.apache.sling.event.jobs.Queue;
import org.apache.sling.event.jobs.Statistics;
import org.apache.sling.event.jobs.consumer.JobExecutionContext;
+import org.apache.sling.event.jobs.consumer.JobExecutionResult;
import org.apache.sling.event.jobs.consumer.JobExecutor;
-import org.apache.sling.event.jobs.consumer.JobState;
-import org.apache.sling.event.jobs.consumer.JobStatus;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
import org.slf4j.Logger;
@@ -304,7 +305,7 @@ public abstract class AbstractJobQueue
public long processingTime;
}
- private RescheduleInfo handleReschedule(final JobHandler jobEvent, final JobStatus result) {
+ private RescheduleInfo handleReschedule(final JobHandler jobEvent, final JobExecutionResultImpl result) {
final RescheduleInfo info = new RescheduleInfo();
switch ( result.getState() ) {
case SUCCEEDED : // job is finished
@@ -352,14 +353,14 @@ public abstract class AbstractJobQueue
@Override
public boolean finishedJob(final Event job, final boolean shouldReschedule) {
final String location = (String)job.getProperty(ResourceHelper.PROPERTY_JOB_ID);
- return this.finishedJob(location, shouldReschedule ? JobStatus.FAILED : JobStatus.SUCCEEDED, false);
+ return this.finishedJob(location, shouldReschedule ? JobExecutionResultImpl.FAILED : JobExecutionResultImpl.SUCCEEDED, false);
}
/**
* Handle job finish and determine whether to reschedule or cancel the job
*/
private boolean finishedJob(final String jobId,
- final JobStatus result,
+ final JobExecutionResultImpl result,
final boolean isAsync) {
if ( this.logger.isDebugEnabled() ) {
this.logger.debug("Received finish for job {}, result={}", jobId, result);
@@ -397,10 +398,10 @@ public abstract class AbstractJobQueue
if ( !rescheduleInfo.reschedule ) {
// we keep cancelled jobs and succeeded jobs if the queue is configured like this.
- final boolean keepJobs = result.getState() != JobState.SUCCEEDED || this.configuration.isKeepJobs();
+ final boolean keepJobs = result.getState() != InternalJobState.SUCCEEDED || this.configuration.isKeepJobs();
handler.finished(result.getState(), keepJobs, rescheduleInfo.processingTime);
finishSuccessful = true;
- if ( result.getState() == JobState.SUCCEEDED ) {
+ if ( result.getState() == InternalJobState.SUCCEEDED ) {
Utility.sendNotification(this.eventAdmin, NotificationConstants.TOPIC_JOB_FINISHED, handler.getJob(), rescheduleInfo.processingTime);
} else {
Utility.sendNotification(this.eventAdmin, NotificationConstants.TOPIC_JOB_CANCELLED, handler.getJob(), null);
@@ -540,7 +541,7 @@ public abstract class AbstractJobQueue
break;
}
}
- JobStatus result = JobStatus.CANCELLED;
+ JobExecutionResultImpl result = JobExecutionResultImpl.CANCELLED;
final AtomicBoolean isAsync = new AtomicBoolean(false);
try {
@@ -583,19 +584,63 @@ public abstract class AbstractJobQueue
}
@Override
- public void asyncProcessingFinished(final JobStatus status) {
+ public void asyncProcessingFinished(final JobExecutionResult status) {
synchronized ( lock ) {
if ( isAsync.compareAndSet(true, false) ) {
jobConsumerManager.unregisterListener(job.getId());
- finishedJob(job.getId(), status, true);
+ finishedJob(job.getId(), (JobExecutionResultImpl)status, true);
asyncCounter.decrementAndGet();
} else {
throw new IllegalStateException("Job is not processed async " + job.getId());
}
}
}
+
+ @Override
+ public ResultBuilder result(final String message) {
+ return new ResultBuilder() {
+
+ private Long retryDelayInMs;
+
+ @Override
+ public ResultBuilder retryDelay(final long retryDelayInMs) {
+ this.retryDelayInMs = retryDelayInMs;
+ return this;
+ }
+
+ @Override
+ public JobExecutionResult SUCCEEDED() {
+ return new JobExecutionResultImpl(InternalJobState.SUCCEEDED, message, retryDelayInMs);
+ }
+
+ @Override
+ public JobExecutionResult FAILED() {
+ return new JobExecutionResultImpl(InternalJobState.FAILED, message, retryDelayInMs);
+ }
+
+ @Override
+ public JobExecutionResult CANCELLED() {
+ return new JobExecutionResultImpl(InternalJobState.CANCELLED, message, retryDelayInMs);
+ }
+ };
+ }
+
+ @Override
+ public JobExecutionResult SUCCEEDED() {
+ return JobExecutionResultImpl.SUCCEEDED;
+ }
+
+ @Override
+ public JobExecutionResult FAILED() {
+ return JobExecutionResultImpl.FAILED;
+ }
+
+ @Override
+ public JobExecutionResult CANCELLED() {
+ return JobExecutionResultImpl.CANCELLED;
+ }
};
- result = consumer.process(job, ctx);
+ result = (JobExecutionResultImpl)consumer.process(job, ctx);
if ( result == null ) { // ASYNC processing
jobConsumerManager.registerListener(job.getId(), consumer, ctx);
asyncCounter.incrementAndGet();
@@ -606,7 +651,7 @@ public abstract class AbstractJobQueue
} catch (final Throwable t) { //NOSONAR
logger.error("Unhandled error occured in job processor " + t.getMessage() + " while processing job " + Utility.toString(job), t);
// we don't reschedule if an exception occurs
- result = JobStatus.CANCELLED;
+ result = JobExecutionResultImpl.CANCELLED;
} finally {
currentThread.setPriority(oldPriority);
currentThread.setName(oldName);
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/Job.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/Job.java?rev=1532234&r1=1532233&r2=1532234&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/Job.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/Job.java Tue Oct 15 07:43:09 2013
@@ -160,11 +160,15 @@ public interface Job {
*/
String PROPERTY_FINISHED_DATE = "slingevent:finishedDate";
- enum JobType {
- QUEUED,
- ACTIVE,
- SUCCEEDED,
- CANCELLED
+ /**
+ * The current job state.
+ * @since 1.3
+ */
+ enum JobState {
+ QUEUED, // waiting in queue after adding or for restart after failing
+ ACTIVE, // job is currently in processing
+ SUCCEEDED, // processing finished successfully
+ CANCELLED, // processing failed permanently
};
/**
@@ -283,10 +287,10 @@ public interface Job {
String getCreatedInstance();
/**
- * Get the job type
+ * Get the job state
* @since 1.3
*/
- JobType getJobType();
+ JobState getJobState();
/**
* If the job is cancelled or succeeded, this method will return the finish date.
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobManager.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobManager.java?rev=1532234&r1=1532233&r2=1532234&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobManager.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobManager.java Tue Oct 15 07:43:09 2013
@@ -99,50 +99,6 @@ public interface JobManager {
Job addJob(String topic, Map<String, Object> properties);
/**
- * Add a new job
- *
- * If the topic is <code>null</code> or illegal, no job is created and <code>null</code> is returned.
- * If properties are provided, all of them must be serializable. If there are non serializable
- * objects in the properties, no job is created and <code>null</code> is returned.
- * A job topic is a hierarchical name separated by dashes, each part has to start with a letter,
- * allowed characters are letters, numbers and the underscore.
- *
- * This method allows to specify a job name which should uniquely identify this job. If a job with
- * the same name is started on different instances, the job is still processed only once. However,
- * the topology api in combination with the leader selection provides a better way for
- * dealing with this situation and as jobs with name come with a heavy processing overhead
- * these should be avoided.
- *
- * The returned job object is a snapshot of the job state taken at the time of creation. Updates
- * to the job state are not reflected and the client needs to get a new job object using the job id.
- *
- * If the queue for processing this job is configured to drop the job, <code>null</code> is returned
- * as well.
- *
- * @param topic The required job topic.
- * @param name Optional unique job name
- * @param properties Optional job properties. The properties must be serializable.
- * @return The new job - or <code>null</code> if the job could not be created.
- * @since 1.2
- * @deprecated
- */
- @Deprecated
- Job addJob(String topic, String name, Map<String, Object> properties);
-
- /**
- * Return a job based on the unique job name.
- *
- * The returned job object is a snapshot of the job state taken at the time of the call. Updates
- * to the job state are not reflected and the client needs to get a new job object using the job id.
- *
- * @return A job or <code>null</code>
- * @since 1.2
- * @deprecated
- */
- @Deprecated
- Job getJobByName(String name);
-
- /**
* Return a job based on the unique id.
*
* The returned job object is a snapshot of the job state taken at the time of the call. Updates
@@ -235,6 +191,50 @@ public interface JobManager {
ScheduledJobInfo getScheduledJob(final String name);
/**
+ * Add a new job
+ *
+ * If the topic is <code>null</code> or illegal, no job is created and <code>null</code> is returned.
+ * If properties are provided, all of them must be serializable. If there are non serializable
+ * objects in the properties, no job is created and <code>null</code> is returned.
+ * A job topic is a hierarchical name separated by dashes, each part has to start with a letter,
+ * allowed characters are letters, numbers and the underscore.
+ *
+ * This method allows to specify a job name which should uniquely identify this job. If a job with
+ * the same name is started on different instances, the job is still processed only once. However,
+ * the topology api in combination with the leader selection provides a better way for
+ * dealing with this situation and as jobs with name come with a heavy processing overhead
+ * these should be avoided.
+ *
+ * The returned job object is a snapshot of the job state taken at the time of creation. Updates
+ * to the job state are not reflected and the client needs to get a new job object using the job id.
+ *
+ * If the queue for processing this job is configured to drop the job, <code>null</code> is returned
+ * as well.
+ *
+ * @param topic The required job topic.
+ * @param name Optional unique job name
+ * @param properties Optional job properties. The properties must be serializable.
+ * @return The new job - or <code>null</code> if the job could not be created.
+ * @since 1.2
+ * @deprecated
+ */
+ @Deprecated
+ Job addJob(String topic, String name, Map<String, Object> properties);
+
+ /**
+ * Return a job based on the unique job name.
+ *
+ * The returned job object is a snapshot of the job state taken at the time of the call. Updates
+ * to the job state are not reflected and the client needs to get a new job object using the job id.
+ *
+ * @return A job or <code>null</code>
+ * @since 1.2
+ * @deprecated
+ */
+ @Deprecated
+ Job getJobByName(String name);
+
+ /**
* Return all jobs either running or scheduled.
*
* @param type Required parameter for the type: either all jobs, only queued or only started can be returned.
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobExecutionContext.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobExecutionContext.java?rev=1532234&r1=1532233&r2=1532234&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobExecutionContext.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobExecutionContext.java Tue Oct 15 07:43:09 2013
@@ -32,7 +32,7 @@ public interface JobExecutionContext {
* @throws IllegalStateException If the job is not processed asynchronously
* or if this method has already been called.
*/
- void asyncProcessingFinished(final JobStatus status);
+ void asyncProcessingFinished(final JobExecutionResult result);
/**
* If a job is stoppable, it should periodically check this method
@@ -91,4 +91,26 @@ public interface JobExecutionContext {
* @param args Additional arguments
*/
void log(final String message, final Object...args);
+
+ ResultBuilder result(final String message);
+
+ JobExecutionResult SUCCEEDED();
+
+ JobExecutionResult FAILED();
+
+ JobExecutionResult CANCELLED();
+
+ public interface ResultBuilder {
+
+ /**
+ * @param retryDelayInMs The new retry delay in ms.
+ */
+ ResultBuilder retryDelay(final long retryDelayInMs);
+
+ JobExecutionResult SUCCEEDED();
+
+ JobExecutionResult FAILED();
+
+ JobExecutionResult CANCELLED();
+ }
}
Added: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobExecutionResult.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobExecutionResult.java?rev=1532234&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobExecutionResult.java (added)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobExecutionResult.java Tue Oct 15 07:43:09 2013
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.jobs.consumer;
+
+import aQute.bnd.annotation.ProviderType;
+
+/**
+ * The status of a job after it has been processed by a {@link JobExecutor}.
+ *
+ * A job executor can either use one of the constants {@link #SUCCEEDED}, {@link #FAILED}
+ * or {@link #CANCELLED} to return a result or it can build an individual result
+ * with optional parameters using the builder methods {@link #SUCCEEDED()}, {@link #FAILED()}
+ * or {@link #CANCELLED()}.
+ *
+ * @since 1.1
+ */
+@ProviderType
+public interface JobExecutionResult {
+
+ boolean succeeded();
+
+ boolean cancelled();
+
+ boolean failed();
+
+ /**
+ * Return the optional message.
+ * @return The message or <code>null</code>
+ */
+ String getMessage();
+
+ /**
+ * Return the retry delay in ms
+ * @return The new retry delay (>= 0) or <code>null</code>
+ */
+ Long getRetryDelayInMs();
+}
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobExecutionResult.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobExecutionResult.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobExecutionResult.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobExecutor.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobExecutor.java?rev=1532234&r1=1532233&r2=1532234&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobExecutor.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobExecutor.java Tue Oct 15 07:43:09 2013
@@ -63,25 +63,25 @@ public interface JobExecutor {
/**
* Execute the job.
*
- * If the job has been processed successfully, {@link JobStatus.SUCCEEDED} should be returned.
- * If the job has not been processed completely, but might be rescheduled {@link JobStatus.FAILED}
+ * If the job has been processed successfully, {@link JobExecutionContext#SUCCEEDED()} should be returned.
+ * If the job has not been processed completely, but might be rescheduled {@link JobExecutionContext#FAILED()}
* should be returned.
- * If the job processing failed and should not be rescheduled, {@link JobStatus.CANCELLED} should
+ * If the job processing failed and should not be rescheduled, {@link JobExecutionContext#CANCELLED()} should
* be returned.
*
* If the executor decides to process the job asynchronously it should return <code>null</code>
- * and notify the job manager by using the {@link JobExecutionContext#asyncProcessingFinished(JobStatus)}
+ * and notify the job manager by using the {@link JobExecutionContext#asyncProcessingFinished(JobExecutionResult)}
* method.
*
* If the processing fails with throwing an exception/throwable, the process will not be rescheduled
- * and treated like the method would have returned {@link JobStatus.CANCELLED}.
+ * and treated like the method would have returned {@link JobExecutionContext#CANCELLED()}.
*
- * Instead of the constants from the JobStatus class, this method can return a custom instance containing
- * additional information.
+ * Instead of the constants from the JobExecutionContext class, this method can return a custom instance containing
+ * additional information using the builder pattern available from {@link JobExecutionContext#result(String)}.
*
* @param job The job
* @param context The execution context.
- * @return The job status
+ * @return The job execution result
*/
- JobStatus process(Job job, JobExecutionContext context);
+ JobExecutionResult process(Job job, JobExecutionContext context);
}
Modified: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/HistoryTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/HistoryTest.java?rev=1532234&r1=1532233&r2=1532234&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/HistoryTest.java (original)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/HistoryTest.java Tue Oct 15 07:43:09 2013
@@ -34,9 +34,8 @@ import org.apache.sling.event.jobs.Job;
import org.apache.sling.event.jobs.JobManager;
import org.apache.sling.event.jobs.QueueConfiguration;
import org.apache.sling.event.jobs.consumer.JobExecutionContext;
+import org.apache.sling.event.jobs.consumer.JobExecutionResult;
import org.apache.sling.event.jobs.consumer.JobExecutor;
-import org.apache.sling.event.jobs.consumer.JobState;
-import org.apache.sling.event.jobs.consumer.JobStatus;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -98,13 +97,13 @@ public class HistoryTest extends Abstrac
new JobExecutor() {
@Override
- public JobStatus process(Job job, JobExecutionContext context) {
+ public JobExecutionResult process(final Job job, final JobExecutionContext context) {
sleep(5L);
final long count = job.getProperty(PROP_COUNTER, Long.class);
if ( count == 2 || count == 5 || count == 7 ) {
- return new JobStatus(JobState.CANCELLED, JobState.CANCELLED.name());
+ return context.result(Job.JobState.CANCELLED.name()).CANCELLED();
}
- return new JobStatus(JobState.SUCCEEDED, JobState.SUCCEEDED.name());
+ return context.result(Job.JobState.SUCCEEDED.name()).SUCCEEDED();
}
});
@@ -128,11 +127,11 @@ public class HistoryTest extends Abstrac
final long count = j.getProperty(PROP_COUNTER, Long.class);
assertEquals(last, count);
if ( count == 2 || count == 5 || count == 7 ) {
- assertEquals(Job.JobType.CANCELLED, j.getJobType());
+ assertEquals(Job.JobState.CANCELLED, j.getJobState());
} else {
- assertEquals(Job.JobType.SUCCEEDED, j.getJobType());
+ assertEquals(Job.JobState.SUCCEEDED, j.getJobState());
}
- assertEquals(j.getJobType().name(), j.getResultMessage());
+ assertEquals(j.getJobState().name(), j.getResultMessage());
last--;
}
} finally {