You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2013/04/24 14:39:44 UTC

svn commit: r1471385 - in /sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event: impl/jobs/JobImpl.java impl/jobs/JobManagerImpl.java impl/jobs/MaintenanceTask.java impl/jobs/queues/AbstractJobQueue.java jobs/consumer/JobConsumer.java

Author: cziegeler
Date: Wed Apr 24 12:39:44 2013
New Revision: 1471385

URL: http://svn.apache.org/r1471385
Log:
SLING-2829 :  Add API for starting a job and service interface for executing a job - first shot at an async processing api

Modified:
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/MaintenanceTask.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobConsumer.java

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java?rev=1471385&r1=1471384&r2=1471385&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java Wed Apr 24 12:39:44 2013
@@ -25,8 +25,8 @@ import java.util.Set;
 import org.apache.sling.api.resource.ValueMap;
 import org.apache.sling.api.wrappers.ValueMapDecorator;
 import org.apache.sling.event.impl.support.ResourceHelper;
-import org.apache.sling.event.jobs.JobUtil.JobPriority;
 import org.apache.sling.event.jobs.Job;
+import org.apache.sling.event.jobs.JobUtil.JobPriority;
 import org.apache.sling.event.jobs.Queue;
 
 /**
@@ -209,12 +209,23 @@ public class JobImpl implements Job {
         return (String)this.getProperty(Job.PROPERTY_JOB_CREATED_INSTANCE);
     }
 
+    /**
+     * Update information about the queue.
+     */
     public void updateQueue(final Queue queue) {
         this.properties.put(Job.PROPERTY_JOB_QUEUE_NAME, queue.getName());
         this.properties.put(Job.PROPERTY_JOB_RETRIES, queue.getConfiguration().getMaxRetries());
         this.properties.put(Job.PROPERTY_JOB_PRIORITY, queue.getConfiguration().getPriority());
     }
 
