You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2014/10/16 11:25:58 UTC

svn commit: r1632253 - in /sling/trunk/bundles/extensions/event: ./ src/main/java/org/apache/sling/event/impl/ src/main/java/org/apache/sling/event/impl/jobs/ src/main/java/org/apache/sling/event/impl/jobs/config/ src/main/java/org/apache/sling/event/i...

Author: cziegeler
Date: Thu Oct 16 09:25:57 2014
New Revision: 1632253

URL: http://svn.apache.org/r1632253
Log:
SLING-4068 : Drop support for the PROPERTY_NOTIFICATION_JOB event property
SLING-4065 : Add notification when a job is added
SLING-4063 : Fix job queue statistics

Added:
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/notifications/
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/notifications/NewJobSender.java   (with props)
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/notifications/NotificationUtility.java   (with props)
Removed:
    sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/
Modified:
    sling/trunk/bundles/extensions/event/pom.xml
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/EventingThreadPool.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobSchedulerImpl.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/Utility.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/MainQueueConfiguration.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/EventAdminBridge.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/JobStatusProviderImpl.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsManager.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/timed/ScheduleInfo.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ResourceHelper.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobUtil.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/NotificationConstants.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/package-info.java
    sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ClassloadingTest.java
    sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/JobHandlingTest.java

Modified: sling/trunk/bundles/extensions/event/pom.xml
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/pom.xml?rev=1632253&r1=1632252&r2=1632253&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/pom.xml (original)
+++ sling/trunk/bundles/extensions/event/pom.xml Thu Oct 16 09:25:57 2014
@@ -116,7 +116,7 @@
                         -Xmx2048m -XX:MaxPermSize=512m
                     </argLine>
                     <includes>
-                        <include>**/it/OrderedQueueTest*</include>
+                        <include>**/it/Ordered*</include>
                     </includes>
                 </configuration>
             </plugin>

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/EventingThreadPool.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/EventingThreadPool.java?rev=1632253&r1=1632252&r2=1632253&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/EventingThreadPool.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/EventingThreadPool.java Thu Oct 16 09:25:57 2014
@@ -38,8 +38,11 @@ import org.apache.sling.commons.threads.
 /**
  * The configurable eventing thread pool.
  */
