You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2013/04/24 14:39:44 UTC
svn commit: r1471385 - in
/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event:
impl/jobs/JobImpl.java impl/jobs/JobManagerImpl.java
impl/jobs/MaintenanceTask.java impl/jobs/queues/AbstractJobQueue.java
jobs/consumer/JobConsumer.java
Author: cziegeler
Date: Wed Apr 24 12:39:44 2013
New Revision: 1471385
URL: http://svn.apache.org/r1471385
Log:
SLING-2829 : Add API for starting a job and service interface for executing a job - first shot at an async processing api
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/MaintenanceTask.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobConsumer.java
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java?rev=1471385&r1=1471384&r2=1471385&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java Wed Apr 24 12:39:44 2013
@@ -25,8 +25,8 @@ import java.util.Set;
import org.apache.sling.api.resource.ValueMap;
import org.apache.sling.api.wrappers.ValueMapDecorator;
import org.apache.sling.event.impl.support.ResourceHelper;
-import org.apache.sling.event.jobs.JobUtil.JobPriority;
import org.apache.sling.event.jobs.Job;
+import org.apache.sling.event.jobs.JobUtil.JobPriority;
import org.apache.sling.event.jobs.Queue;
/**
@@ -209,12 +209,23 @@ public class JobImpl implements Job {
return (String)this.getProperty(Job.PROPERTY_JOB_CREATED_INSTANCE);
}
+ /**
+ * Update information about the queue.
+ * Stores the queue name as well as the max retries and the priority
+ * from the queue configuration in the properties of this job.
+ * @param queue The queue providing the name and the configuration
+ */
public void updateQueue(final Queue queue) {
this.properties.put(Job.PROPERTY_JOB_QUEUE_NAME, queue.getName());
this.properties.put(Job.PROPERTY_JOB_RETRIES, queue.getConfiguration().getMaxRetries());
this.properties.put(Job.PROPERTY_JOB_PRIORITY, queue.getConfiguration().getPriority());
}
+ /**
+  * Set a property of this job.
+  * A <code>null</code> value removes the property.
+  * @param name The property name
+  * @param value The new value or <code>null</code> to remove the property
+  */
+ public void setProperty(final String name, final Object value) {
+     if ( value != null ) {
+         this.properties.put(name, value);
+         return;
+     }
+     this.properties.remove(name);
+ }
+
@Override
public String toString() {
return "JobImpl [properties=" + properties + ", topic=" + topic
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java?rev=1471385&r1=1471384&r2=1471385&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java Wed Apr 24 12:39:44 2013
@@ -66,16 +66,16 @@ import org.apache.sling.event.impl.jobs.
import org.apache.sling.event.impl.jobs.stats.TopicStatisticsImpl;
import org.apache.sling.event.impl.support.Environment;
import org.apache.sling.event.impl.support.ResourceHelper;
+import org.apache.sling.event.jobs.Job;
import org.apache.sling.event.jobs.JobManager;
import org.apache.sling.event.jobs.JobUtil;
import org.apache.sling.event.jobs.JobUtil.JobPriority;
-import org.apache.sling.event.jobs.consumer.JobConsumer;
-import org.apache.sling.event.jobs.Job;
import org.apache.sling.event.jobs.JobsIterator;
import org.apache.sling.event.jobs.Queue;
import org.apache.sling.event.jobs.QueueConfiguration;
import org.apache.sling.event.jobs.Statistics;
import org.apache.sling.event.jobs.TopicStatistics;
+import org.apache.sling.event.jobs.consumer.JobConsumer;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
import org.osgi.service.event.EventConstants;
@@ -272,7 +272,8 @@ public class JobManagerImpl
* Process a new job
* This method first searches the corresponding queue - if such a queue
* does not exist yet, it is created and started.
- * @param handler The job handler
+ *
+ * @param job The job
*/
void process(final JobImpl job) {
final JobHandler handler = new JobHandler(job, this);
@@ -514,7 +515,6 @@ public class JobManagerImpl
}
private long stopProcessing() {
- long changeCount = 0;
this.backgroundLoader.stop();
// let's rename/close all queues and clear them
@@ -527,6 +527,7 @@ public class JobManagerImpl
}
// deactivate old capabilities - this stops all background processes
+ long changeCount = 0;
if ( this.topologyCapabilities != null ) {
changeCount = this.topologyCapabilities.getChangeCount() + 1;
this.topologyCapabilities.deactivate();
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/MaintenanceTask.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/MaintenanceTask.java?rev=1471385&r1=1471384&r2=1471385&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/MaintenanceTask.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/MaintenanceTask.java Wed Apr 24 12:39:44 2013
@@ -550,6 +550,11 @@ public class MaintenanceTask {
}
}
+ /**
+ * Reassign a job to a different target
+ * @param job The job
+ * @param targetId New target or <code>null</code> if unknown
+ */
public void reassignJob(final JobImpl job, final String targetId) {
ResourceResolver resolver = null;
try {
@@ -574,7 +579,6 @@ public class MaintenanceTask {
resolver.commit();
} catch ( final PersistenceException pe ) {
this.ignoreException(pe);
- resolver.refresh();
}
}
} catch (final LoginException ignore) {
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java?rev=1471385&r1=1471384&r2=1471385&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java Wed Apr 24 12:39:44 2013
@@ -26,11 +26,13 @@ import java.util.Hashtable;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.sling.commons.threads.ThreadPool;
import org.apache.sling.event.EventUtil;
import org.apache.sling.event.impl.jobs.JobConsumerManager;
import org.apache.sling.event.impl.jobs.JobHandler;
+import org.apache.sling.event.impl.jobs.JobImpl;
import org.apache.sling.event.impl.jobs.Utility;
import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
import org.apache.sling.event.impl.jobs.deprecated.JobStatusNotifier;
@@ -305,6 +307,8 @@ public abstract class AbstractJobQueue
this.cancelledJob();
Utility.sendNotification(this.eventAdmin, JobUtil.TOPIC_JOB_CANCELLED, jobEvent.getJob(), null);
break;
+ case ASYNC: // nothing to do here
+ break;
}
return reschedule;
@@ -462,32 +466,33 @@ public abstract class AbstractJobQueue
/**
* Process a job
*/
- protected boolean executeJob(final JobHandler info) {
- final JobConsumer consumer = this.jobConsumerManager.getConsumer(info.getJob().getTopic());
+ protected boolean executeJob(final JobHandler handler) {
+ final JobImpl job = handler.getJob();
+ final JobConsumer consumer = this.jobConsumerManager.getConsumer(job.getTopic());
- if ( (consumer != null || (info.getJob().isBridgedEvent() && this.jobConsumerManager.supportsBridgedEvents())) ) {
- if ( info.start() ) {
+ if ( (consumer != null || (job.isBridgedEvent() && this.jobConsumerManager.supportsBridgedEvents())) ) {
+ if ( handler.start() ) {
if ( logger.isDebugEnabled() ) {
- logger.debug("Starting job {}", Utility.toString(info.getJob()));
+ logger.debug("Starting job {}", Utility.toString(job));
}
try {
- info.started = System.currentTimeMillis();
+ handler.started = System.currentTimeMillis();
// let's add the event to our processing list
synchronized ( this.startedJobsLists ) {
- this.startedJobsLists.put(info.getJob().getId(), info);
+ this.startedJobsLists.put(job.getId(), handler);
}
if ( consumer != null ) {
// first check for a notifier context to send an acknowledge
boolean notify = true;
- if ( !this.sendAcknowledge(info.getJob().getId()) ) {
+ if ( !this.sendAcknowledge(job.getId()) ) {
// if we don't get an ack, someone else is already processing this job.
// we process but do not notify the job event handler.
- logger.info("Someone else is already processing job {}.", Utility.toString(info.getJob()));
+ logger.info("Someone else is already processing job {}.", Utility.toString(job));
notify = false;
}
- final JobUtil.JobPriority priority = (JobUtil.JobPriority) info.getJob().getProperty(Job.PROPERTY_JOB_PRIORITY);
+ final JobUtil.JobPriority priority = job.getJobPriority();
final boolean notifyResult = notify;
final Runnable task = new Runnable() {
@@ -502,7 +507,7 @@ public abstract class AbstractJobQueue
final String oldName = currentThread.getName();
final int oldPriority = currentThread.getPriority();
- currentThread.setName(oldName + "-" + info.getJob().getProperty(Job.PROPERTY_JOB_QUEUE_NAME) + "(" + info.getJob().getTopic() + ")");
+ currentThread.setName(oldName + "-" + job.getQueueName() + "(" + job.getTopic() + ")");
if ( priority != null ) {
switch ( priority ) {
case NORM : currentThread.setPriority(Thread.NORM_PRIORITY);
@@ -514,17 +519,63 @@ public abstract class AbstractJobQueue
}
}
JobConsumer.JobResult result = JobConsumer.JobResult.CANCEL;
+ // lock and completion flag shared by the async handler and the code
+ // which waits on asyncLock until asnycDone is set
+ final Object asyncLock = new Object();
+ final AtomicBoolean asnycDone = new AtomicBoolean(false);
+ final JobConsumer.AsyncHandler asyncHandler =
+     new JobConsumer.AsyncHandler() {
+
+         /**
+          * Finish the job with the given result and wake up the thread
+          * waiting on the async lock.
+          * @param result The result to finish the job with
+          * @throws IllegalStateException If the job is already marked as processed
+          */
+         private void check(final JobConsumer.JobResult result) {
+             synchronized ( asyncLock ) {
+                 if ( !asnycDone.compareAndSet(false, true) ) {
+                     throw new IllegalStateException("Job is already marked as processed");
+                 }
+                 try {
+                     finishedJob(job.getId(), result);
+                 } finally {
+                     // the flag is already set - always notify, otherwise a failure
+                     // in finishedJob would leave the waiting thread blocked
+                     asyncLock.notify();
+                 }
+             }
+         }
+
+         @Override
+         public void ok() {
+             // check() finishes the job, it must not be finished a second time here
+             this.check(JobConsumer.JobResult.OK);
+         }
+
+         @Override
+         public void failed() {
+             this.check(JobConsumer.JobResult.FAILED);
+         }
+
+         @Override
+         public void cancel() {
+             this.check(JobConsumer.JobResult.CANCEL);
+         }
+     };
+ // make the handler available to the consumer through the job properties
+ job.setProperty(JobConsumer.PROPERTY_JOB_ASYNC_HANDLER, asyncHandler);
+ job.setProperty(JobConsumer.PROPERTY_JOB_ASYNC_HANDLER, asyncHandler);
try {
- result = consumer.process(info.getJob());
+ result = consumer.process(job);
} catch (final Throwable t) { //NOSONAR
- logger.error("Unhandled error occured in job processor " + t.getMessage() + " while processing job " + Utility.toString(info.getJob()), t);
+ logger.error("Unhandled error occured in job processor " + t.getMessage() + " while processing job " + Utility.toString(job), t);
// we don't reschedule if an exception occurs
result = JobConsumer.JobResult.CANCEL;
} finally {
+ job.setProperty(JobConsumer.PROPERTY_JOB_ASYNC_HANDLER, null);
currentThread.setPriority(oldPriority);
currentThread.setName(oldName);
- if ( notifyResult ) {
- finishedJob(info.getJob().getId(), result);
+ if ( notifyResult && result != JobConsumer.JobResult.ASYNC ) {
+ finishedJob(job.getId(), result);
+ }
+ }
+ if ( result == JobConsumer.JobResult.ASYNC ) {
+ synchronized ( asyncLock ) {
+ while ( !asnycDone.get() ) {
+ try {
+ asyncLock.wait();
+ } catch (final InterruptedException e) {
+ ignoreException(e);
+ }
+ }
}
}
}
@@ -542,7 +593,7 @@ public abstract class AbstractJobQueue
}
} else {
- final Event jobEvent = this.getJobEvent(info);
+ final Event jobEvent = this.getJobEvent(handler);
// we need async delivery, otherwise we might create a deadlock
// as this method runs inside a synchronized block and the finishedJob
// method as well!
@@ -556,11 +607,11 @@ public abstract class AbstractJobQueue
}
} else {
if ( logger.isDebugEnabled() ) {
- logger.debug("Discarding removed job {}", Utility.toString(info.getJob()));
+ logger.debug("Discarding removed job {}", Utility.toString(job));
}
}
} else {
- info.reassign();
+ handler.reassign();
}
this.decQueued();
return false;
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobConsumer.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobConsumer.java?rev=1471385&r1=1471384&r2=1471385&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobConsumer.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobConsumer.java Wed Apr 24 12:39:44 2013
@@ -19,6 +19,9 @@
package org.apache.sling.event.jobs.consumer;
import org.apache.sling.event.jobs.Job;
+import org.apache.sling.event.jobs.consumer.JobConsumer.JobResult;
+
+import aQute.bnd.annotation.ProviderType;
@@ -44,32 +47,56 @@ import org.apache.sling.event.jobs.Job;
* with the highest service ranking is used. If the ranking is equal, the one with
* the lowest service ID is used.
*
- * @since 1.2
+ * @since 1.0
*/
+@ProviderType
public interface JobConsumer {
enum JobResult {
OK,
FAILED,
- CANCEL
+ CANCEL,
+ ASYNC
+ }
+
+ /** Job property containing an asynchronous handler. */
+ String PROPERTY_JOB_ASYNC_HANDLER = ":sling:jobs:asynchandler";
+
+ /**
+ * If the consumer decides to process the job asynchronously, this handler
+ * interface can be used to notify finished processing. The asynchronous
+ * handler can be retrieved using the property name {@link #PROPERTY_JOB_ASYNC_HANDLER}.
+ */
+ interface AsyncHandler {
+
+ void failed();
+
+ void ok();
+
+ void cancel();
}
+
/**
* Service registration property defining the jobs this consumer is able to process.
* The value is either a string or an array of strings.
*/
String PROPERTY_TOPICS = "job.topics";
+
/**
* Execute the job.
*
- * If the job has been processed successfully, {@link #JobResult.OK} should be returned.
- * If the job has not been processed completely, but might be rescheduled {@link #JobResult.FAILED}
+ * If the job has been processed successfully, {@link JobResult#OK} should be returned.
+ * If the job has not been processed completely, but might be rescheduled {@link JobResult#FAILED}
* should be returned.
- * If the job processing failed and should not be rescheduled, {@link #JobResult.CANCEL} should
+ * If the job processing failed and should not be rescheduled, {@link JobResult#CANCEL} should
* be returned.
*
+ * If the consumer decides to process the job asynchronously it should return {@link JobResult#ASYNC}
+ * and notify the job manager by using the {@link AsyncHandler} interface.
+ *
* If the processing fails with throwing an exception/throwable, the process will not be rescheduled
- * and treated like the method would have returned {@link #JobResult.CANCEL}.
+ * and treated like the method would have returned {@link JobResult#CANCEL}.
*
* @param job The job
* @return The job result