+    public void setProperty(final String name, final Object value) {
+        if ( value == null ) {
+            this.properties.remove(name);
+        } else {
+            this.properties.put(name, value);
+        }
+    }
+
     @Override
     public String toString() {
         return "JobImpl [properties=" + properties + ", topic=" + topic

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java?rev=1471385&r1=1471384&r2=1471385&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java Wed Apr 24 12:39:44 2013
@@ -66,16 +66,16 @@ import org.apache.sling.event.impl.jobs.
 import org.apache.sling.event.impl.jobs.stats.TopicStatisticsImpl;
 import org.apache.sling.event.impl.support.Environment;
 import org.apache.sling.event.impl.support.ResourceHelper;
+import org.apache.sling.event.jobs.Job;
 import org.apache.sling.event.jobs.JobManager;
 import org.apache.sling.event.jobs.JobUtil;
 import org.apache.sling.event.jobs.JobUtil.JobPriority;
-import org.apache.sling.event.jobs.consumer.JobConsumer;
-import org.apache.sling.event.jobs.Job;
 import org.apache.sling.event.jobs.JobsIterator;
 import org.apache.sling.event.jobs.Queue;
 import org.apache.sling.event.jobs.QueueConfiguration;
 import org.apache.sling.event.jobs.Statistics;
 import org.apache.sling.event.jobs.TopicStatistics;
+import org.apache.sling.event.jobs.consumer.JobConsumer;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventAdmin;
 import org.osgi.service.event.EventConstants;
@@ -272,7 +272,8 @@ public class JobManagerImpl
      * Process a new job
      * This method first searches the corresponding queue - if such a queue
      * does not exist yet, it is created and started.
-     * @param handler The job handler
+     *
+     * @param job The job
      */
     void process(final JobImpl job) {
         final JobHandler handler = new JobHandler(job, this);
@@ -514,7 +515,6 @@ public class JobManagerImpl
     }
 
     private long stopProcessing() {
-        long changeCount = 0;
         this.backgroundLoader.stop();
 
         // let's rename/close all queues and clear them
@@ -527,6 +527,7 @@ public class JobManagerImpl
         }
 
         // deactivate old capabilities - this stops all background processes
+        long changeCount = 0;
         if ( this.topologyCapabilities != null ) {
             changeCount = this.topologyCapabilities.getChangeCount() + 1;
             this.topologyCapabilities.deactivate();

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/MaintenanceTask.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/MaintenanceTask.java?rev=1471385&r1=1471384&r2=1471385&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/MaintenanceTask.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/MaintenanceTask.java Wed Apr 24 12:39:44 2013
@@ -550,6 +550,11 @@ public class MaintenanceTask {
         }
     }
 
+    /**
+     * Reassign a job to a different target
+     * @param job The job
+     * @param targetId New target or <code>null</code> if unknown
+     */
     public void reassignJob(final JobImpl job, final String targetId) {
         ResourceResolver resolver = null;
         try {
@@ -574,7 +579,6 @@ public class MaintenanceTask {
                     resolver.commit();
                 } catch ( final PersistenceException pe ) {
                     this.ignoreException(pe);
-                    resolver.refresh();
                 }
             }
         } catch (final LoginException ignore) {

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java?rev=1471385&r1=1471384&r2=1471385&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java Wed Apr 24 12:39:44 2013
@@ -26,11 +26,13 @@ import java.util.Hashtable;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.sling.commons.threads.ThreadPool;
 import org.apache.sling.event.EventUtil;
 import org.apache.sling.event.impl.jobs.JobConsumerManager;
 import org.apache.sling.event.impl.jobs.JobHandler;
+import org.apache.sling.event.impl.jobs.JobImpl;
 import org.apache.sling.event.impl.jobs.Utility;
 import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
 import org.apache.sling.event.impl.jobs.deprecated.JobStatusNotifier;
@@ -305,6 +307,8 @@ public abstract class AbstractJobQueue
                 this.cancelledJob();
                 Utility.sendNotification(this.eventAdmin, JobUtil.TOPIC_JOB_CANCELLED, jobEvent.getJob(), null);
                 break;
+            case ASYNC: // nothing to do here
+                break;
         }
 
         return reschedule;
@@ -462,32 +466,33 @@ public abstract class AbstractJobQueue
     /**
      * Process a job
      */
-    protected boolean executeJob(final JobHandler info) {
-        final JobConsumer consumer = this.jobConsumerManager.getConsumer(info.getJob().getTopic());
+    protected boolean executeJob(final JobHandler handler) {
+        final JobImpl job = handler.getJob();
+        final JobConsumer consumer = this.jobConsumerManager.getConsumer(job.getTopic());
 
-        if ( (consumer != null || (info.getJob().isBridgedEvent() && this.jobConsumerManager.supportsBridgedEvents())) ) {
-            if ( info.start() ) {
+        if ( (consumer != null || (job.isBridgedEvent() && this.jobConsumerManager.supportsBridgedEvents())) ) {
+            if ( handler.start() ) {
                 if ( logger.isDebugEnabled() ) {
-                    logger.debug("Starting job {}", Utility.toString(info.getJob()));
+                    logger.debug("Starting job {}", Utility.toString(job));
                 }
                 try {
-                    info.started = System.currentTimeMillis();
+                    handler.started = System.currentTimeMillis();
                     // let's add the event to our processing list
                     synchronized ( this.startedJobsLists ) {
-                        this.startedJobsLists.put(info.getJob().getId(), info);
+                        this.startedJobsLists.put(job.getId(), handler);
                     }
 
                     if ( consumer != null ) {
                         // first check for a notifier context to send an acknowledge
                         boolean notify = true;
-                        if ( !this.sendAcknowledge(info.getJob().getId()) ) {
+                        if ( !this.sendAcknowledge(job.getId()) ) {
                             // if we don't get an ack, someone else is already processing this job.
                             // we process but do not notify the job event handler.
-                            logger.info("Someone else is already processing job {}.", Utility.toString(info.getJob()));
+                            logger.info("Someone else is already processing job {}.", Utility.toString(job));
                             notify = false;
                         }
 
-                        final JobUtil.JobPriority priority = (JobUtil.JobPriority) info.getJob().getProperty(Job.PROPERTY_JOB_PRIORITY);
+                        final JobUtil.JobPriority priority = job.getJobPriority();
                         final boolean notifyResult = notify;
 
                         final Runnable task = new Runnable() {
@@ -502,7 +507,7 @@ public abstract class AbstractJobQueue
                                 final String oldName = currentThread.getName();
                                 final int oldPriority = currentThread.getPriority();
 
-                                currentThread.setName(oldName + "-" + info.getJob().getProperty(Job.PROPERTY_JOB_QUEUE_NAME) + "(" + info.getJob().getTopic() + ")");
+                                currentThread.setName(oldName + "-" + job.getQueueName() + "(" + job.getTopic() + ")");
                                 if ( priority != null ) {
                                     switch ( priority ) {
                                         case NORM : currentThread.setPriority(Thread.NORM_PRIORITY);
@@ -514,17 +519,63 @@ public abstract class AbstractJobQueue
                                     }
                                 }
                                 JobConsumer.JobResult result = JobConsumer.JobResult.CANCEL;
+                                final Object asyncLock = new Object();
+                                final AtomicBoolean asnycDone = new AtomicBoolean(false);
+                                final JobConsumer.AsyncHandler asyncHandler =
+                                        new JobConsumer.AsyncHandler() {
+
+                                            private void check(final JobConsumer.JobResult result) {
+                                                synchronized ( asyncLock ) {
+                                                    if ( !asnycDone.get() ) {
+                                                        asnycDone.set(true);
+                                                        finishedJob(job.getId(), result);
+                                                    } else {
+                                                        throw new IllegalStateException("Job is already marked as processed");
+                                                    }
+                                                    asyncLock.notify();
+                                                }
+                                            }
+
+                                            @Override
+                                            public void ok() {
+                                                this.check(JobConsumer.JobResult.OK);
+                                                finishedJob(job.getId(), JobConsumer.JobResult.OK);
+                                            }
+
+                                            @Override
+                                            public void failed() {
+                                                this.check(JobConsumer.JobResult.FAILED);
+                                            }
+
+                                            @Override
+                                            public void cancel() {
+                                                this.check(JobConsumer.JobResult.CANCEL);
+                                            }
+                                        };
+                                job.setProperty(JobConsumer.PROPERTY_JOB_ASYNC_HANDLER, asyncHandler);
                                 try {
-                                    result = consumer.process(info.getJob());
+                                    result = consumer.process(job);
                                 } catch (final Throwable t) { //NOSONAR
-                                    logger.error("Unhandled error occured in job processor " + t.getMessage() + " while processing job " + Utility.toString(info.getJob()), t);
+                                    logger.error("Unhandled error occured in job processor " + t.getMessage() + " while processing job " + Utility.toString(job), t);
                                     // we don't reschedule if an exception occurs
                                     result = JobConsumer.JobResult.CANCEL;
                                 } finally {
+                                    job.setProperty(JobConsumer.PROPERTY_JOB_ASYNC_HANDLER, null);
                                     currentThread.setPriority(oldPriority);
                                     currentThread.setName(oldName);
-                                    if ( notifyResult ) {
-                                        finishedJob(info.getJob().getId(), result);
+                                    if ( notifyResult && result != JobConsumer.JobResult.ASYNC ) {
+                                        finishedJob(job.getId(), result);
+                                    }
+                                }
+                                if ( result == JobConsumer.JobResult.ASYNC ) {
+                                    synchronized ( asyncLock ) {
+                                        while ( !asnycDone.get() ) {
+                                            try {
+                                                asyncLock.wait();
+                                            } catch (final InterruptedException e) {
+                                                ignoreException(e);
+                                            }
+                                        }
                                     }
                                 }
                             }
@@ -542,7 +593,7 @@ public abstract class AbstractJobQueue
                         }
 
                     } else {
-                        final Event jobEvent = this.getJobEvent(info);
+                        final Event jobEvent = this.getJobEvent(handler);
                         // we need async delivery, otherwise we might create a deadlock
                         // as this method runs inside a synchronized block and the finishedJob
                         // method as well!
@@ -556,11 +607,11 @@ public abstract class AbstractJobQueue
                 }
             } else {
                 if ( logger.isDebugEnabled() ) {
-                    logger.debug("Discarding removed job {}", Utility.toString(info.getJob()));
+                    logger.debug("Discarding removed job {}", Utility.toString(job));
                 }
             }
         } else {
-            info.reassign();
+            handler.reassign();
         }
         this.decQueued();
         return false;

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobConsumer.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobConsumer.java?rev=1471385&r1=1471384&r2=1471385&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobConsumer.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobConsumer.java Wed Apr 24 12:39:44 2013
@@ -19,6 +19,9 @@
 package org.apache.sling.event.jobs.consumer;
 
 import org.apache.sling.event.jobs.Job;
+import org.apache.sling.event.jobs.consumer.JobConsumer.JobResult;
+
+import aQute.bnd.annotation.ProviderType;
 
 
 
@@ -44,32 +47,56 @@ import org.apache.sling.event.jobs.Job;
  *   with the highest service ranking is used. If the ranking is equal, the one with
  *   the lowest service ID is used.
  *
- * @since 1.2
+ * @since 1.0
  */
+@ProviderType
 public interface JobConsumer {
 
     enum JobResult {
         OK,
         FAILED,
-        CANCEL
+        CANCEL,
+        ASYNC
+    }
+
+    /** Job property containing an asynchronous handler. */
+    String PROPERTY_JOB_ASYNC_HANDLER = ":sling:jobs:asynchandler";
+
+    /**
+     * If the consumer decides to process the job asynchronously, this handler
+     * interface can be used to notify finished processing. The asynchronous
+     * handler can be retried using the property name {@link #PROPERTY_JOB_ASYNC_HANDLER}.
+     */
+    interface AsyncHandler {
+
+        void failed();
+
+        void ok();
+
+        void cancel();
     }
+
     /**
      * Service registration property defining the jobs this consumer is able to process.
      * The value is either a string or an array of strings.
      */
     String PROPERTY_TOPICS = "job.topics";
 
+
     /**
      * Execute the job.
      *
-     * If the job has been processed successfully, {@link #JobResult.OK} should be returned.
-     * If the job has not been processed completely, but might be rescheduled {@link #JobResult.FAILED}
+     * If the job has been processed successfully, {@link JobResult.OK} should be returned.
+     * If the job has not been processed completely, but might be rescheduled {@link JobResult.FAILED}
      * should be returned.
-     * If the job processing failed and should not be rescheduled, {@link #JobResult.CANCEL} should
+     * If the job processing failed and should not be rescheduled, {@link JobResult.CANCEL} should
      * be returned.
      *
+     * If the consumer decides to process the job asynchronously it should return {@link JobResult.ASYNC}
+     * and notify the job manager by using the {@link AsyncHandler} interface.
+     *
      * If the processing fails with throwing an exception/throwable, the process will not be rescheduled
-     * and treated like the method would have returned {@link #JobResult.CANCEL}.
+     * and treated like the method would have returned {@link JobResult.CANCEL}.
      *
      * @param job The job
      * @return The job result