-@Component(label="%event.pool.name",
-        description="%event.pool.description",
+@Component(label="Apache Sling Job Thread Pool",
+        description="This is the thread pool used by the Apache Sling job handling. The "
+                  + "threads from this pool are merely used for executing jobs. By limiting this pool, it is "
+                  + "possible to limit the maximum number of parallel processed jobs - regardless of the queue "
+                  + "configuration.",
         metatype=true)
 @Service(value=EventingThreadPool.class)
 public class EventingThreadPool implements ThreadPool {
@@ -52,7 +55,10 @@ public class EventingThreadPool implemen
 
     private static final int DEFAULT_POOL_SIZE = 35;
 
-    @Property(intValue=DEFAULT_POOL_SIZE)
+    @Property(intValue=DEFAULT_POOL_SIZE,
+              label="Pool Size",
+              description="The size of the thread pool. This pool is used to execute jobs and therefore "
+                        + "limits the maximum number of jobs executed in parallel.")
     private static final String PROPERTY_POOL_SIZE = "minPoolSize";
 
     public EventingThreadPool() {

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java?rev=1632253&r1=1632252&r2=1632253&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java Thu Oct 16 09:25:57 2014
@@ -58,8 +58,10 @@ import org.slf4j.LoggerFactory;
 /**
  * This component manages/keeps track of all job consumer services.
  */
-@Component(label="%job.consumermanager.name",
-           description="%job.consumermanager.description",
+@Component(label="Apache Sling Job Consumer Manager",
+           description="The consumer manager controls the job consumer (= processors). "
+                     + "It can be used to temporarily disable job processing on the current instance. Other instances "
+                     + "in a cluster are not affected.",
            metatype=true)
 @Service(value=JobConsumerManager.class)
 @References({
@@ -76,10 +78,20 @@ import org.slf4j.LoggerFactory;
                     + "only used on the current instance. This option should always be disabled!")
 public class JobConsumerManager {
 
-    @Property(unbounded=PropertyUnbounded.ARRAY, value = "*")
+    @Property(unbounded=PropertyUnbounded.ARRAY, value = "*",
+              label="Topic Whitelist",
+              description="This is a list of topics which currently should be "
+                        + "processed by this instance. Leaving it empty, all job consumers are disabled. Putting a '*' as "
+                        + "one entry, enables all job consumers. Adding separate topics enables job consumers for exactly "
+                        + "this topic.")
     private static final String PROPERTY_WHITELIST = "job.consumermanager.whitelist";
 
-    @Property(unbounded=PropertyUnbounded.ARRAY)
+    @Property(unbounded=PropertyUnbounded.ARRAY,
+              label="Topic Blacklist",
+              description="This is a list of topics which currently shouldn't be "
+                        + "processed by this instance. Leaving it empty, all job consumers are enabled. Putting a '*' as "
+                        + "one entry, disables all job consumers. Adding separate topics disables job consumers for exactly "
+                        + "this topic.")
     private static final String PROPERTY_BLACKLIST = "job.consumermanager.blacklist";
 
     /** The map with the consumers, keyed by topic, sorted by service ranking. */

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java?rev=1632253&r1=1632252&r2=1632253&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java Thu Oct 16 09:25:57 2014
@@ -49,6 +49,7 @@ import org.apache.sling.event.EventUtil;
 import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
 import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager;
 import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueInfo;
+import org.apache.sling.event.impl.jobs.notifications.NotificationUtility;
 import org.apache.sling.event.impl.jobs.queues.AbstractJobQueue;
 import org.apache.sling.event.impl.jobs.queues.QueueManager;
 import org.apache.sling.event.impl.jobs.stats.StatisticsManager;
@@ -61,6 +62,7 @@ import org.apache.sling.event.impl.suppo
 import org.apache.sling.event.jobs.Job;
 import org.apache.sling.event.jobs.JobBuilder;
 import org.apache.sling.event.jobs.JobManager;
+import org.apache.sling.event.jobs.JobUtil;
 import org.apache.sling.event.jobs.JobsIterator;
 import org.apache.sling.event.jobs.NotificationConstants;
 import org.apache.sling.event.jobs.Queue;
@@ -399,7 +401,7 @@ public class JobManagerImpl
                         } else {
                             logger.debug("Unable to remove job with id - resource already removed: {}", jobId);
                         }
-                        Utility.sendNotification(this.eventAdmin, NotificationConstants.TOPIC_JOB_REMOVED, job, null);
+                        NotificationUtility.sendNotification(this.eventAdmin, NotificationConstants.TOPIC_JOB_REMOVED, job, null);
                     } catch ( final PersistenceException pe) {
                         this.ignoreException(pe);
                         result = false;
@@ -452,7 +454,7 @@ public class JobManagerImpl
             buf.append("//element(*,");
             buf.append(ResourceHelper.RESOURCE_TYPE_JOB);
             buf.append(")[@");
-            buf.append(ISO9075.encode(ResourceHelper.PROPERTY_JOB_NAME));
+            buf.append(ISO9075.encode(JobUtil.PROPERTY_JOB_NAME));
             buf.append(" = '");
             buf.append(name);
             buf.append("']");
@@ -895,7 +897,7 @@ public class JobManagerImpl
             if ( logger.isDebugEnabled() ) {
                 logger.debug("Dropping job due to configuration of queue {} : {}", info.queueName, Utility.toString(jobTopic, jobName, jobProperties));
             }
-            Utility.sendNotification(this.eventAdmin, NotificationConstants.TOPIC_JOB_CANCELLED, jobTopic, jobName, jobProperties, null);
+            NotificationUtility.sendNotification(this.eventAdmin, NotificationConstants.TOPIC_JOB_CANCELLED, jobTopic, jobName, jobProperties, null);
         } else {
             // check for unique jobs
             if ( jobName != null && !this.lock(jobTopic, jobName) ) {
@@ -966,7 +968,7 @@ public class JobManagerImpl
         properties.put(ResourceHelper.PROPERTY_JOB_ID, jobId);
         properties.put(ResourceHelper.PROPERTY_JOB_TOPIC, jobTopic);
         if ( jobName != null ) {
-            properties.put(ResourceHelper.PROPERTY_JOB_NAME, jobName);
+            properties.put(JobUtil.PROPERTY_JOB_NAME, jobName);
         }
         properties.put(Job.PROPERTY_JOB_QUEUE_NAME, info.queueConfiguration.getName());
         properties.put(Job.PROPERTY_JOB_RETRY_COUNT, 0);

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobSchedulerImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobSchedulerImpl.java?rev=1632253&r1=1632252&r2=1632253&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobSchedulerImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobSchedulerImpl.java Thu Oct 16 09:25:57 2014
@@ -53,6 +53,7 @@ import org.apache.sling.event.impl.suppo
 import org.apache.sling.event.impl.support.ScheduleInfoImpl;
 import org.apache.sling.event.jobs.Job;
 import org.apache.sling.event.jobs.JobBuilder;
+import org.apache.sling.event.jobs.JobUtil;
 import org.apache.sling.event.jobs.ScheduleInfo;
 import org.apache.sling.event.jobs.ScheduleInfo.ScheduleType;
 import org.apache.sling.event.jobs.ScheduledJobInfo;
@@ -534,7 +535,7 @@ public class JobSchedulerImpl
 
             properties.put(ResourceHelper.PROPERTY_JOB_TOPIC, jobTopic);
             if ( jobName != null ) {
-                properties.put(ResourceHelper.PROPERTY_JOB_NAME, jobName);
+                properties.put(JobUtil.PROPERTY_JOB_NAME, jobName);
             }
             properties.put(Job.PROPERTY_JOB_CREATED, Calendar.getInstance());
             properties.put(Job.PROPERTY_JOB_CREATED_INSTANCE, Environment.APPLICATION_ID);

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/Utility.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/Utility.java?rev=1632253&r1=1632252&r2=1632253&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/Utility.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/Utility.java Thu Oct 16 09:25:57 2014
@@ -25,7 +25,6 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.Dictionary;
 import java.util.HashMap;
-import java.util.Hashtable;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -36,11 +35,8 @@ import org.apache.sling.api.resource.Val
 import org.apache.sling.event.impl.support.ResourceHelper;
 import org.apache.sling.event.jobs.Job;
 import org.apache.sling.event.jobs.JobUtil;
-import org.apache.sling.event.jobs.NotificationConstants;
 import org.apache.sling.event.jobs.consumer.JobConsumer;
 import org.osgi.service.event.Event;
-import org.osgi.service.event.EventAdmin;
-import org.osgi.service.event.EventConstants;
 import org.slf4j.Logger;
 
 public abstract class Utility {
@@ -101,76 +97,6 @@ public abstract class Utility {
         return msg;
     }
 
-    /** Event property containing the time for job start and job finished events. */
-    public static final String PROPERTY_TIME = "time";
-
-    /**
-     * Helper method for sending the notification events.
-     */
-    public static void sendNotification(final EventAdmin eventAdmin,
-            final String eventTopic,
-            final String jobTopic,
-            final String jobName,
-            final Map<String, Object> jobProperties,
-            final Long time) {
-        if ( eventAdmin != null ) {
-            // create job object
-            final Map<String, Object> jobProps;
-            if ( jobProperties == null ) {
-                jobProps = new HashMap<String, Object>();
-            } else {
-                jobProps = jobProperties;
-            }
-            final Job job = new JobImpl(jobTopic, jobName, "<unknown>", jobProps);
-            sendNotificationInternal(eventAdmin, eventTopic, job, time);
-        }
-    }
-
-    /**
-     * Helper method for sending the notification events.
-     */
-    public static void sendNotification(final EventAdmin eventAdmin,
-            final String eventTopic,
-            final Job job,
-            final Long time) {
-        if ( eventAdmin != null ) {
-            // create new copy of job object
-            final Job jobCopy = new JobImpl(job.getTopic(), job.getName(), job.getId(), ((JobImpl)job).getProperties());
-            sendNotificationInternal(eventAdmin, eventTopic, jobCopy, time);
-        }
-    }
-
-    /**
-     * Helper method for sending the notification events.
-     */
-    private static void sendNotificationInternal(final EventAdmin eventAdmin,
-            final String eventTopic,
-            final Job job,
-            final Long time) {
-        final Dictionary<String, Object> eventProps = new Hashtable<String, Object>();
-        // add basic job properties
-        eventProps.put(NotificationConstants.NOTIFICATION_PROPERTY_JOB_ID, job.getId());
-        eventProps.put(NotificationConstants.NOTIFICATION_PROPERTY_JOB_TOPIC, job.getTopic());
-        if ( job.getName() != null ) {
-            eventProps.put(JobUtil.NOTIFICATION_PROPERTY_JOB_NAME, job.getName());
-        }
-        // copy payload
-        for(final String name : job.getPropertyNames()) {
-            eventProps.put(name, job.getProperty(name));
-        }
-        // remove async handler
-        eventProps.remove(JobConsumer.PROPERTY_JOB_ASYNC_HANDLER);
-        // add timestamp
-        eventProps.put(EventConstants.TIMESTAMP, System.currentTimeMillis());
-        // add internal time information
-        if ( time != null ) {
-            eventProps.put(PROPERTY_TIME, time);
-        }
-        // compatibility:
-        eventProps.put(JobUtil.PROPERTY_NOTIFICATION_JOB, toEvent(job));
-        eventAdmin.postEvent(new Event(eventTopic, eventProps));
-    }
-
     /**
      * Create an event from a job
      * @param job The job
@@ -180,7 +106,7 @@ public abstract class Utility {
         final Map<String, Object> eventProps = new HashMap<String, Object>();
         eventProps.putAll(((JobImpl)job).getProperties());
         if ( job.getName() != null ) {
-            eventProps.put(ResourceHelper.PROPERTY_JOB_NAME, job.getName());
+            eventProps.put(JobUtil.PROPERTY_JOB_NAME, job.getName());
         }
         eventProps.put(ResourceHelper.PROPERTY_JOB_ID, job.getId());
         eventProps.remove(JobConsumer.PROPERTY_JOB_ASYNC_HANDLER);
@@ -197,7 +123,7 @@ public abstract class Utility {
             boolean first = true;
             for(final String propName : properties.keySet()) {
                 if ( propName.equals(ResourceHelper.PROPERTY_JOB_ID)
-                    || propName.equals(ResourceHelper.PROPERTY_JOB_NAME)
+                    || propName.equals(JobUtil.PROPERTY_JOB_NAME)
                     || propName.equals(ResourceHelper.PROPERTY_JOB_TOPIC) ) {
                    continue;
                 }
@@ -297,7 +223,7 @@ public abstract class Utility {
                         }
                     }
                     job = new JobImpl(topic,
-                            (String)jobProperties.get(ResourceHelper.PROPERTY_JOB_NAME),
+                            (String)jobProperties.get(JobUtil.PROPERTY_JOB_NAME),
                             jobId,
                             jobProperties);
                 } else {

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java?rev=1632253&r1=1632252&r2=1632253&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java Thu Oct 16 09:25:57 2014
@@ -36,38 +36,80 @@ import org.apache.sling.event.jobs.JobUt
 import org.apache.sling.event.jobs.QueueConfiguration;
 import org.osgi.framework.Constants;
 
-@Component(metatype=true,name="org.apache.sling.event.jobs.QueueConfiguration",
-        label="%queue.name", description="%queue.description",
-        configurationFactory=true,policy=ConfigurationPolicy.REQUIRE)
+@Component(metatype=true,
+           name="org.apache.sling.event.jobs.QueueConfiguration",
+           label="Apache Sling Job Queue Configuration",
+           description="The configuration of a job processing queue.",
+           configurationFactory=true, policy=ConfigurationPolicy.REQUIRE)
 @Service(value={InternalQueueConfiguration.class})
 @Properties({
-    @Property(name=ConfigurationConstants.PROP_NAME),
-    @Property(name=ConfigurationConstants.PROP_TYPE,
-            value=ConfigurationConstants.DEFAULT_TYPE,
-            options={@PropertyOption(name="UNORDERED",value="Parallel"),
-                     @PropertyOption(name="ORDERED",value="Ordered"),
-                     @PropertyOption(name="TOPIC_ROUND_ROBIN",value="Topic Round Robin")}),
+    @Property(name=ConfigurationConstants.PROP_NAME,
+              label="Name",
+              description="The name of the queue. If matching is used the token {0} can be used to substitute the real value."),
     @Property(name=ConfigurationConstants.PROP_TOPICS,
-            unbounded=PropertyUnbounded.ARRAY),
+              unbounded=PropertyUnbounded.ARRAY,
+              label="Topics",
+              description="This value is required and lists the topics processed by "
+                        + "this queue. The value is a list of strings. If a string ends with a dot, "
+                        + "all topics in exactly this package match. If the string ends with a star, "
+                        + "all topics in this package and all subpackages match. If the string neither "
+                        + "ends with a dot nor with a star, this is assumed to define an exact topic."),
+    @Property(name=ConfigurationConstants.PROP_TYPE,
+              value=ConfigurationConstants.DEFAULT_TYPE,
+              options={@PropertyOption(name="UNORDERED",value="Parallel"),
+                       @PropertyOption(name="ORDERED",value="Ordered"),
+                       @PropertyOption(name="TOPIC_ROUND_ROBIN",value="Topic Round Robin")},
+              label="Type",
+              description="The queue type."),
     @Property(name=ConfigurationConstants.PROP_MAX_PARALLEL,
-            intValue=ConfigurationConstants.DEFAULT_MAX_PARALLEL),
+              intValue=ConfigurationConstants.DEFAULT_MAX_PARALLEL,
+              label="Maximum Parallel Jobs",
+              description="The maximum number of parallel jobs started for this queue. "
+                        + "A value of -1 is substituted with the number of available processors."),
     @Property(name=ConfigurationConstants.PROP_RETRIES,
-            intValue=ConfigurationConstants.DEFAULT_RETRIES),
+              intValue=ConfigurationConstants.DEFAULT_RETRIES,
+              label="Maximum Retries",
+              description="The maximum number of times a failed job slated "
+                        + "for retries is actually retried. If a job has been retried this number of "
+                        + "times and still fails, it is not rescheduled and assumed to have failed. The "
+                        + "default value is 10."),
     @Property(name=ConfigurationConstants.PROP_RETRY_DELAY,
-            longValue=ConfigurationConstants.DEFAULT_RETRY_DELAY),
+              longValue=ConfigurationConstants.DEFAULT_RETRY_DELAY,
+              label="Retry Delay",
+              description="The number of milliseconds to sleep between two "
+                        + "consecutive retries of a job which failed and was set to be retried. The "
+                        + "default value is 2 seconds. This value is only relevant if there is a single "
+                        + "failed job in the queue. If there are multiple failed jobs, each job is "
+                        + "retried in turn without an intervening delay."),
     @Property(name=ConfigurationConstants.PROP_PRIORITY,
-            value=ConfigurationConstants.DEFAULT_PRIORITY,
-            options={@PropertyOption(name="NORM",value="Norm"),
-                     @PropertyOption(name="MIN",value="Min"),
-                     @PropertyOption(name="MAX",value="Max")}),
+              value=ConfigurationConstants.DEFAULT_PRIORITY,
+              options={@PropertyOption(name="NORM",value="Norm"),
+                       @PropertyOption(name="MIN",value="Min"),
+                       @PropertyOption(name="MAX",value="Max")},
+              label="Priority",
+              description="The priority for the threads used by this queue. Default is norm."),
     @Property(name=ConfigurationConstants.PROP_KEEP_JOBS,
-              boolValue=ConfigurationConstants.DEFAULT_KEEP_JOBS),
+              boolValue=ConfigurationConstants.DEFAULT_KEEP_JOBS,
+              label="Keep History",
+              description="If this option is enabled, successful finished jobs are kept "
+                        + "to provide a complete history."),
     @Property(name=ConfigurationConstants.PROP_PREFER_RUN_ON_CREATION_INSTANCE,
-              boolValue=ConfigurationConstants.DEFAULT_PREFER_RUN_ON_CREATION_INSTANCE),
+              boolValue=ConfigurationConstants.DEFAULT_PREFER_RUN_ON_CREATION_INSTANCE,
+              label="Prefer Creation Instance",
+              description="If this option is enabled, the jobs are tried to "
+                        + "be run on the instance where the job was created."),
     @Property(name=ConfigurationConstants.PROP_THREAD_POOL_SIZE,
-              intValue=ConfigurationConstants.DEFAULT_THREAD_POOL_SIZE),
-    @Property(name=Constants.SERVICE_RANKING, intValue=0, propertyPrivate=false,
-              label="%queue.ranking.name", description="%queue.ranking.description")
+              intValue=ConfigurationConstants.DEFAULT_THREAD_POOL_SIZE,
+              label="Thread Pool Size",
+              description="Optional configuration value for a thread pool to be used by "
+                        + "this queue. If this is value has a positive number of threads configuration, this queue uses "
+                        + "an own thread pool with the configured number of threads."),
+    @Property(name=Constants.SERVICE_RANKING,
+              intValue=0,
+              propertyPrivate=false,
+              label="Ranking",
+              description="Integer value defining the ranking of this queue configuration. "
+                        + "If more than one queue matches a job topic, the one with the highest ranking is used.")
 })
 public class InternalQueueConfiguration
     implements QueueConfiguration, Comparable<InternalQueueConfiguration> {

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/MainQueueConfiguration.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/MainQueueConfiguration.java?rev=1632253&r1=1632252&r2=1632253&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/MainQueueConfiguration.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/MainQueueConfiguration.java Thu Oct 16 09:25:57 2014
@@ -37,23 +37,39 @@ import org.slf4j.LoggerFactory;
  * This is the configuration for the main queue.
  *
  */
-@Component(label="%job.events.name",
-        description="%job.events.description",
-        name="org.apache.sling.event.impl.jobs.DefaultJobManager",
-        metatype=true)
+@Component(label="Apache Sling Job Default Queue",
+           description="The configuration of the default job queue.",
+           name="org.apache.sling.event.impl.jobs.DefaultJobManager",
+           metatype=true)
 @Service(value=MainQueueConfiguration.class)
 @Properties({
     @Property(name=ConfigurationConstants.PROP_PRIORITY,
-            value=ConfigurationConstants.DEFAULT_PRIORITY,
-            options={@PropertyOption(name="NORM",value="Norm"),
-            @PropertyOption(name="MIN",value="Min"),
-            @PropertyOption(name="MAX",value="Max")}),
+              value=ConfigurationConstants.DEFAULT_PRIORITY,
+              options={@PropertyOption(name="NORM",value="Norm"),
+                       @PropertyOption(name="MIN",value="Min"),
+                       @PropertyOption(name="MAX",value="Max")},
+              label="Priority",
+              description="The priority for the threads used by this queue. Default is norm."),
     @Property(name=ConfigurationConstants.PROP_RETRIES,
-            intValue=ConfigurationConstants.DEFAULT_RETRIES),
+            intValue=ConfigurationConstants.DEFAULT_RETRIES,
+            label="Maximum Retries",
+            description="The maximum number of times a failed job slated "
+                      + "for retries is actually retried. If a job has been retried this number of "
+                      + "times and still fails, it is not rescheduled and assumed to have failed. The "
+                      + "default value is 10."),
     @Property(name=ConfigurationConstants.PROP_RETRY_DELAY,
-            longValue=ConfigurationConstants.DEFAULT_RETRY_DELAY),
+            longValue=ConfigurationConstants.DEFAULT_RETRY_DELAY,
+            label="Retry Delay",
+            description="The number of milliseconds to sleep between two "
+                      + "consecutive retries of a job which failed and was set to be retried. The "
+                      + "default value is 2 seconds. This value is only relevant if there is a single "
+                      + "failed job in the queue. If there are multiple failed jobs, each job is "
+                      + "retried in turn without an intervening delay."),
     @Property(name=ConfigurationConstants.PROP_MAX_PARALLEL,
-            intValue=ConfigurationConstants.DEFAULT_MAX_PARALLEL)
+            intValue=ConfigurationConstants.DEFAULT_MAX_PARALLEL,
+            label="Maximum Parallel Jobs",
+            description="The maximum number of parallel jobs started for this queue. "
+                      + "A value of -1 is substituted with the number of available processors."),
 })
 public class MainQueueConfiguration {
 

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/EventAdminBridge.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/EventAdminBridge.java?rev=1632253&r1=1632252&r2=1632253&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/EventAdminBridge.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/EventAdminBridge.java Thu Oct 16 09:25:57 2014
@@ -160,7 +160,7 @@ public class EventAdminBridge
                     }
                 } else {
                     final String jobTopic = (String)event.getProperty(ResourceHelper.PROPERTY_JOB_TOPIC);
-                    final String jobName = (String)event.getProperty(ResourceHelper.PROPERTY_JOB_NAME);
+                    final String jobName = (String)event.getProperty(JobUtil.PROPERTY_JOB_NAME);
 
                     final Map<String, Object> props =  new EventPropertiesMap(event);
                     props.put(JobImpl.PROPERTY_BRIDGED_EVENT, Boolean.TRUE);

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/JobStatusProviderImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/JobStatusProviderImpl.java?rev=1632253&r1=1632252&r2=1632253&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/JobStatusProviderImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/JobStatusProviderImpl.java Thu Oct 16 09:25:57 2014
@@ -32,6 +32,7 @@ import org.apache.sling.event.JobsIterat
 import org.apache.sling.event.impl.support.ResourceHelper;
 import org.apache.sling.event.jobs.JobManager;
 import org.apache.sling.event.jobs.JobManager.QueryType;
+import org.apache.sling.event.jobs.JobUtil;
 import org.apache.sling.event.jobs.Queue;
 import org.osgi.service.event.Event;
 
@@ -57,7 +58,7 @@ public class JobStatusProviderImpl
     public boolean removeJob(final String topic, final String jobId) {
         if ( jobId != null && topic != null ) {
             final Event job = this.jobManager.findJob(topic,
-                    Collections.singletonMap(ResourceHelper.PROPERTY_JOB_NAME, (Object)jobId));
+                    Collections.singletonMap(JobUtil.PROPERTY_JOB_NAME, (Object)jobId));
             if ( job != null ) {
                 return this.removeJob((String)job.getProperty(ResourceHelper.PROPERTY_JOB_ID));
             }
@@ -81,7 +82,7 @@ public class JobStatusProviderImpl
     public void forceRemoveJob(final String topic, final String jobId) {
         if ( jobId != null && topic != null ) {
             final Event job = this.jobManager.findJob(topic,
-                    Collections.singletonMap(ResourceHelper.PROPERTY_JOB_NAME, (Object)jobId));
+                    Collections.singletonMap(JobUtil.PROPERTY_JOB_NAME, (Object)jobId));
             if ( job != null ) {
                 this.forceRemoveJob((String)job.getProperty(ResourceHelper.PROPERTY_JOB_ID));
             }

Added: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/notifications/NewJobSender.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/notifications/NewJobSender.java?rev=1632253&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/notifications/NewJobSender.java (added)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/notifications/NewJobSender.java Thu Oct 16 09:25:57 2014
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.notifications;
+
+import java.util.Dictionary;
+import java.util.Hashtable;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.sling.api.SlingConstants;
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.event.impl.jobs.JobManagerConfiguration;
+import org.apache.sling.event.impl.jobs.Utility;
+import org.apache.sling.event.impl.support.ResourceHelper;
+import org.apache.sling.event.jobs.Job;
+import org.apache.sling.event.jobs.NotificationConstants;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.Constants;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
+import org.osgi.service.event.EventHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This component receives resource added events and sends a job
+ * added notification event.
+ */
+@Component
+public class NewJobSender implements EventHandler {
+
+    /** Logger. */
+    private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+    /** The job manager configuration. */
+    @Reference
+    private JobManagerConfiguration configuration;
+
+    /** The event admin. */
+    @Reference
+    private EventAdmin eventAdmin;
+
+    /** Service registration for the event handler. */
+    private volatile ServiceRegistration eventHandlerRegistration;
+
+    /**
+     * Activate this component.
+     * Register an event handler.
+     */
+    @Activate
+    protected void activate(final BundleContext bundleContext) {
+        final Dictionary<String, Object> properties = new Hashtable<String, Object>();
+        properties.put(Constants.SERVICE_DESCRIPTION, "Apache Sling Job Topic Manager Event Handler");
+        properties.put(Constants.SERVICE_VENDOR, "The Apache Software Foundation");
+        properties.put(EventConstants.EVENT_TOPIC, SlingConstants.TOPIC_RESOURCE_ADDED);
+        properties.put(EventConstants.EVENT_FILTER,
+                "(" + SlingConstants.PROPERTY_PATH + "=" +
+                      this.configuration.getLocalJobsPath() + "/*)");
+        this.eventHandlerRegistration = bundleContext.registerService(EventHandler.class.getName(), this, properties);
+    }
+
+    /**
+     * Deactivate this component.
+     * Unregister the event handler.
+     */
+    @Deactivate
+    protected void deactivate() {
+        if ( this.eventHandlerRegistration != null ) {
+            this.eventHandlerRegistration.unregister();
+            this.eventHandlerRegistration = null;
+        }
+    }
+
+    @Override
+    public void handleEvent(final Event event) {
+        logger.debug("Received event {}", event);
+        final String path = (String) event.getProperty(SlingConstants.PROPERTY_PATH);
+        final String rt = (String) event.getProperty(SlingConstants.PROPERTY_RESOURCE_TYPE);
+        if ( ResourceHelper.RESOURCE_TYPE_JOB.equals(rt) && this.configuration.isLocalJob(path) ) {
+            // read the job
+            final ResourceResolver resolver = this.configuration.createResourceResolver();
+            try {
+                final Resource rsrc = resolver.getResource(path);
+                if ( rsrc != null ) {
+                    final Job job = Utility.readJob(this.logger, rsrc);
+                    if ( job != null ) {
+                        logger.debug("Sending job added event for {}", job);
+                        NotificationUtility.sendNotification(this.eventAdmin, NotificationConstants.TOPIC_JOB_ADDED, job, null);
+                    }
+                }
+            } finally {
+                resolver.close();
+            }
+        }
+    }
+
+}

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/notifications/NewJobSender.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/notifications/NewJobSender.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/notifications/NewJobSender.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/notifications/NotificationUtility.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/notifications/NotificationUtility.java?rev=1632253&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/notifications/NotificationUtility.java (added)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/notifications/NotificationUtility.java Thu Oct 16 09:25:57 2014
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.notifications;
+
+import java.util.Dictionary;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.Map;
+
+import org.apache.sling.event.impl.jobs.JobImpl;
+import org.apache.sling.event.jobs.Job;
+import org.apache.sling.event.jobs.JobUtil;
+import org.apache.sling.event.jobs.NotificationConstants;
+import org.apache.sling.event.jobs.consumer.JobConsumer;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
+
+public abstract class NotificationUtility {
+
+    /** Event property containing the time for job start and job finished events. */
+    public static final String PROPERTY_TIME = ":time";
+
+    /**
+     * Helper method for sending the notification events.
+     */
+    public static void sendNotification(final EventAdmin eventAdmin,
+            final String eventTopic,
+            final String jobTopic,
+            final String jobName,
+            final Map<String, Object> jobProperties,
+            final Long time) {
+        if ( eventAdmin != null ) {
+            // create job object
+            final Map<String, Object> jobProps;
+            if ( jobProperties == null ) {
+                jobProps = new HashMap<String, Object>();
+            } else {
+                jobProps = jobProperties;
+            }
+            final Job job = new JobImpl(jobTopic, jobName, "<unknown>", jobProps);
+            sendNotificationInternal(eventAdmin, eventTopic, job, time);
+        }
+    }
+
+    /**
+     * Helper method for sending the notification events.
+     */
+    public static void sendNotification(final EventAdmin eventAdmin,
+            final String eventTopic,
+            final Job job,
+            final Long time) {
+        if ( eventAdmin != null ) {
+            // create new copy of job object
+            final Job jobCopy = new JobImpl(job.getTopic(), job.getName(), job.getId(), ((JobImpl)job).getProperties());
+            sendNotificationInternal(eventAdmin, eventTopic, jobCopy, time);
+        }
+    }
+
+    /**
+     * Helper method for sending the notification events.
+     */
+    private static void sendNotificationInternal(final EventAdmin eventAdmin,
+            final String eventTopic,
+            final Job job,
+            final Long time) {
+        final Dictionary<String, Object> eventProps = new Hashtable<String, Object>();
+        // add basic job properties
+        eventProps.put(NotificationConstants.NOTIFICATION_PROPERTY_JOB_ID, job.getId());
+        eventProps.put(NotificationConstants.NOTIFICATION_PROPERTY_JOB_TOPIC, job.getTopic());
+        if ( job.getName() != null ) {
+            eventProps.put(JobUtil.NOTIFICATION_PROPERTY_JOB_NAME, job.getName());
+        }
+        // copy payload
+        for(final String name : job.getPropertyNames()) {
+            eventProps.put(name, job.getProperty(name));
+        }
+        // remove async handler
+        eventProps.remove(JobConsumer.PROPERTY_JOB_ASYNC_HANDLER);
+        // add timestamp
+        eventProps.put(EventConstants.TIMESTAMP, System.currentTimeMillis());
+        // add internal time information
+        if ( time != null ) {
+            eventProps.put(PROPERTY_TIME, time);
+        }
+        if ( NotificationConstants.TOPIC_JOB_ADDED.equals(eventTopic) ) {
+            eventAdmin.sendEvent(new Event(eventTopic, eventProps));
+        } else {
+            eventAdmin.postEvent(new Event(eventTopic, eventProps));
+        }
+    }
+
+}

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/notifications/NotificationUtility.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/notifications/NotificationUtility.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/notifications/NotificationUtility.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java?rev=1632253&r1=1632252&r2=1632253&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java Thu Oct 16 09:25:57 2014
@@ -39,6 +39,7 @@ import org.apache.sling.event.impl.jobs.
 import org.apache.sling.event.impl.jobs.Utility;
 import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
 import org.apache.sling.event.impl.jobs.deprecated.JobStatusNotifier;
+import org.apache.sling.event.impl.jobs.notifications.NotificationUtility;
 import org.apache.sling.event.impl.support.Environment;
 import org.apache.sling.event.impl.support.ResourceHelper;
 import org.apache.sling.event.jobs.Job;
@@ -370,7 +371,7 @@ public abstract class AbstractJobQueue
 
                 if ( consumer != null ) {
                     final long queueTime = handler.started - handler.queued;
-                    Utility.sendNotification(this.services.eventAdmin, NotificationConstants.TOPIC_JOB_STARTED, job, queueTime);
+                    NotificationUtility.sendNotification(this.services.eventAdmin, NotificationConstants.TOPIC_JOB_STARTED, job, queueTime);
                     synchronized ( this.processingJobsLists ) {
                         this.processingJobsLists.put(job.getId(), handler);
                     }
@@ -592,7 +593,7 @@ public abstract class AbstractJobQueue
                     this.logger.debug("Finished job {}", Utility.toString(handler.getJob()));
                 }
                 info.processingTime = System.currentTimeMillis() - handler.started;
-                Utility.sendNotification(this.services.eventAdmin, NotificationConstants.TOPIC_JOB_FINISHED, handler.getJob(), info.processingTime);
+                NotificationUtility.sendNotification(this.services.eventAdmin, NotificationConstants.TOPIC_JOB_FINISHED, handler.getJob(), info.processingTime);
                 break;
             case QUEUED : // check if we exceeded the number of retries
                 final int retries = (Integer) handler.getJob().getProperty(Job.PROPERTY_JOB_RETRIES);
@@ -603,7 +604,7 @@ public abstract class AbstractJobQueue
                     if ( this.logger.isDebugEnabled() ) {
                         this.logger.debug("Cancelled job {}", Utility.toString(handler.getJob()));
                     }
-                    Utility.sendNotification(this.services.eventAdmin, NotificationConstants.TOPIC_JOB_CANCELLED, handler.getJob(), null);
+                    NotificationUtility.sendNotification(this.services.eventAdmin, NotificationConstants.TOPIC_JOB_CANCELLED, handler.getJob(), null);
                 } else {
                     info.reschedule = true;
                     this.reschedule(handler);
@@ -611,14 +612,14 @@ public abstract class AbstractJobQueue
                         this.logger.debug("Failed job {}", Utility.toString(handler.getJob()));
                     }
                     handler.queued = System.currentTimeMillis();
-                    Utility.sendNotification(this.services.eventAdmin, NotificationConstants.TOPIC_JOB_FAILED, handler.getJob(), null);
+                    NotificationUtility.sendNotification(this.services.eventAdmin, NotificationConstants.TOPIC_JOB_FAILED, handler.getJob(), null);
                 }
                 break;
             default : // consumer cancelled the job (STOPPED, GIVEN_UP, ERROR)
                 if ( this.logger.isDebugEnabled() ) {
                     this.logger.debug("Cancelled job {}", Utility.toString(handler.getJob()));
                 }
-                Utility.sendNotification(this.services.eventAdmin, NotificationConstants.TOPIC_JOB_CANCELLED, handler.getJob(), null);
+                NotificationUtility.sendNotification(this.services.eventAdmin, NotificationConstants.TOPIC_JOB_CANCELLED, handler.getJob(), null);
                 break;
         }
 
@@ -726,7 +727,7 @@ public abstract class AbstractJobQueue
                 logger.debug("Received ack for job {}", Utility.toString(ack.getJob()));
             }
             final long queueTime = ack.started - ack.queued;
-            Utility.sendNotification(this.services.eventAdmin, NotificationConstants.TOPIC_JOB_STARTED, ack.getJob(), queueTime);
+            NotificationUtility.sendNotification(this.services.eventAdmin, NotificationConstants.TOPIC_JOB_STARTED, ack.getJob(), queueTime);
             synchronized ( this.processingJobsLists ) {
                 this.processingJobsLists.put(jobId, ack);
             }

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsManager.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsManager.java?rev=1632253&r1=1632252&r2=1632253&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsManager.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsManager.java Thu Oct 16 09:25:57 2014
@@ -27,13 +27,10 @@ import org.apache.felix.scr.annotations.
 import org.apache.felix.scr.annotations.Property;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.Service;
-import org.apache.sling.api.SlingConstants;
-import org.apache.sling.event.EventUtil;
 import org.apache.sling.event.impl.jobs.JobManagerConfiguration;
 import org.apache.sling.event.impl.jobs.TestLogger;
-import org.apache.sling.event.impl.jobs.Utility;
 import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager;
-import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueInfo;
+import org.apache.sling.event.impl.jobs.notifications.NotificationUtility;
 import org.apache.sling.event.jobs.Job;
 import org.apache.sling.event.jobs.NotificationConstants;
 import org.apache.sling.event.jobs.Statistics;
@@ -48,14 +45,13 @@ import org.slf4j.LoggerFactory;
 @Service(value={EventHandler.class, StatisticsManager.class})
 @Properties({
     @Property(name=EventConstants.EVENT_TOPIC,
-          value={SlingConstants.TOPIC_RESOURCE_ADDED,
+          value={NotificationConstants.TOPIC_JOB_ADDED,
                  NotificationConstants.TOPIC_JOB_STARTED,
                  NotificationConstants.TOPIC_JOB_CANCELLED,
                  NotificationConstants.TOPIC_JOB_FAILED,
                  NotificationConstants.TOPIC_JOB_FINISHED,
                  NotificationConstants.TOPIC_JOB_REMOVED})
 })
-// TODO register event handlers on activate to allow for filters!
 public class StatisticsManager implements EventHandler {
 
     /** Logger. */
@@ -106,70 +102,57 @@ public class StatisticsManager implement
 
     @Override
     public void handleEvent(final Event event) {
-        if ( SlingConstants.TOPIC_RESOURCE_ADDED.equals(event.getTopic()) ) {
-            final String path = (String) event.getProperty(SlingConstants.PROPERTY_PATH);
-            if ( this.configuration.isLocalJob(path) ) {
-                final int topicStart = this.configuration.getLocalJobsPath().length() + 1;
-                final int topicEnd = path.indexOf("/", topicStart);
-                final String topic;
-                if ( topicEnd == -1 ) {
-                    topic = path.substring(topicStart).replace('.', '/');
-                } else {
-                    topic = path.substring(topicStart, topicEnd).replace('.', '/');
-                }
+        final String topic = (String)event.getProperty(NotificationConstants.NOTIFICATION_PROPERTY_JOB_TOPIC);
+        if ( topic != null ) { // this is just a sanity check
+            final String queueName = (String)event.getProperty(Job.PROPERTY_JOB_QUEUE_NAME);
+            final StatisticsImpl queueStats = getStatisticsForQueue(queueName);
+
+            TopicStatisticsImpl ts = (TopicStatisticsImpl)this.topicStatistics.get(topic);
+            if ( ts == null ) {
+                this.topicStatistics.putIfAbsent(topic, new TopicStatisticsImpl(topic));
+                ts = (TopicStatisticsImpl)this.topicStatistics.get(topic);
+            }
+
+            if ( event.getTopic().equals(NotificationConstants.TOPIC_JOB_ADDED) ) {
                 this.baseStatistics.incQueued();
-                final QueueInfo info = this.queueConfigurationManager.getQueueInfo(topic);
-                final StatisticsImpl queueStats = getStatisticsForQueue(info.queueName);
                 queueStats.incQueued();
-            }
-        } else {
-            if ( EventUtil.isLocal(event) ) {
-                // job notifications
-                final String topic = (String)event.getProperty(NotificationConstants.NOTIFICATION_PROPERTY_JOB_TOPIC);
-                if ( topic != null ) { // this is just a sanity check
-                    final String queueName = (String)event.getProperty(Job.PROPERTY_JOB_QUEUE_NAME);
-                    final StatisticsImpl queueStats = getStatisticsForQueue(queueName);
-
-                    TopicStatisticsImpl ts = (TopicStatisticsImpl)this.topicStatistics.get(topic);
-                    if ( ts == null ) {
-                        this.topicStatistics.putIfAbsent(topic, new TopicStatisticsImpl(topic));
-                        ts = (TopicStatisticsImpl)this.topicStatistics.get(topic);
-                    }
-
-                    if ( event.getTopic().equals(NotificationConstants.TOPIC_JOB_CANCELLED) ) {
-                        ts.addCancelled();
-                        this.baseStatistics.cancelledJob();
-                        if ( queueStats != null ) {
-                            queueStats.cancelledJob();
-                        }
-                    } else if ( event.getTopic().equals(NotificationConstants.TOPIC_JOB_FAILED) ) {
-                        ts.addFailed();
-                        this.baseStatistics.failedJob();
-                        if ( queueStats != null ) {
-                            queueStats.failedJob();
-                        }
-                    } else if ( event.getTopic().equals(NotificationConstants.TOPIC_JOB_FINISHED) ) {
-                        final Long time = (Long)event.getProperty(Utility.PROPERTY_TIME);
-                        ts.addFinished(time == null ? -1 : time);
-                        this.baseStatistics.finishedJob(time == null ? -1 : time);
-                        if ( queueStats != null ) {
-                            queueStats.finishedJob(time == null ? -1 : time);
-                        }
-                    } else if ( event.getTopic().equals(NotificationConstants.TOPIC_JOB_STARTED) ) {
-                        final Long time = (Long)event.getProperty(Utility.PROPERTY_TIME);
-                        ts.addActivated(time == null ? -1 : time);
-                        this.baseStatistics.addActive(time == null ? -1 : time);
-                        if ( queueStats != null ) {
-                            queueStats.addActive(time == null ? -1 : time);
-                        }
-                    } else if ( NotificationConstants.TOPIC_JOB_REMOVED.equals(event.getTopic()) ) {
-                        this.baseStatistics.decQueued();
-                        this.baseStatistics.cancelledJob();
-                        if ( queueStats != null ) {
-                            queueStats.decQueued();
-                            queueStats.cancelledJob();
-                        }
-                    }
+
+            } else if ( event.getTopic().equals(NotificationConstants.TOPIC_JOB_CANCELLED) ) {
+                ts.addCancelled();
+                this.baseStatistics.cancelledJob();
+                if ( queueStats != null ) {
+                    queueStats.cancelledJob();
+                }
+
+            } else if ( event.getTopic().equals(NotificationConstants.TOPIC_JOB_FAILED) ) {
+                ts.addFailed();
+                this.baseStatistics.failedJob();
+                if ( queueStats != null ) {
+                    queueStats.failedJob();
+                }
+
+            } else if ( event.getTopic().equals(NotificationConstants.TOPIC_JOB_FINISHED) ) {
+                final Long time = (Long)event.getProperty(NotificationUtility.PROPERTY_TIME);
+                ts.addFinished(time == null ? -1 : time);
+                this.baseStatistics.finishedJob(time == null ? -1 : time);
+                if ( queueStats != null ) {
+                    queueStats.finishedJob(time == null ? -1 : time);
+                }
+
+            } else if ( event.getTopic().equals(NotificationConstants.TOPIC_JOB_STARTED) ) {
+                final Long time = (Long)event.getProperty(NotificationUtility.PROPERTY_TIME);
+                ts.addActivated(time == null ? -1 : time);
+                this.baseStatistics.addActive(time == null ? -1 : time);
+                if ( queueStats != null ) {
+                    queueStats.addActive(time == null ? -1 : time);
+                }
+
+            } else if ( event.getTopic().equals(NotificationConstants.TOPIC_JOB_REMOVED) ) {
+                this.baseStatistics.decQueued();
+                this.baseStatistics.cancelledJob();
+                if ( queueStats != null ) {
+                    queueStats.decQueued();
+                    queueStats.cancelledJob();
                 }
             }
         }

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/timed/ScheduleInfo.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/timed/ScheduleInfo.java?rev=1632253&r1=1632252&r2=1632253&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/timed/ScheduleInfo.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/timed/ScheduleInfo.java Thu Oct 16 09:25:57 2014
@@ -25,6 +25,7 @@ import java.util.concurrent.atomic.Atomi
 import org.apache.sling.event.EventUtil;
 import org.apache.sling.event.impl.support.Environment;
 import org.apache.sling.event.impl.support.ResourceHelper;
+import org.apache.sling.event.jobs.JobUtil;
 import org.osgi.service.event.Event;
 
 final class ScheduleInfo implements Serializable {
@@ -66,7 +67,7 @@ final class ScheduleInfo implements Seri
         }
 
         final String id = (String)event.getProperty(EventUtil.PROPERTY_TIMED_EVENT_ID);
-        final String jId = (String)event.getProperty(ResourceHelper.PROPERTY_JOB_NAME);
+        final String jId = (String)event.getProperty(JobUtil.PROPERTY_JOB_NAME);
 
         this.jobId = getJobId(topic, id, jId);
     }

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ResourceHelper.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ResourceHelper.java?rev=1632253&r1=1632252&r2=1632253&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ResourceHelper.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ResourceHelper.java Thu Oct 16 09:25:57 2014
@@ -65,8 +65,6 @@ public abstract class ResourceHelper {
     public static final String PROPERTY_SCHEDULE_SUSPENDED = "slingevent:scheduleSuspended";
 
     public static final String PROPERTY_JOB_ID = "slingevent:eventId";
-    @Deprecated
-    public static final String PROPERTY_JOB_NAME = "event.job.id";
     public static final String PROPERTY_JOB_TOPIC = "event.job.topic";
 
     /** List of ignored properties to write to the repository. */

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobUtil.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobUtil.java?rev=1632253&r1=1632252&r2=1632253&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobUtil.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobUtil.java Thu Oct 16 09:25:57 2014
@@ -208,6 +208,7 @@ public abstract class JobUtil {
 
     /**
      * Property containing the job event. The value is of type org.osgi.service.event.Event.
+     * Since 1.6 this property is not sent anymore.
      * @deprecated
      */
     @Deprecated

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/NotificationConstants.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/NotificationConstants.java?rev=1632253&r1=1632252&r2=1632253&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/NotificationConstants.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/NotificationConstants.java Thu Oct 16 09:25:57 2014
@@ -20,7 +20,12 @@ package org.apache.sling.event.jobs;
 
 
 /**
- * This class contains constants for event notifications
+ * This class contains constants for event notifications.
+ *
+ * Notifications for jobs can only be received on the instance where the job
+ * action is taking place. They are not sent to other instances using
+ * remote events.
+ *
  * @since 1.3
  */
 public abstract class NotificationConstants {
@@ -76,6 +81,15 @@ public abstract class NotificationConsta
     public static final String TOPIC_JOB_REMOVED = "org/apache/sling/event/notification/job/REMOVED";
 
     /**
+     * Asynchronous notification event when a job is added.
+     * The property {@link #NOTIFICATION_PROPERTY_JOB_TOPIC} contains the job topic,
+     * the property {@link #NOTIFICATION_PROPERTY_JOB_ID} contains the unique job id.
+     * The payload of the job is available as additional job specific properties.
+     * @since 1.6
+     */
+    public static final String TOPIC_JOB_ADDED = "org/apache/sling/event/notification/job/ADDED";
+
+    /**
      * Property containing the job topic. Value is of type String.
      * @see Job#getTopic()
      */

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/package-info.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/package-info.java?rev=1632253&r1=1632252&r2=1632253&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/package-info.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/package-info.java Thu Oct 16 09:25:57 2014
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-@Version("1.5.0")
+@Version("1.6.0")
 package org.apache.sling.event.jobs;
 
 import aQute.bnd.annotation.Version;

Modified: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ClassloadingTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ClassloadingTest.java?rev=1632253&r1=1632252&r2=1632253&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ClassloadingTest.java (original)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ClassloadingTest.java Thu Oct 16 09:25:57 2014
@@ -136,7 +136,10 @@ public class ClassloadingTest extends Ab
 
                 @Override
                 public String getDescription() {
-                    return "Waiting for job to be processed";
+                    return "Waiting for job to be processed. Conditions: queuedJobs=" + jobManager.getStatistics().getNumberOfQueuedJobs() +
+                            ", jobsCount=" + processedJobsCount + ", findJobs=" +
+                            jobManager.findJobs(JobManager.QueryType.ALL, TOPIC, -1, (Map<String, Object>[]) null)
+                            .size();
                 }
 
                 @Override
@@ -206,13 +209,19 @@ public class ClassloadingTest extends Ab
                             && finishedEvents.size() == 0
                             && jobManager.findJobs(JobManager.QueryType.ALL, TOPIC + "/failed", -1,
                                     (Map<String, Object>[]) null).size() == 1
-                            && jobManager.getStatistics().getNumberOfQueuedJobs() == 0
+                            && jobManager.getStatistics().getNumberOfQueuedJobs() == 1
                             && jobManager.getStatistics().getNumberOfActiveJobs() == 0;
                 }
 
                 @Override
                 public String getDescription() {
-                    return "Waiting for job failure to be recorded";
+                    return "Waiting for job failure to be recorded. Conditions " +
+                           "faildJobsCount=" + failedJobsCount.get() +
+                           ", finishedEvents=" + finishedEvents.size() +
+                           ", findJobs= " + jobManager.findJobs(JobManager.QueryType.ALL, TOPIC + "/failed", -1,
+                                   (Map<String, Object>[]) null).size()
+                           +", queuedJobs=" + jobManager.getStatistics().getNumberOfQueuedJobs()
+                           +", activeJobs=" + jobManager.getStatistics().getNumberOfActiveJobs();
                 }
             }, CONDITION_TIMEOUT_SECONDS, CONDITION_INTERVAL_MILLIS);
 

Modified: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/JobHandlingTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/JobHandlingTest.java?rev=1632253&r1=1632252&r2=1632253&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/JobHandlingTest.java (original)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/JobHandlingTest.java Thu Oct 16 09:25:57 2014
@@ -94,7 +94,7 @@ public class JobHandlingTest extends Abs
         final Dictionary<String, Object> props = new Hashtable<String, Object>();
         props.put(ResourceHelper.PROPERTY_JOB_TOPIC, "sling/test");
         if ( id != null ) {
-            props.put(ResourceHelper.PROPERTY_JOB_NAME, id);
+            props.put(JobUtil.PROPERTY_JOB_NAME, id);
         }
 
         return new Event(JobUtil.TOPIC_JOB, props);
@@ -232,7 +232,7 @@ public class JobHandlingTest extends Abs
 
             assertEquals(1, jobManager.findJobs(JobManager.QueryType.ALL, "sling/test", -1, (Map<String, Object>[])null).size());
             // job is currently waiting, therefore cancel fails
-            final Event e1 = jobManager.findJob("sling/test", Collections.singletonMap(ResourceHelper.PROPERTY_JOB_NAME, (Object)"myid2"));
+            final Event e1 = jobManager.findJob("sling/test", Collections.singletonMap(JobUtil.PROPERTY_JOB_NAME, (Object)"myid2"));
             assertNotNull(e1);
             assertFalse(jobManager.removeJob((String)e1.getProperty(ResourceHelper.PROPERTY_JOB_ID)));
             cb2.block(); // and continue job
@@ -240,7 +240,7 @@ public class JobHandlingTest extends Abs
             sleep(200);
 
             // the job is now in the queue again
-            final Event e2 = jobManager.findJob("sling/test", Collections.singletonMap(ResourceHelper.PROPERTY_JOB_NAME, (Object)"myid2"));
+            final Event e2 = jobManager.findJob("sling/test", Collections.singletonMap(JobUtil.PROPERTY_JOB_NAME, (Object)"myid2"));
             assertNotNull(e2);
             assertTrue(jobManager.removeJob((String)e2.getProperty(ResourceHelper.PROPERTY_JOB_ID)));
             assertEquals(0, jobManager.findJobs(JobManager.QueryType.ALL, "sling/test", -1, (Map<String, Object>[])null).size());
@@ -313,7 +313,7 @@ public class JobHandlingTest extends Abs
 
             assertEquals(1, jobManager.findJobs(JobManager.QueryType.ALL, "sling/test", -1, (Map<String, Object>[])null).size());
             // job is currently sleeping, but force cancel always waits!
-            final Event e = jobManager.findJob("sling/test", Collections.singletonMap(ResourceHelper.PROPERTY_JOB_NAME, (Object)"myid3"));
+            final Event e = jobManager.findJob("sling/test", Collections.singletonMap(JobUtil.PROPERTY_JOB_NAME, (Object)"myid3"));
             assertNotNull(e);
             jobManager.forceRemoveJob((String)e.getProperty(ResourceHelper.PROPERTY_JOB_ID));
             // the job is now removed
@@ -433,8 +433,7 @@ public class JobHandlingTest extends Abs
 
                     @Override
                     public void handleEvent(Event event) {
-                        final Event job = (Event) event.getProperty(JobUtil.PROPERTY_NOTIFICATION_JOB);
-                        final String id = (String)job.getProperty(ResourceHelper.PROPERTY_JOB_NAME);
+                        final String id = (String)event.getProperty(JobUtil.NOTIFICATION_PROPERTY_JOB_NAME);
                         cancelled.add(id);
                     }
                 });
@@ -443,8 +442,7 @@ public class JobHandlingTest extends Abs
 
                     @Override
                     public void handleEvent(Event event) {
-                        final Event job = (Event) event.getProperty(JobUtil.PROPERTY_NOTIFICATION_JOB);
-                        final String id = (String)job.getProperty(ResourceHelper.PROPERTY_JOB_NAME);
+                        final String id = (String)event.getProperty(JobUtil.NOTIFICATION_PROPERTY_JOB_NAME);
                         failed.add(id);
                     }
                 });
@@ -453,8 +451,7 @@ public class JobHandlingTest extends Abs
 
                     @Override
                     public void handleEvent(Event event) {
-                        final Event job = (Event) event.getProperty(JobUtil.PROPERTY_NOTIFICATION_JOB);
-                        final String id = (String)job.getProperty(ResourceHelper.PROPERTY_JOB_NAME);
+                        final String id = (String)event.getProperty(JobUtil.NOTIFICATION_PROPERTY_JOB_NAME);
                         finished.add(id);
                     }
                 });
@@ -463,8 +460,7 @@ public class JobHandlingTest extends Abs
 
                     @Override
                     public void handleEvent(Event event) {
-                        final Event job = (Event) event.getProperty(JobUtil.PROPERTY_NOTIFICATION_JOB);
-                        final String id = (String)job.getProperty(ResourceHelper.PROPERTY_JOB_NAME);
+                        final String id = (String)event.getProperty(JobUtil.NOTIFICATION_PROPERTY_JOB_NAME);
                         started.add(id);
                     }
                 });