You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2013/09/17 19:10:40 UTC

svn commit: r1524130 - in /sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event: impl/jobs/ impl/jobs/queues/ jobs/ jobs/consumer/

Author: cziegeler
Date: Tue Sep 17 17:10:39 2013
New Revision: 1524130

URL: http://svn.apache.org/r1524130
Log:
SLING-3028 :  Support for progress tracking of jobs 

Added:
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobState.java   (with props)
Modified:
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/Job.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobManager.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobConsumer.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobExecutor.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobStatus.java

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java?rev=1524130&r1=1524129&r2=1524130&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java Tue Sep 17 17:10:39 2013
@@ -455,7 +455,7 @@ public class JobConsumerManager {
             ((JobImpl)job).setProperty(JobConsumer.PROPERTY_JOB_ASYNC_HANDLER, asyncHandler);
             final JobConsumer.JobResult result = this.consumer.process(job);
             if ( result == JobResult.ASYNC ) {
-                return JobStatus.ASYNC;
+                return null;
             } else if ( result == JobResult.FAILED) {
                 return JobStatus.FAILED;
             } else if ( result == JobResult.OK) {

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java?rev=1524130&r1=1524129&r2=1524130&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java Tue Sep 17 17:10:39 2013
@@ -339,8 +339,6 @@ public abstract class AbstractJobQueue
                 this.cancelledJob();
                 Utility.sendNotification(this.eventAdmin, JobUtil.TOPIC_JOB_CANCELLED, jobEvent.getJob(), null);
                 break;
-            case ASYNC: // nothing to do here
-                break;
         }
 
         return reschedule;
@@ -578,7 +576,7 @@ public abstract class AbstractJobQueue
                                                 }
                                             }
                                         });
