You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2014/10/16 11:25:58 UTC
svn commit: r1632253 - in /sling/trunk/bundles/extensions/event: ./
src/main/java/org/apache/sling/event/impl/
src/main/java/org/apache/sling/event/impl/jobs/
src/main/java/org/apache/sling/event/impl/jobs/config/
src/main/java/org/apache/sling/event/i...
Author: cziegeler
Date: Thu Oct 16 09:25:57 2014
New Revision: 1632253
URL: http://svn.apache.org/r1632253
Log:
SLING-4068 : Drop support for PROPERTY_NOTIFICATION_JOB event property
SLING-4065 : Add notification when a job is added
SLING-4063 : Fix job queue statistics
Added:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/notifications/
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/notifications/NewJobSender.java (with props)
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/notifications/NotificationUtility.java (with props)
Removed:
sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/
Modified:
sling/trunk/bundles/extensions/event/pom.xml
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/EventingThreadPool.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobSchedulerImpl.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/Utility.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/MainQueueConfiguration.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/EventAdminBridge.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/JobStatusProviderImpl.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsManager.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/timed/ScheduleInfo.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ResourceHelper.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobUtil.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/NotificationConstants.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/package-info.java
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ClassloadingTest.java
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/JobHandlingTest.java
Modified: sling/trunk/bundles/extensions/event/pom.xml
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/pom.xml?rev=1632253&r1=1632252&r2=1632253&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/pom.xml (original)
+++ sling/trunk/bundles/extensions/event/pom.xml Thu Oct 16 09:25:57 2014
@@ -116,7 +116,7 @@
-Xmx2048m -XX:MaxPermSize=512m
</argLine>
<includes>
- <include>**/it/OrderedQueueTest*</include>
+ <include>**/it/Ordered*</include>
</includes>
</configuration>
</plugin>
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/EventingThreadPool.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/EventingThreadPool.java?rev=1632253&r1=1632252&r2=1632253&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/EventingThreadPool.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/EventingThreadPool.java Thu Oct 16 09:25:57 2014
@@ -38,8 +38,11 @@ import org.apache.sling.commons.threads.
/**
* The configurable eventing thread pool.
*/
-@Component(label="%event.pool.name",
- description="%event.pool.description",
+@Component(label="Apache Sling Job Thread Pool",
+ description="This is the thread pool used by the Apache Sling job handling. The "
+ + "threads from this pool are merely used for executing jobs. By limiting this pool, it is "
+ + "possible to limit the maximum number of parallel processed jobs - regardless of the queue "
+ + "configuration.",
metatype=true)
@Service(value=EventingThreadPool.class)
public class EventingThreadPool implements ThreadPool {
@@ -52,7 +55,10 @@ public class EventingThreadPool implemen
private static final int DEFAULT_POOL_SIZE = 35;
- @Property(intValue=DEFAULT_POOL_SIZE)
+ @Property(intValue=DEFAULT_POOL_SIZE,
+ label="Pool Size",
+ description="The size of the thread pool. This pool is used to execute jobs and therefore "
+ + "limits the maximum number of jobs executed in parallel.")
private static final String PROPERTY_POOL_SIZE = "minPoolSize";
public EventingThreadPool() {
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java?rev=1632253&r1=1632252&r2=1632253&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java Thu Oct 16 09:25:57 2014
@@ -58,8 +58,10 @@ import org.slf4j.LoggerFactory;
/**
* This component manages/keeps track of all job consumer services.
*/
-@Component(label="%job.consumermanager.name",
- description="%job.consumermanager.description",
+@Component(label="Apache Sling Job Consumer Manager",
+ description="The consumer manager controls the job consumer (= processors). "
+ + "It can be used to temporarily disable job processing on the current instance. Other instances "
+ + "in a cluster are not affected.",
metatype=true)
@Service(value=JobConsumerManager.class)
@References({
@@ -76,10 +78,20 @@ import org.slf4j.LoggerFactory;
+ "only used on the current instance. This option should always be disabled!")
public class JobConsumerManager {
- @Property(unbounded=PropertyUnbounded.ARRAY, value = "*")
+ @Property(unbounded=PropertyUnbounded.ARRAY, value = "*",
+ label="Topic Whitelist",
+ description="This is a list of topics which currently should be "
+ + "processed by this instance. Leaving it empty, all job consumers are disabled. Putting a '*' as "
+ + "one entry, enables all job consumers. Adding separate topics enables job consumers for exactly "
+ + "this topic.")
private static final String PROPERTY_WHITELIST = "job.consumermanager.whitelist";
- @Property(unbounded=PropertyUnbounded.ARRAY)
+ @Property(unbounded=PropertyUnbounded.ARRAY,
+ label="Topic Blacklist",
+ description="This is a list of topics which currently shouldn't be "
+ + "processed by this instance. Leaving it empty, all job consumers are enabled. Putting a '*' as "
+ + "one entry, disables all job consumers. Adding separate topics disables job consumers for exactly "
+ + "this topic.")
private static final String PROPERTY_BLACKLIST = "job.consumermanager.blacklist";
/** The map with the consumers, keyed by topic, sorted by service ranking. */
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java?rev=1632253&r1=1632252&r2=1632253&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java Thu Oct 16 09:25:57 2014
@@ -49,6 +49,7 @@ import org.apache.sling.event.EventUtil;
import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager;
import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueInfo;
+import org.apache.sling.event.impl.jobs.notifications.NotificationUtility;
import org.apache.sling.event.impl.jobs.queues.AbstractJobQueue;
import org.apache.sling.event.impl.jobs.queues.QueueManager;
import org.apache.sling.event.impl.jobs.stats.StatisticsManager;
@@ -61,6 +62,7 @@ import org.apache.sling.event.impl.suppo
import org.apache.sling.event.jobs.Job;
import org.apache.sling.event.jobs.JobBuilder;
import org.apache.sling.event.jobs.JobManager;
+import org.apache.sling.event.jobs.JobUtil;
import org.apache.sling.event.jobs.JobsIterator;
import org.apache.sling.event.jobs.NotificationConstants;
import org.apache.sling.event.jobs.Queue;
@@ -399,7 +401,7 @@ public class JobManagerImpl
} else {
logger.debug("Unable to remove job with id - resource already removed: {}", jobId);
}
- Utility.sendNotification(this.eventAdmin, NotificationConstants.TOPIC_JOB_REMOVED, job, null);
+ NotificationUtility.sendNotification(this.eventAdmin, NotificationConstants.TOPIC_JOB_REMOVED, job, null);
} catch ( final PersistenceException pe) {
this.ignoreException(pe);
result = false;
@@ -452,7 +454,7 @@ public class JobManagerImpl
buf.append("//element(*,");
buf.append(ResourceHelper.RESOURCE_TYPE_JOB);
buf.append(")[@");
- buf.append(ISO9075.encode(ResourceHelper.PROPERTY_JOB_NAME));
+ buf.append(ISO9075.encode(JobUtil.PROPERTY_JOB_NAME));
buf.append(" = '");
buf.append(name);
buf.append("']");
@@ -895,7 +897,7 @@ public class JobManagerImpl
if ( logger.isDebugEnabled() ) {
logger.debug("Dropping job due to configuration of queue {} : {}", info.queueName, Utility.toString(jobTopic, jobName, jobProperties));
}
- Utility.sendNotification(this.eventAdmin, NotificationConstants.TOPIC_JOB_CANCELLED, jobTopic, jobName, jobProperties, null);
+ NotificationUtility.sendNotification(this.eventAdmin, NotificationConstants.TOPIC_JOB_CANCELLED, jobTopic, jobName, jobProperties, null);
} else {
// check for unique jobs
if ( jobName != null && !this.lock(jobTopic, jobName) ) {
@@ -966,7 +968,7 @@ public class JobManagerImpl
properties.put(ResourceHelper.PROPERTY_JOB_ID, jobId);
properties.put(ResourceHelper.PROPERTY_JOB_TOPIC, jobTopic);
if ( jobName != null ) {
- properties.put(ResourceHelper.PROPERTY_JOB_NAME, jobName);
+ properties.put(JobUtil.PROPERTY_JOB_NAME, jobName);
}
properties.put(Job.PROPERTY_JOB_QUEUE_NAME, info.queueConfiguration.getName());
properties.put(Job.PROPERTY_JOB_RETRY_COUNT, 0);
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobSchedulerImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobSchedulerImpl.java?rev=1632253&r1=1632252&r2=1632253&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobSchedulerImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobSchedulerImpl.java Thu Oct 16 09:25:57 2014
@@ -53,6 +53,7 @@ import org.apache.sling.event.impl.suppo
import org.apache.sling.event.impl.support.ScheduleInfoImpl;
import org.apache.sling.event.jobs.Job;
import org.apache.sling.event.jobs.JobBuilder;
+import org.apache.sling.event.jobs.JobUtil;
import org.apache.sling.event.jobs.ScheduleInfo;
import org.apache.sling.event.jobs.ScheduleInfo.ScheduleType;
import org.apache.sling.event.jobs.ScheduledJobInfo;
@@ -534,7 +535,7 @@ public class JobSchedulerImpl
properties.put(ResourceHelper.PROPERTY_JOB_TOPIC, jobTopic);
if ( jobName != null ) {
- properties.put(ResourceHelper.PROPERTY_JOB_NAME, jobName);
+ properties.put(JobUtil.PROPERTY_JOB_NAME, jobName);
}
properties.put(Job.PROPERTY_JOB_CREATED, Calendar.getInstance());
properties.put(Job.PROPERTY_JOB_CREATED_INSTANCE, Environment.APPLICATION_ID);
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/Utility.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/Utility.java?rev=1632253&r1=1632252&r2=1632253&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/Utility.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/Utility.java Thu Oct 16 09:25:57 2014
@@ -25,7 +25,6 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.Dictionary;
import java.util.HashMap;
-import java.util.Hashtable;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -36,11 +35,8 @@ import org.apache.sling.api.resource.Val
import org.apache.sling.event.impl.support.ResourceHelper;
import org.apache.sling.event.jobs.Job;
import org.apache.sling.event.jobs.JobUtil;
-import org.apache.sling.event.jobs.NotificationConstants;
import org.apache.sling.event.jobs.consumer.JobConsumer;
import org.osgi.service.event.Event;
-import org.osgi.service.event.EventAdmin;
-import org.osgi.service.event.EventConstants;
import org.slf4j.Logger;
public abstract class Utility {
@@ -101,76 +97,6 @@ public abstract class Utility {
return msg;
}
- /** Event property containing the time for job start and job finished events. */
- public static final String PROPERTY_TIME = "time";
-
- /**
- * Helper method for sending the notification events.
- */
- public static void sendNotification(final EventAdmin eventAdmin,
- final String eventTopic,
- final String jobTopic,
- final String jobName,
- final Map<String, Object> jobProperties,
- final Long time) {
- if ( eventAdmin != null ) {
- // create job object
- final Map<String, Object> jobProps;
- if ( jobProperties == null ) {
- jobProps = new HashMap<String, Object>();
- } else {
- jobProps = jobProperties;
- }
- final Job job = new JobImpl(jobTopic, jobName, "<unknown>", jobProps);
- sendNotificationInternal(eventAdmin, eventTopic, job, time);
- }
- }
-
- /**
- * Helper method for sending the notification events.
- */
- public static void sendNotification(final EventAdmin eventAdmin,
- final String eventTopic,
- final Job job,
- final Long time) {
- if ( eventAdmin != null ) {
- // create new copy of job object
- final Job jobCopy = new JobImpl(job.getTopic(), job.getName(), job.getId(), ((JobImpl)job).getProperties());
- sendNotificationInternal(eventAdmin, eventTopic, jobCopy, time);
- }
- }
-
- /**
- * Helper method for sending the notification events.
- */
- private static void sendNotificationInternal(final EventAdmin eventAdmin,
- final String eventTopic,
- final Job job,
- final Long time) {
- final Dictionary<String, Object> eventProps = new Hashtable<String, Object>();
- // add basic job properties
- eventProps.put(NotificationConstants.NOTIFICATION_PROPERTY_JOB_ID, job.getId());
- eventProps.put(NotificationConstants.NOTIFICATION_PROPERTY_JOB_TOPIC, job.getTopic());
- if ( job.getName() != null ) {
- eventProps.put(JobUtil.NOTIFICATION_PROPERTY_JOB_NAME, job.getName());
- }
- // copy payload
- for(final String name : job.getPropertyNames()) {
- eventProps.put(name, job.getProperty(name));
- }
- // remove async handler
- eventProps.remove(JobConsumer.PROPERTY_JOB_ASYNC_HANDLER);
- // add timestamp
- eventProps.put(EventConstants.TIMESTAMP, System.currentTimeMillis());
- // add internal time information
- if ( time != null ) {
- eventProps.put(PROPERTY_TIME, time);
- }
- // compatibility:
- eventProps.put(JobUtil.PROPERTY_NOTIFICATION_JOB, toEvent(job));
- eventAdmin.postEvent(new Event(eventTopic, eventProps));
- }
-
/**
* Create an event from a job
* @param job The job
@@ -180,7 +106,7 @@ public abstract class Utility {
final Map<String, Object> eventProps = new HashMap<String, Object>();
eventProps.putAll(((JobImpl)job).getProperties());
if ( job.getName() != null ) {
- eventProps.put(ResourceHelper.PROPERTY_JOB_NAME, job.getName());
+ eventProps.put(JobUtil.PROPERTY_JOB_NAME, job.getName());
}
eventProps.put(ResourceHelper.PROPERTY_JOB_ID, job.getId());
eventProps.remove(JobConsumer.PROPERTY_JOB_ASYNC_HANDLER);
@@ -197,7 +123,7 @@ public abstract class Utility {
boolean first = true;
for(final String propName : properties.keySet()) {
if ( propName.equals(ResourceHelper.PROPERTY_JOB_ID)
- || propName.equals(ResourceHelper.PROPERTY_JOB_NAME)
+ || propName.equals(JobUtil.PROPERTY_JOB_NAME)
|| propName.equals(ResourceHelper.PROPERTY_JOB_TOPIC) ) {
continue;
}
@@ -297,7 +223,7 @@ public abstract class Utility {
}
}
job = new JobImpl(topic,
- (String)jobProperties.get(ResourceHelper.PROPERTY_JOB_NAME),
+ (String)jobProperties.get(JobUtil.PROPERTY_JOB_NAME),
jobId,
jobProperties);
} else {
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java?rev=1632253&r1=1632252&r2=1632253&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java Thu Oct 16 09:25:57 2014
@@ -36,38 +36,80 @@ import org.apache.sling.event.jobs.JobUt
import org.apache.sling.event.jobs.QueueConfiguration;
import org.osgi.framework.Constants;
-@Component(metatype=true,name="org.apache.sling.event.jobs.QueueConfiguration",
- label="%queue.name", description="%queue.description",
- configurationFactory=true,policy=ConfigurationPolicy.REQUIRE)
+@Component(metatype=true,
+ name="org.apache.sling.event.jobs.QueueConfiguration",
+ label="Apache Sling Job Queue Configuration",
+ description="The configuration of a job processing queue.",
+ configurationFactory=true, policy=ConfigurationPolicy.REQUIRE)
@Service(value={InternalQueueConfiguration.class})
@Properties({
- @Property(name=ConfigurationConstants.PROP_NAME),
- @Property(name=ConfigurationConstants.PROP_TYPE,
- value=ConfigurationConstants.DEFAULT_TYPE,
- options={@PropertyOption(name="UNORDERED",value="Parallel"),
- @PropertyOption(name="ORDERED",value="Ordered"),
- @PropertyOption(name="TOPIC_ROUND_ROBIN",value="Topic Round Robin")}),
+ @Property(name=ConfigurationConstants.PROP_NAME,
+ label="Name",
+ description="The name of the queue. If matching is used the token {0} can be used to substitute the real value."),
@Property(name=ConfigurationConstants.PROP_TOPICS,
- unbounded=PropertyUnbounded.ARRAY),
+ unbounded=PropertyUnbounded.ARRAY,
+ label="Topics",
+ description="This value is required and lists the topics processed by "
+ + "this queue. The value is a list of strings. If a string ends with a dot, "
+ + "all topics in exactly this package match. If the string ends with a star, "
+ + "all topics in this package and all subpackages match. If the string neither "
+ + "ends with a dot nor with a star, this is assumed to define an exact topic."),
+ @Property(name=ConfigurationConstants.PROP_TYPE,
+ value=ConfigurationConstants.DEFAULT_TYPE,
+ options={@PropertyOption(name="UNORDERED",value="Parallel"),
+ @PropertyOption(name="ORDERED",value="Ordered"),
+ @PropertyOption(name="TOPIC_ROUND_ROBIN",value="Topic Round Robin")},
+ label="Type",
+ description="The queue type."),
@Property(name=ConfigurationConstants.PROP_MAX_PARALLEL,
- intValue=ConfigurationConstants.DEFAULT_MAX_PARALLEL),
+ intValue=ConfigurationConstants.DEFAULT_MAX_PARALLEL,
+ label="Maximum Parallel Jobs",
+ description="The maximum number of parallel jobs started for this queue. "
+ + "A value of -1 is substituted with the number of available processors."),
@Property(name=ConfigurationConstants.PROP_RETRIES,
- intValue=ConfigurationConstants.DEFAULT_RETRIES),
+ intValue=ConfigurationConstants.DEFAULT_RETRIES,
+ label="Maximum Retries",
+ description="The maximum number of times a failed job slated "
+ + "for retries is actually retried. If a job has been retried this number of "
+ + "times and still fails, it is not rescheduled and assumed to have failed. The "
+ + "default value is 10."),
@Property(name=ConfigurationConstants.PROP_RETRY_DELAY,
- longValue=ConfigurationConstants.DEFAULT_RETRY_DELAY),
+ longValue=ConfigurationConstants.DEFAULT_RETRY_DELAY,
+ label="Retry Delay",
+ description="The number of milliseconds to sleep between two "
+ + "consecutive retries of a job which failed and was set to be retried. The "
+ + "default value is 2 seconds. This value is only relevant if there is a single "
+ + "failed job in the queue. If there are multiple failed jobs, each job is "
+ + "retried in turn without an intervening delay."),
@Property(name=ConfigurationConstants.PROP_PRIORITY,
- value=ConfigurationConstants.DEFAULT_PRIORITY,
- options={@PropertyOption(name="NORM",value="Norm"),
- @PropertyOption(name="MIN",value="Min"),
- @PropertyOption(name="MAX",value="Max")}),
+ value=ConfigurationConstants.DEFAULT_PRIORITY,
+ options={@PropertyOption(name="NORM",value="Norm"),
+ @PropertyOption(name="MIN",value="Min"),
+ @PropertyOption(name="MAX",value="Max")},
+ label="Priority",
+ description="The priority for the threads used by this queue. Default is norm."),
@Property(name=ConfigurationConstants.PROP_KEEP_JOBS,
- boolValue=ConfigurationConstants.DEFAULT_KEEP_JOBS),
+ boolValue=ConfigurationConstants.DEFAULT_KEEP_JOBS,
+ label="Keep History",
+ description="If this option is enabled, successful finished jobs are kept "
+ + "to provide a complete history."),
@Property(name=ConfigurationConstants.PROP_PREFER_RUN_ON_CREATION_INSTANCE,
- boolValue=ConfigurationConstants.DEFAULT_PREFER_RUN_ON_CREATION_INSTANCE),
+ boolValue=ConfigurationConstants.DEFAULT_PREFER_RUN_ON_CREATION_INSTANCE,
+ label="Prefer Creation Instance",
+ description="If this option is enabled, the jobs are tried to "
+ + "be run on the instance where the job was created."),
@Property(name=ConfigurationConstants.PROP_THREAD_POOL_SIZE,
- intValue=ConfigurationConstants.DEFAULT_THREAD_POOL_SIZE),
- @Property(name=Constants.SERVICE_RANKING, intValue=0, propertyPrivate=false,
- label="%queue.ranking.name", description="%queue.ranking.description")
+ intValue=ConfigurationConstants.DEFAULT_THREAD_POOL_SIZE,
+ label="Thread Pool Size",
+ description="Optional configuration value for a thread pool to be used by "
+ + "this queue. If this is value has a positive number of threads configuration, this queue uses "
+ + "an own thread pool with the configured number of threads."),
+ @Property(name=Constants.SERVICE_RANKING,
+ intValue=0,
+ propertyPrivate=false,
+ label="Ranking",
+ description="Integer value defining the ranking of this queue configuration. "
+ + "If more than one queue matches a job topic, the one with the highest ranking is used.")
})
public class InternalQueueConfiguration
implements QueueConfiguration, Comparable<InternalQueueConfiguration> {
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/MainQueueConfiguration.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/MainQueueConfiguration.java?rev=1632253&r1=1632252&r2=1632253&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/MainQueueConfiguration.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/MainQueueConfiguration.java Thu Oct 16 09:25:57 2014
@@ -37,23 +37,39 @@ import org.slf4j.LoggerFactory;
* This is the configuration for the main queue.
*
*/
-@Component(label="%job.events.name",
- description="%job.events.description",
- name="org.apache.sling.event.impl.jobs.DefaultJobManager",
- metatype=true)
+@Component(label="Apache Sling Job Default Queue",
+ description="The configuration of the default job queue.",
+ name="org.apache.sling.event.impl.jobs.DefaultJobManager",
+ metatype=true)
@Service(value=MainQueueConfiguration.class)
@Properties({
@Property(name=ConfigurationConstants.PROP_PRIORITY,
- value=ConfigurationConstants.DEFAULT_PRIORITY,
- options={@PropertyOption(name="NORM",value="Norm"),
- @PropertyOption(name="MIN",value="Min"),
- @PropertyOption(name="MAX",value="Max")}),
+ value=ConfigurationConstants.DEFAULT_PRIORITY,
+ options={@PropertyOption(name="NORM",value="Norm"),
+ @PropertyOption(name="MIN",value="Min"),
+ @PropertyOption(name="MAX",value="Max")},
+ label="Priority",
+ description="The priority for the threads used by this queue. Default is norm."),
@Property(name=ConfigurationConstants.PROP_RETRIES,
- intValue=ConfigurationConstants.DEFAULT_RETRIES),
+ intValue=ConfigurationConstants.DEFAULT_RETRIES,
+ label="Maximum Retries",
+ description="The maximum number of times a failed job slated "
+ + "for retries is actually retried. If a job has been retried this number of "
+ + "times and still fails, it is not rescheduled and assumed to have failed. The "
+ + "default value is 10."),
@Property(name=ConfigurationConstants.PROP_RETRY_DELAY,
- longValue=ConfigurationConstants.DEFAULT_RETRY_DELAY),
+ longValue=ConfigurationConstants.DEFAULT_RETRY_DELAY,
+ label="Retry Delay",
+ description="The number of milliseconds to sleep between two "
+ + "consecutive retries of a job which failed and was set to be retried. The "
+ + "default value is 2 seconds. This value is only relevant if there is a single "
+ + "failed job in the queue. If there are multiple failed jobs, each job is "
+ + "retried in turn without an intervening delay."),
@Property(name=ConfigurationConstants.PROP_MAX_PARALLEL,
- intValue=ConfigurationConstants.DEFAULT_MAX_PARALLEL)
+ intValue=ConfigurationConstants.DEFAULT_MAX_PARALLEL,
+ label="Maximum Parallel Jobs",
+ description="The maximum number of parallel jobs started for this queue. "
+ + "A value of -1 is substituted with the number of available processors."),
})
public class MainQueueConfiguration {
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/EventAdminBridge.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/EventAdminBridge.java?rev=1632253&r1=1632252&r2=1632253&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/EventAdminBridge.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/EventAdminBridge.java Thu Oct 16 09:25:57 2014
@@ -160,7 +160,7 @@ public class EventAdminBridge
}
} else {
final String jobTopic = (String)event.getProperty(ResourceHelper.PROPERTY_JOB_TOPIC);
- final String jobName = (String)event.getProperty(ResourceHelper.PROPERTY_JOB_NAME);
+ final String jobName = (String)event.getProperty(JobUtil.PROPERTY_JOB_NAME);
final Map<String, Object> props = new EventPropertiesMap(event);
props.put(JobImpl.PROPERTY_BRIDGED_EVENT, Boolean.TRUE);
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/JobStatusProviderImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/JobStatusProviderImpl.java?rev=1632253&r1=1632252&r2=1632253&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/JobStatusProviderImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/JobStatusProviderImpl.java Thu Oct 16 09:25:57 2014
@@ -32,6 +32,7 @@ import org.apache.sling.event.JobsIterat
import org.apache.sling.event.impl.support.ResourceHelper;
import org.apache.sling.event.jobs.JobManager;
import org.apache.sling.event.jobs.JobManager.QueryType;
+import org.apache.sling.event.jobs.JobUtil;
import org.apache.sling.event.jobs.Queue;
import org.osgi.service.event.Event;
@@ -57,7 +58,7 @@ public class JobStatusProviderImpl
public boolean removeJob(final String topic, final String jobId) {
if ( jobId != null && topic != null ) {
final Event job = this.jobManager.findJob(topic,
- Collections.singletonMap(ResourceHelper.PROPERTY_JOB_NAME, (Object)jobId));
+ Collections.singletonMap(JobUtil.PROPERTY_JOB_NAME, (Object)jobId));
if ( job != null ) {
return this.removeJob((String)job.getProperty(ResourceHelper.PROPERTY_JOB_ID));
}
@@ -81,7 +82,7 @@ public class JobStatusProviderImpl
public void forceRemoveJob(final String topic, final String jobId) {
if ( jobId != null && topic != null ) {
final Event job = this.jobManager.findJob(topic,
- Collections.singletonMap(ResourceHelper.PROPERTY_JOB_NAME, (Object)jobId));
+ Collections.singletonMap(JobUtil.PROPERTY_JOB_NAME, (Object)jobId));
if ( job != null ) {
this.forceRemoveJob((String)job.getProperty(ResourceHelper.PROPERTY_JOB_ID));
}
Added: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/notifications/NewJobSender.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/notifications/NewJobSender.java?rev=1632253&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/notifications/NewJobSender.java (added)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/notifications/NewJobSender.java Thu Oct 16 09:25:57 2014
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.notifications;
+
+import java.util.Dictionary;
+import java.util.Hashtable;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.sling.api.SlingConstants;
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.event.impl.jobs.JobManagerConfiguration;
+import org.apache.sling.event.impl.jobs.Utility;
+import org.apache.sling.event.impl.support.ResourceHelper;
+import org.apache.sling.event.jobs.Job;
+import org.apache.sling.event.jobs.NotificationConstants;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.Constants;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
+import org.osgi.service.event.EventHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This component receives resource added events and sends a job
+ * added notification event.
+ */
+@Component
+public class NewJobSender implements EventHandler {
+
+ /** Logger. */
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+ /** The job manager configuration. */
+ @Reference
+ private JobManagerConfiguration configuration;
+
+ /** The event admin. */
+ @Reference
+ private EventAdmin eventAdmin;
+
+ /** Service registration for the event handler. */
+ private volatile ServiceRegistration eventHandlerRegistration;
+
+ /**
+ * Activate this component.
+ * Register an event handler.
+ */
+ @Activate
+ protected void activate(final BundleContext bundleContext) {
+ final Dictionary<String, Object> properties = new Hashtable<String, Object>();
+ properties.put(Constants.SERVICE_DESCRIPTION, "Apache Sling Job Topic Manager Event Handler");
+ properties.put(Constants.SERVICE_VENDOR, "The Apache Software Foundation");
+ properties.put(EventConstants.EVENT_TOPIC, SlingConstants.TOPIC_RESOURCE_ADDED);
+ properties.put(EventConstants.EVENT_FILTER,
+ "(" + SlingConstants.PROPERTY_PATH + "=" +
+ this.configuration.getLocalJobsPath() + "/*)");
+ this.eventHandlerRegistration = bundleContext.registerService(EventHandler.class.getName(), this, properties);
+ }
+
+ /**
+ * Deactivate this component.
+ * Unregister the event handler.
+ */
+ @Deactivate
+ protected void deactivate() {
+ if ( this.eventHandlerRegistration != null ) {
+ this.eventHandlerRegistration.unregister();
+ this.eventHandlerRegistration = null;
+ }
+ }
+
+ @Override
+ public void handleEvent(final Event event) {
+ logger.debug("Received event {}", event);
+ final String path = (String) event.getProperty(SlingConstants.PROPERTY_PATH);
+ final String rt = (String) event.getProperty(SlingConstants.PROPERTY_RESOURCE_TYPE);
+ if ( ResourceHelper.RESOURCE_TYPE_JOB.equals(rt) && this.configuration.isLocalJob(path) ) {
+ // read the job
+ final ResourceResolver resolver = this.configuration.createResourceResolver();
+ try {
+ final Resource rsrc = resolver.getResource(path);
+ if ( rsrc != null ) {
+ final Job job = Utility.readJob(this.logger, rsrc);
+ if ( job != null ) {
+ logger.debug("Sending job added event for {}", job);
+ NotificationUtility.sendNotification(this.eventAdmin, NotificationConstants.TOPIC_JOB_ADDED, job, null);
+ }
+ }
+ } finally {
+ resolver.close();
+ }
+ }
+ }
+
+}
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/notifications/NewJobSender.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/notifications/NewJobSender.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/notifications/NewJobSender.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/notifications/NotificationUtility.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/notifications/NotificationUtility.java?rev=1632253&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/notifications/NotificationUtility.java (added)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/notifications/NotificationUtility.java Thu Oct 16 09:25:57 2014
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.notifications;
+
+import java.util.Dictionary;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.Map;
+
+import org.apache.sling.event.impl.jobs.JobImpl;
+import org.apache.sling.event.jobs.Job;
+import org.apache.sling.event.jobs.JobUtil;
+import org.apache.sling.event.jobs.NotificationConstants;
+import org.apache.sling.event.jobs.consumer.JobConsumer;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
+
+public abstract class NotificationUtility {
+
+ /** Event property containing the time for job start and job finished events. */
+ public static final String PROPERTY_TIME = ":time";
+
+ /**
+ * Helper method for sending the notification events.
+ */
+ public static void sendNotification(final EventAdmin eventAdmin,
+ final String eventTopic,
+ final String jobTopic,
+ final String jobName,
+ final Map<String, Object> jobProperties,
+ final Long time) {
+ if ( eventAdmin != null ) {
+ // create job object
+ final Map<String, Object> jobProps;
+ if ( jobProperties == null ) {
+ jobProps = new HashMap<String, Object>();
+ } else {
+ jobProps = jobProperties;
+ }
+ final Job job = new JobImpl(jobTopic, jobName, "<unknown>", jobProps);
+ sendNotificationInternal(eventAdmin, eventTopic, job, time);
+ }
+ }
+
+ /**
+ * Helper method for sending the notification events.
+ */
+ public static void sendNotification(final EventAdmin eventAdmin,
+ final String eventTopic,
+ final Job job,
+ final Long time) {
+ if ( eventAdmin != null ) {
+ // create new copy of job object
+ final Job jobCopy = new JobImpl(job.getTopic(), job.getName(), job.getId(), ((JobImpl)job).getProperties());
+ sendNotificationInternal(eventAdmin, eventTopic, jobCopy, time);
+ }
+ }
+
+ /**
+ * Helper method for sending the notification events.
+ */
+ private static void sendNotificationInternal(final EventAdmin eventAdmin,
+ final String eventTopic,
+ final Job job,
+ final Long time) {
+ final Dictionary<String, Object> eventProps = new Hashtable<String, Object>();
+ // add basic job properties
+ eventProps.put(NotificationConstants.NOTIFICATION_PROPERTY_JOB_ID, job.getId());
+ eventProps.put(NotificationConstants.NOTIFICATION_PROPERTY_JOB_TOPIC, job.getTopic());
+ if ( job.getName() != null ) {
+ eventProps.put(JobUtil.NOTIFICATION_PROPERTY_JOB_NAME, job.getName());
+ }
+ // copy payload
+ for(final String name : job.getPropertyNames()) {
+ eventProps.put(name, job.getProperty(name));
+ }
+ // remove async handler
+ eventProps.remove(JobConsumer.PROPERTY_JOB_ASYNC_HANDLER);
+ // add timestamp
+ eventProps.put(EventConstants.TIMESTAMP, System.currentTimeMillis());
+ // add internal time information
+ if ( time != null ) {
+ eventProps.put(PROPERTY_TIME, time);
+ }
+ if ( NotificationConstants.TOPIC_JOB_ADDED.equals(eventTopic) ) {
+ eventAdmin.sendEvent(new Event(eventTopic, eventProps));
+ } else {
+ eventAdmin.postEvent(new Event(eventTopic, eventProps));
+ }
+ }
+
+}
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/notifications/NotificationUtility.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/notifications/NotificationUtility.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/notifications/NotificationUtility.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java?rev=1632253&r1=1632252&r2=1632253&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java Thu Oct 16 09:25:57 2014
@@ -39,6 +39,7 @@ import org.apache.sling.event.impl.jobs.
import org.apache.sling.event.impl.jobs.Utility;
import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
import org.apache.sling.event.impl.jobs.deprecated.JobStatusNotifier;
+import org.apache.sling.event.impl.jobs.notifications.NotificationUtility;
import org.apache.sling.event.impl.support.Environment;
import org.apache.sling.event.impl.support.ResourceHelper;
import org.apache.sling.event.jobs.Job;
@@ -370,7 +371,7 @@ public abstract class AbstractJobQueue
if ( consumer != null ) {
final long queueTime = handler.started - handler.queued;
- Utility.sendNotification(this.services.eventAdmin, NotificationConstants.TOPIC_JOB_STARTED, job, queueTime);
+ NotificationUtility.sendNotification(this.services.eventAdmin, NotificationConstants.TOPIC_JOB_STARTED, job, queueTime);
synchronized ( this.processingJobsLists ) {
this.processingJobsLists.put(job.getId(), handler);
}
@@ -592,7 +593,7 @@ public abstract class AbstractJobQueue
this.logger.debug("Finished job {}", Utility.toString(handler.getJob()));
}
info.processingTime = System.currentTimeMillis() - handler.started;
- Utility.sendNotification(this.services.eventAdmin, NotificationConstants.TOPIC_JOB_FINISHED, handler.getJob(), info.processingTime);
+ NotificationUtility.sendNotification(this.services.eventAdmin, NotificationConstants.TOPIC_JOB_FINISHED, handler.getJob(), info.processingTime);
break;
case QUEUED : // check if we exceeded the number of retries
final int retries = (Integer) handler.getJob().getProperty(Job.PROPERTY_JOB_RETRIES);
@@ -603,7 +604,7 @@ public abstract class AbstractJobQueue
if ( this.logger.isDebugEnabled() ) {
this.logger.debug("Cancelled job {}", Utility.toString(handler.getJob()));
}
- Utility.sendNotification(this.services.eventAdmin, NotificationConstants.TOPIC_JOB_CANCELLED, handler.getJob(), null);
+ NotificationUtility.sendNotification(this.services.eventAdmin, NotificationConstants.TOPIC_JOB_CANCELLED, handler.getJob(), null);
} else {
info.reschedule = true;
this.reschedule(handler);
@@ -611,14 +612,14 @@ public abstract class AbstractJobQueue
this.logger.debug("Failed job {}", Utility.toString(handler.getJob()));
}
handler.queued = System.currentTimeMillis();
- Utility.sendNotification(this.services.eventAdmin, NotificationConstants.TOPIC_JOB_FAILED, handler.getJob(), null);
+ NotificationUtility.sendNotification(this.services.eventAdmin, NotificationConstants.TOPIC_JOB_FAILED, handler.getJob(), null);
}
break;
default : // consumer cancelled the job (STOPPED, GIVEN_UP, ERROR)
if ( this.logger.isDebugEnabled() ) {
this.logger.debug("Cancelled job {}", Utility.toString(handler.getJob()));
}
- Utility.sendNotification(this.services.eventAdmin, NotificationConstants.TOPIC_JOB_CANCELLED, handler.getJob(), null);
+ NotificationUtility.sendNotification(this.services.eventAdmin, NotificationConstants.TOPIC_JOB_CANCELLED, handler.getJob(), null);
break;
}
@@ -726,7 +727,7 @@ public abstract class AbstractJobQueue
logger.debug("Received ack for job {}", Utility.toString(ack.getJob()));
}
final long queueTime = ack.started - ack.queued;
- Utility.sendNotification(this.services.eventAdmin, NotificationConstants.TOPIC_JOB_STARTED, ack.getJob(), queueTime);
+ NotificationUtility.sendNotification(this.services.eventAdmin, NotificationConstants.TOPIC_JOB_STARTED, ack.getJob(), queueTime);
synchronized ( this.processingJobsLists ) {
this.processingJobsLists.put(jobId, ack);
}
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsManager.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsManager.java?rev=1632253&r1=1632252&r2=1632253&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsManager.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsManager.java Thu Oct 16 09:25:57 2014
@@ -27,13 +27,10 @@ import org.apache.felix.scr.annotations.
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.Service;
-import org.apache.sling.api.SlingConstants;
-import org.apache.sling.event.EventUtil;
import org.apache.sling.event.impl.jobs.JobManagerConfiguration;
import org.apache.sling.event.impl.jobs.TestLogger;
-import org.apache.sling.event.impl.jobs.Utility;
import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager;
-import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueInfo;
+import org.apache.sling.event.impl.jobs.notifications.NotificationUtility;
import org.apache.sling.event.jobs.Job;
import org.apache.sling.event.jobs.NotificationConstants;
import org.apache.sling.event.jobs.Statistics;
@@ -48,14 +45,13 @@ import org.slf4j.LoggerFactory;
@Service(value={EventHandler.class, StatisticsManager.class})
@Properties({
@Property(name=EventConstants.EVENT_TOPIC,
- value={SlingConstants.TOPIC_RESOURCE_ADDED,
+ value={NotificationConstants.TOPIC_JOB_ADDED,
NotificationConstants.TOPIC_JOB_STARTED,
NotificationConstants.TOPIC_JOB_CANCELLED,
NotificationConstants.TOPIC_JOB_FAILED,
NotificationConstants.TOPIC_JOB_FINISHED,
NotificationConstants.TOPIC_JOB_REMOVED})
})
-// TODO register event handlers on activate to allow for filters!
public class StatisticsManager implements EventHandler {
/** Logger. */
@@ -106,70 +102,57 @@ public class StatisticsManager implement
@Override
public void handleEvent(final Event event) {
- if ( SlingConstants.TOPIC_RESOURCE_ADDED.equals(event.getTopic()) ) {
- final String path = (String) event.getProperty(SlingConstants.PROPERTY_PATH);
- if ( this.configuration.isLocalJob(path) ) {
- final int topicStart = this.configuration.getLocalJobsPath().length() + 1;
- final int topicEnd = path.indexOf("/", topicStart);
- final String topic;
- if ( topicEnd == -1 ) {
- topic = path.substring(topicStart).replace('.', '/');
- } else {
- topic = path.substring(topicStart, topicEnd).replace('.', '/');
- }
+ final String topic = (String)event.getProperty(NotificationConstants.NOTIFICATION_PROPERTY_JOB_TOPIC);
+ if ( topic != null ) { // this is just a sanity check
+ final String queueName = (String)event.getProperty(Job.PROPERTY_JOB_QUEUE_NAME);
+ final StatisticsImpl queueStats = getStatisticsForQueue(queueName);
+
+ TopicStatisticsImpl ts = (TopicStatisticsImpl)this.topicStatistics.get(topic);
+ if ( ts == null ) {
+ this.topicStatistics.putIfAbsent(topic, new TopicStatisticsImpl(topic));
+ ts = (TopicStatisticsImpl)this.topicStatistics.get(topic);
+ }
+
+ if ( event.getTopic().equals(NotificationConstants.TOPIC_JOB_ADDED) ) {
this.baseStatistics.incQueued();
- final QueueInfo info = this.queueConfigurationManager.getQueueInfo(topic);
- final StatisticsImpl queueStats = getStatisticsForQueue(info.queueName);
queueStats.incQueued();
- }
- } else {
- if ( EventUtil.isLocal(event) ) {
- // job notifications
- final String topic = (String)event.getProperty(NotificationConstants.NOTIFICATION_PROPERTY_JOB_TOPIC);
- if ( topic != null ) { // this is just a sanity check
- final String queueName = (String)event.getProperty(Job.PROPERTY_JOB_QUEUE_NAME);
- final StatisticsImpl queueStats = getStatisticsForQueue(queueName);
-
- TopicStatisticsImpl ts = (TopicStatisticsImpl)this.topicStatistics.get(topic);
- if ( ts == null ) {
- this.topicStatistics.putIfAbsent(topic, new TopicStatisticsImpl(topic));
- ts = (TopicStatisticsImpl)this.topicStatistics.get(topic);
- }
-
- if ( event.getTopic().equals(NotificationConstants.TOPIC_JOB_CANCELLED) ) {
- ts.addCancelled();
- this.baseStatistics.cancelledJob();
- if ( queueStats != null ) {
- queueStats.cancelledJob();
- }
- } else if ( event.getTopic().equals(NotificationConstants.TOPIC_JOB_FAILED) ) {
- ts.addFailed();
- this.baseStatistics.failedJob();
- if ( queueStats != null ) {
- queueStats.failedJob();
- }
- } else if ( event.getTopic().equals(NotificationConstants.TOPIC_JOB_FINISHED) ) {
- final Long time = (Long)event.getProperty(Utility.PROPERTY_TIME);
- ts.addFinished(time == null ? -1 : time);
- this.baseStatistics.finishedJob(time == null ? -1 : time);
- if ( queueStats != null ) {
- queueStats.finishedJob(time == null ? -1 : time);
- }
- } else if ( event.getTopic().equals(NotificationConstants.TOPIC_JOB_STARTED) ) {
- final Long time = (Long)event.getProperty(Utility.PROPERTY_TIME);
- ts.addActivated(time == null ? -1 : time);
- this.baseStatistics.addActive(time == null ? -1 : time);
- if ( queueStats != null ) {
- queueStats.addActive(time == null ? -1 : time);
- }
- } else if ( NotificationConstants.TOPIC_JOB_REMOVED.equals(event.getTopic()) ) {
- this.baseStatistics.decQueued();
- this.baseStatistics.cancelledJob();
- if ( queueStats != null ) {
- queueStats.decQueued();
- queueStats.cancelledJob();
- }
- }
+
+ } else if ( event.getTopic().equals(NotificationConstants.TOPIC_JOB_CANCELLED) ) {
+ ts.addCancelled();
+ this.baseStatistics.cancelledJob();
+ if ( queueStats != null ) {
+ queueStats.cancelledJob();
+ }
+
+ } else if ( event.getTopic().equals(NotificationConstants.TOPIC_JOB_FAILED) ) {
+ ts.addFailed();
+ this.baseStatistics.failedJob();
+ if ( queueStats != null ) {
+ queueStats.failedJob();
+ }
+
+ } else if ( event.getTopic().equals(NotificationConstants.TOPIC_JOB_FINISHED) ) {
+ final Long time = (Long)event.getProperty(NotificationUtility.PROPERTY_TIME);
+ ts.addFinished(time == null ? -1 : time);
+ this.baseStatistics.finishedJob(time == null ? -1 : time);
+ if ( queueStats != null ) {
+ queueStats.finishedJob(time == null ? -1 : time);
+ }
+
+ } else if ( event.getTopic().equals(NotificationConstants.TOPIC_JOB_STARTED) ) {
+ final Long time = (Long)event.getProperty(NotificationUtility.PROPERTY_TIME);
+ ts.addActivated(time == null ? -1 : time);
+ this.baseStatistics.addActive(time == null ? -1 : time);
+ if ( queueStats != null ) {
+ queueStats.addActive(time == null ? -1 : time);
+ }
+
+ } else if ( event.getTopic().equals(NotificationConstants.TOPIC_JOB_REMOVED) ) {
+ this.baseStatistics.decQueued();
+ this.baseStatistics.cancelledJob();
+ if ( queueStats != null ) {
+ queueStats.decQueued();
+ queueStats.cancelledJob();
}
}
}
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/timed/ScheduleInfo.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/timed/ScheduleInfo.java?rev=1632253&r1=1632252&r2=1632253&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/timed/ScheduleInfo.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/timed/ScheduleInfo.java Thu Oct 16 09:25:57 2014
@@ -25,6 +25,7 @@ import java.util.concurrent.atomic.Atomi
import org.apache.sling.event.EventUtil;
import org.apache.sling.event.impl.support.Environment;
import org.apache.sling.event.impl.support.ResourceHelper;
+import org.apache.sling.event.jobs.JobUtil;
import org.osgi.service.event.Event;
final class ScheduleInfo implements Serializable {
@@ -66,7 +67,7 @@ final class ScheduleInfo implements Seri
}
final String id = (String)event.getProperty(EventUtil.PROPERTY_TIMED_EVENT_ID);
- final String jId = (String)event.getProperty(ResourceHelper.PROPERTY_JOB_NAME);
+ final String jId = (String)event.getProperty(JobUtil.PROPERTY_JOB_NAME);
this.jobId = getJobId(topic, id, jId);
}
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ResourceHelper.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ResourceHelper.java?rev=1632253&r1=1632252&r2=1632253&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ResourceHelper.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ResourceHelper.java Thu Oct 16 09:25:57 2014
@@ -65,8 +65,6 @@ public abstract class ResourceHelper {
public static final String PROPERTY_SCHEDULE_SUSPENDED = "slingevent:scheduleSuspended";
public static final String PROPERTY_JOB_ID = "slingevent:eventId";
- @Deprecated
- public static final String PROPERTY_JOB_NAME = "event.job.id";
public static final String PROPERTY_JOB_TOPIC = "event.job.topic";
/** List of ignored properties to write to the repository. */
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobUtil.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobUtil.java?rev=1632253&r1=1632252&r2=1632253&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobUtil.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobUtil.java Thu Oct 16 09:25:57 2014
@@ -208,6 +208,7 @@ public abstract class JobUtil {
/**
* Property containing the job event. The value is of type org.osgi.service.event.Event.
+ * Since 1.6 this property is not sent anymore.
* @deprecated
*/
@Deprecated
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/NotificationConstants.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/NotificationConstants.java?rev=1632253&r1=1632252&r2=1632253&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/NotificationConstants.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/NotificationConstants.java Thu Oct 16 09:25:57 2014
@@ -20,7 +20,12 @@ package org.apache.sling.event.jobs;
/**
- * This class contains constants for event notifications
+ * This class contains constants for event notifications.
+ *
+ * Notifications for jobs can only be received on the instance where the job
+ * action is taking place. They are not sent to other instances using
+ * remote events.
+ *
* @since 1.3
*/
public abstract class NotificationConstants {
@@ -76,6 +81,15 @@ public abstract class NotificationConsta
public static final String TOPIC_JOB_REMOVED = "org/apache/sling/event/notification/job/REMOVED";
/**
+ * Asynchronous notification event when a job is added.
+ * The property {@link #NOTIFICATION_PROPERTY_JOB_TOPIC} contains the job topic,
+ * the property {@link #NOTIFICATION_PROPERTY_JOB_ID} contains the unique job id.
+ * The payload of the job is available as additional job specific properties.
+ * @since 1.6
+ */
+ public static final String TOPIC_JOB_ADDED = "org/apache/sling/event/notification/job/ADDED";
+
+ /**
* Property containing the job topic. Value is of type String.
* @see Job#getTopic()
*/
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/package-info.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/package-info.java?rev=1632253&r1=1632252&r2=1632253&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/package-info.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/package-info.java Thu Oct 16 09:25:57 2014
@@ -17,7 +17,7 @@
* under the License.
*/
-@Version("1.5.0")
+@Version("1.6.0")
package org.apache.sling.event.jobs;
import aQute.bnd.annotation.Version;
Modified: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ClassloadingTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ClassloadingTest.java?rev=1632253&r1=1632252&r2=1632253&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ClassloadingTest.java (original)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ClassloadingTest.java Thu Oct 16 09:25:57 2014
@@ -136,7 +136,10 @@ public class ClassloadingTest extends Ab
@Override
public String getDescription() {
- return "Waiting for job to be processed";
+ return "Waiting for job to be processed. Conditions: queuedJobs=" + jobManager.getStatistics().getNumberOfQueuedJobs() +
+ ", jobsCount=" + processedJobsCount + ", findJobs=" +
+ jobManager.findJobs(JobManager.QueryType.ALL, TOPIC, -1, (Map<String, Object>[]) null)
+ .size();
}
@Override
@@ -206,13 +209,19 @@ public class ClassloadingTest extends Ab
&& finishedEvents.size() == 0
&& jobManager.findJobs(JobManager.QueryType.ALL, TOPIC + "/failed", -1,
(Map<String, Object>[]) null).size() == 1
- && jobManager.getStatistics().getNumberOfQueuedJobs() == 0
+ && jobManager.getStatistics().getNumberOfQueuedJobs() == 1
&& jobManager.getStatistics().getNumberOfActiveJobs() == 0;
}
@Override
public String getDescription() {
- return "Waiting for job failure to be recorded";
+ return "Waiting for job failure to be recorded. Conditions " +
+ "faildJobsCount=" + failedJobsCount.get() +
+ ", finishedEvents=" + finishedEvents.size() +
+ ", findJobs= " + jobManager.findJobs(JobManager.QueryType.ALL, TOPIC + "/failed", -1,
+ (Map<String, Object>[]) null).size()
+ +", queuedJobs=" + jobManager.getStatistics().getNumberOfQueuedJobs()
+ +", activeJobs=" + jobManager.getStatistics().getNumberOfActiveJobs();
}
}, CONDITION_TIMEOUT_SECONDS, CONDITION_INTERVAL_MILLIS);
Modified: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/JobHandlingTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/JobHandlingTest.java?rev=1632253&r1=1632252&r2=1632253&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/JobHandlingTest.java (original)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/JobHandlingTest.java Thu Oct 16 09:25:57 2014
@@ -94,7 +94,7 @@ public class JobHandlingTest extends Abs
final Dictionary<String, Object> props = new Hashtable<String, Object>();
props.put(ResourceHelper.PROPERTY_JOB_TOPIC, "sling/test");
if ( id != null ) {
- props.put(ResourceHelper.PROPERTY_JOB_NAME, id);
+ props.put(JobUtil.PROPERTY_JOB_NAME, id);
}
return new Event(JobUtil.TOPIC_JOB, props);
@@ -232,7 +232,7 @@ public class JobHandlingTest extends Abs
assertEquals(1, jobManager.findJobs(JobManager.QueryType.ALL, "sling/test", -1, (Map<String, Object>[])null).size());
// job is currently waiting, therefore cancel fails
- final Event e1 = jobManager.findJob("sling/test", Collections.singletonMap(ResourceHelper.PROPERTY_JOB_NAME, (Object)"myid2"));
+ final Event e1 = jobManager.findJob("sling/test", Collections.singletonMap(JobUtil.PROPERTY_JOB_NAME, (Object)"myid2"));
assertNotNull(e1);
assertFalse(jobManager.removeJob((String)e1.getProperty(ResourceHelper.PROPERTY_JOB_ID)));
cb2.block(); // and continue job
@@ -240,7 +240,7 @@ public class JobHandlingTest extends Abs
sleep(200);
// the job is now in the queue again
- final Event e2 = jobManager.findJob("sling/test", Collections.singletonMap(ResourceHelper.PROPERTY_JOB_NAME, (Object)"myid2"));
+ final Event e2 = jobManager.findJob("sling/test", Collections.singletonMap(JobUtil.PROPERTY_JOB_NAME, (Object)"myid2"));
assertNotNull(e2);
assertTrue(jobManager.removeJob((String)e2.getProperty(ResourceHelper.PROPERTY_JOB_ID)));
assertEquals(0, jobManager.findJobs(JobManager.QueryType.ALL, "sling/test", -1, (Map<String, Object>[])null).size());
@@ -313,7 +313,7 @@ public class JobHandlingTest extends Abs
assertEquals(1, jobManager.findJobs(JobManager.QueryType.ALL, "sling/test", -1, (Map<String, Object>[])null).size());
// job is currently sleeping, but force cancel always waits!
- final Event e = jobManager.findJob("sling/test", Collections.singletonMap(ResourceHelper.PROPERTY_JOB_NAME, (Object)"myid3"));
+ final Event e = jobManager.findJob("sling/test", Collections.singletonMap(JobUtil.PROPERTY_JOB_NAME, (Object)"myid3"));
assertNotNull(e);
jobManager.forceRemoveJob((String)e.getProperty(ResourceHelper.PROPERTY_JOB_ID));
// the job is now removed
@@ -433,8 +433,7 @@ public class JobHandlingTest extends Abs
@Override
public void handleEvent(Event event) {
- final Event job = (Event) event.getProperty(JobUtil.PROPERTY_NOTIFICATION_JOB);
- final String id = (String)job.getProperty(ResourceHelper.PROPERTY_JOB_NAME);
+ final String id = (String)event.getProperty(JobUtil.NOTIFICATION_PROPERTY_JOB_NAME);
cancelled.add(id);
}
});
@@ -443,8 +442,7 @@ public class JobHandlingTest extends Abs
@Override
public void handleEvent(Event event) {
- final Event job = (Event) event.getProperty(JobUtil.PROPERTY_NOTIFICATION_JOB);
- final String id = (String)job.getProperty(ResourceHelper.PROPERTY_JOB_NAME);
+ final String id = (String)event.getProperty(JobUtil.NOTIFICATION_PROPERTY_JOB_NAME);
failed.add(id);
}
});
@@ -453,8 +451,7 @@ public class JobHandlingTest extends Abs
@Override
public void handleEvent(Event event) {
- final Event job = (Event) event.getProperty(JobUtil.PROPERTY_NOTIFICATION_JOB);
- final String id = (String)job.getProperty(ResourceHelper.PROPERTY_JOB_NAME);
+ final String id = (String)event.getProperty(JobUtil.NOTIFICATION_PROPERTY_JOB_NAME);
finished.add(id);
}
});
@@ -463,8 +460,7 @@ public class JobHandlingTest extends Abs
@Override
public void handleEvent(Event event) {
- final Event job = (Event) event.getProperty(JobUtil.PROPERTY_NOTIFICATION_JOB);
- final String id = (String)job.getProperty(ResourceHelper.PROPERTY_JOB_NAME);
+ final String id = (String)event.getProperty(JobUtil.NOTIFICATION_PROPERTY_JOB_NAME);
started.add(id);
}
});