-                                        if ( result.getState() == JobStatus.JobState.ASYNC ) {
+                                        if ( result == null ) { // ASYNC processing
                                             asyncCounter.incrementAndGet();
                                             notifyFinished(null);
                                             isAsync.set(true);
@@ -591,7 +589,7 @@ public abstract class AbstractJobQueue
                                 } finally {
                                     currentThread.setPriority(oldPriority);
                                     currentThread.setName(oldName);
-                                    if ( result.getState() != JobStatus.JobState.ASYNC ) {
+                                    if ( result != null ) {
                                         finishedJob(job.getId(), result, false);
                                     }
                                 }

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/Job.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/Job.java?rev=1524130&r1=1524129&r2=1524130&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/Job.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/Job.java Tue Sep 17 17:10:39 2013
@@ -23,6 +23,8 @@ import java.util.Set;
 
 import org.apache.sling.event.jobs.consumer.JobConsumer;
 
+import aQute.bnd.annotation.ProviderType;
+
 /**
  * A job
  *
@@ -43,6 +45,7 @@ import org.apache.sling.event.jobs.consu
  *
  * @since 1.2
  */
+@ProviderType
 public interface Job {
 
     /**

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobManager.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobManager.java?rev=1524130&r1=1524129&r2=1524130&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobManager.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobManager.java Tue Sep 17 17:10:39 2013
@@ -81,7 +81,7 @@ public interface JobManager {
      * allowed characters are letters, numbers and the underscore.
      *
      * @param topic The required job topic.
-     * @param properties Optional job properties
+     * @param properties Optional job properties. The properties must be serializable.
      * @return The new job - or <code>null</code> if the job could not be created.
      * @since 1.2
      */
@@ -102,7 +102,7 @@ public interface JobManager {
      *
      * @param topic The required job topic.
      * @param name  Optional unique job name
-     * @param properties Optional job properties
+     * @param properties Optional job properties. The properties must be serializable.
      * @return The new job - or <code>null</code> if the job could not be created.
      * @since 1.2
      */

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobConsumer.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobConsumer.java?rev=1524130&r1=1524129&r2=1524130&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobConsumer.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobConsumer.java Tue Sep 17 17:10:39 2013
@@ -19,6 +19,7 @@
 package org.apache.sling.event.jobs.consumer;
 
 import org.apache.sling.event.jobs.Job;
+import org.apache.sling.event.jobs.consumer.JobConsumer.JobResult;
 
 import aQute.bnd.annotation.ConsumerType;
 
@@ -27,6 +28,9 @@ import aQute.bnd.annotation.ConsumerType
 /**
  * A job consumer consumes a job.
  *
+ * If the job consumer needs more features like providing progress information or adding
+ * more information about the processing, {@link JobExecutor} should be implemented instead.
+ *
  * A job consumer registers itself with the {@link #PROPERTY_TOPICS} service registration
  * property. The value of this property defines which topics a consumer is able to process.
  * Each string value of this property is either a job topic or a topic category ending
@@ -35,7 +39,7 @@ import aQute.bnd.annotation.ConsumerType
  * "org/apache/sling/jobs/a" and "org/apache/sling/jobs/b" but neither
  * "org/apache/sling/jobs" nor "org/apache/sling/jobs/subcategory/a"
  *
- * If there is more than one job consumer registered for a job topic, the selection is as
+ * If there is more than one job consumer or executor registered for a job topic, the selection is as
  * follows:
  * - If there is a single consumer registering for the exact topic, this one is used
  * - If there is more than a single consumer registering for the exact topic, the one

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobExecutor.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobExecutor.java?rev=1524130&r1=1524129&r2=1524130&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobExecutor.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobExecutor.java Tue Sep 17 17:10:39 2013
@@ -22,9 +22,32 @@ import org.apache.sling.event.jobs.Job;
 
 import aQute.bnd.annotation.ConsumerType;
 
-
-
 /**
+ * A job executor consumes a job.
+ *
+ * A job executor registers itself with the {@link #PROPERTY_TOPICS} service registration
+ * property. The value of this property defines which topics an executor is able to process.
+ * Each string value of this property is either a job topic or a topic category ending
+ * with "/*" which means all topics in this category.
+ * For example, the value "org/apache/sling/jobs/*" matches the topics
+ * "org/apache/sling/jobs/a" and "org/apache/sling/jobs/b" but neither
+ * "org/apache/sling/jobs" nor "org/apache/sling/jobs/subcategory/a"
+ *
+ * If there is more than one job executor or consumer registered for a job topic,
+ * the selection is as follows:
+ * - If there is a single consumer or executor registering for the exact topic, this one is used
+ * - If there is more than a single consumer or executor registering for the exact topic, the one
+ *   with the highest service ranking is used. If the ranking is equal, the one with
+ *   the lowest service ID is used.
+ * - If there is a single consumer or executor registered for the category, it is used
+ * - If there is more than a single consumer or executor registered for the category, the service
+ *   with the highest service ranking is used. If the ranking is equal, the one with
+ *   the lowest service ID is used.
+ *
+ * If the executor decides to process the job asynchronously, the processing must finish
+ * within the current lifetime of the job executor. If the executor (or the instance
+ * of the executor) dies, this processing is marked as failed and the job is
+ * rescheduled.
  *
  * @since 1.1
  */
@@ -32,11 +55,33 @@ import aQute.bnd.annotation.ConsumerType
 public interface JobExecutor {
 
     /**
-     * Service registration property defining the jobs this consumer is able to process.
+     * Service registration property defining the jobs this executor is able to process.
      * The value is either a string or an array of strings.
      */
     String PROPERTY_TOPICS = "job.topics";
 
-
+    /**
+     * Execute the job.
+     *
+     * If the job has been processed successfully, {@link JobStatus#OK} should be returned.
+     * If the job has not been processed completely, but might be rescheduled, {@link JobStatus#FAILED}
+     * should be returned.
+     * If the job processing failed and should not be rescheduled, {@link JobStatus#CANCEL} should
+     * be returned.
+     *
+     * If the executor decides to process the job asynchronously it should return <code>null</code>
+     * and notify the job manager by using the {@link JobExecutionContext#asyncProcessingFinished(JobStatus)}
+     * method.
+     *
+     * If the processing fails by throwing an exception/throwable, the job will not be rescheduled
+     * and is treated as if the method had returned {@link JobStatus#CANCEL}.
+     *
+     * Instead of the constants from the JobStatus class, this method can return a custom instance containing
+     * additional information.
+     *
+     * @param job The job
+     * @param context The execution context.
+     * @return The job status
+     */
     JobStatus process(Job job, JobExecutionContext context);
 }

Added: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobState.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobState.java?rev=1524130&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobState.java (added)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobState.java Tue Sep 17 17:10:39 2013
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.jobs.consumer;
+
+/**
+ * The state of the job after it has been processed by a {@link JobExecutor}.
+ * @since 1.1
+ */
+public enum JobState {
+
+    OK,      // processing finished successfully
+    FAILED,  // processing failed, can be retried
+    CANCEL   // processing failed permanently
+}

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobState.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobState.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobState.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobStatus.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobStatus.java?rev=1524130&r1=1524129&r2=1524130&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobStatus.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobStatus.java Tue Sep 17 17:10:39 2013
@@ -18,58 +18,79 @@
  */
 package org.apache.sling.event.jobs.consumer;
 
-import aQute.bnd.annotation.ProviderType;
-
-
-
 /**
+ * The status of a job after it has been processed by a {@link JobExecutor}.
  *
  * @since 1.1
  */
-@ProviderType
 public final class JobStatus {
 
+    /** Constant for the {@link JobState#OK} status. */
     public static final JobStatus OK = new JobStatus(JobState.OK, null);
 
+    /** Constant for the {@link JobState#FAILED} status. */
     public static final JobStatus FAILED = new JobStatus(JobState.FAILED, null);
 
+    /** Constant for the {@link JobState#CANCEL} status. */
     public static final JobStatus CANCEL = new JobStatus(JobState.CANCEL, null);
 
-    public static final JobStatus ASYNC = new JobStatus(JobState.ASYNC, null);
-
-    public enum JobState {
-        OK,      // processing finished
-        FAILED,  // processing failed, can be retried
-        CANCEL,  // processing failed permanently
-        ASYNC    // processing will be done async
-    }
-
+    /** The state of the job after processing. */
     private final JobState state;
 
+    /** Optional message for this processing of the job. */
     private final String message;
 
+    /** Optional override for the retry delay. */
     private final Long retryDelay;
 
-    public JobStatus(final JobState result, final String message) {
-        this(result, message, null);
+    /**
+     * Create a new job status with a state and a message.
+     * @param state   The job state
+     * @param message The message
+     * @throws IllegalArgumentException If state is null
+     */
+    public JobStatus(final JobState state, final String message) {
+        this(state, message, null);
     }
 
-    public JobStatus(final JobState result, final String message, final Long retryDelayInMs) {
-        this.state = result;
+    /**
+     * Create a new job status with a state, a message and an override for the retry delay.
+     * @param state   The job state
+     * @param message The message
+     * @param retryDelayInMs The new retry delay in ms.
+     * @throws IllegalArgumentException If state is null or if retryDelayInMs is negative.
+     */
+    public JobStatus(final JobState state, final String message, final Long retryDelayInMs) {
+        if ( state == null ) {
+            throw new IllegalArgumentException("State must not be null.");
+        }
+        if ( retryDelayInMs != null && retryDelayInMs < 0 ) {
+            throw new IllegalArgumentException("Retry delay must not be negative.");
+        }
+        this.state = state;
         this.message = message;
         this.retryDelay = retryDelayInMs;
     }
 
+    /**
+     * Return the state of the job.
+     * @return The job state.
+     */
     public JobState getState() {
         return this.state;
     }
 
+    /**
+     * Return the optional message.
+     * @return The message or <code>null</code>
+     */
     public String getMessage() {
         return this.message;
     }
 
     /**
      * Return the retry delay in ms
+     * @return The new retry delay (&gt;= 0) or <code>null</code>
      */
     public Long getRetryDelayInMs() {
         return this.retryDelay;