You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2014/10/20 15:58:56 UTC
svn commit: r1633159 - in /sling/trunk/bundles/extensions/event/src:
main/java/org/apache/sling/event/impl/jobs/
main/java/org/apache/sling/event/impl/jobs/config/
main/java/org/apache/sling/event/impl/jobs/notifications/
main/java/org/apache/sling/eve...
Author: cziegeler
Date: Mon Oct 20 13:58:56 2014
New Revision: 1633159
URL: http://svn.apache.org/r1633159
Log:
SLING-4065 : Add notification when a job is added. Calculate event properties from path instead of reading the job
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobExecutionResultImpl.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/QueueConfigurationManager.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/notifications/NewJobSender.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/notifications/NotificationUtility.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/NotificationConstants.java
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ChaosTest.java
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobExecutionResultImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobExecutionResultImpl.java?rev=1633159&r1=1633158&r2=1633159&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobExecutionResultImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobExecutionResultImpl.java Mon Oct 20 13:58:56 2014
@@ -20,18 +20,33 @@ package org.apache.sling.event.impl.jobs
import org.apache.sling.event.jobs.consumer.JobExecutionResult;
+/**
+ * The job execution result.
+ */
public class JobExecutionResultImpl implements JobExecutionResult {
+ /** Constant object for the success case. */
public static final JobExecutionResultImpl SUCCEEDED = new JobExecutionResultImpl(InternalJobState.SUCCEEDED, null, null);
+ /** Constant object for the cancelled case. */
public static final JobExecutionResultImpl CANCELLED = new JobExecutionResultImpl(InternalJobState.CANCELLED, null, null);
+ /** Constant object for the failed case. */
public static final JobExecutionResultImpl FAILED = new JobExecutionResultImpl(InternalJobState.FAILED, null, null);
+ /** The state of the execution. */
private final InternalJobState state;
+ /** Optional message. */
private final String message;
+ /** Optional retry delay. */
private final Long retryDelayInMs;
+ /**
+ * Create a new result
+ * @param state The result state
+ * @param message Optional Message
+ * @param retryDelayInMs Optional retry delay
+ */
public JobExecutionResultImpl(final InternalJobState state,
final String message,
final Long retryDelayInMs) {
@@ -40,6 +55,10 @@ public class JobExecutionResultImpl impl
this.retryDelayInMs = retryDelayInMs;
}
+ /**
+ * Get the internal state
+ * @return The state.
+ */
public InternalJobState getState() {
return this.state;
}
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java?rev=1633159&r1=1633158&r2=1633159&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java Mon Oct 20 13:58:56 2014
@@ -42,7 +42,6 @@ import org.apache.sling.discovery.Topolo
import org.apache.sling.discovery.TopologyEvent.Type;
import org.apache.sling.discovery.TopologyEventListener;
import org.apache.sling.event.impl.EnvironmentComponent;
-import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueConfigurationChangeListener;
import org.apache.sling.event.impl.jobs.tasks.CheckTopologyTask;
import org.apache.sling.event.impl.jobs.tasks.FindUnfinishedJobsTask;
import org.apache.sling.event.impl.jobs.tasks.UpgradeTask;
@@ -74,7 +73,7 @@ import org.slf4j.LoggerFactory;
@Property(name=JobManagerConfiguration.PROPERTY_BACKGROUND_LOAD_DELAY,
longValue=JobManagerConfiguration.DEFAULT_BACKGROUND_LOAD_DELAY, propertyPrivate=true),
})
-public class JobManagerConfiguration implements TopologyEventListener, QueueConfigurationChangeListener {
+public class JobManagerConfiguration implements TopologyEventListener, ConfigurationChangeListener {
/** Logger. */
private final Logger logger = LoggerFactory.getLogger("org.apache.sling.event.impl.jobs");
@@ -220,7 +219,7 @@ public class JobManagerConfiguration imp
*/
@Deactivate
protected void deactivate() {
- this.queueConfigManager.removeListener(this);
+ this.queueConfigManager.removeListener();
}
/**
@@ -422,8 +421,12 @@ public class JobManagerConfiguration imp
return (slash ? this.scheduledJobsPathWithSlash : this.scheduledJobsPath);
}
+ /**
+ * This method is invoked by the queue configuration manager
+ * whenever the queue configuration changes.
+ */
@Override
- public void configChanged() {
+ public void configurationChanged(final boolean active) {
final TopologyCapabilities caps = this.topologyCapabilities;
if ( caps != null ) {
synchronized ( this.listeners ) {
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/QueueConfigurationManager.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/QueueConfigurationManager.java?rev=1633159&r1=1633158&r2=1633159&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/QueueConfigurationManager.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/QueueConfigurationManager.java Mon Oct 20 13:58:56 2014
@@ -40,10 +40,6 @@ import org.apache.sling.event.impl.suppo
bind="bindConfig", unbind="unbindConfig", updated="updateConfig")
public class QueueConfigurationManager {
- public interface QueueConfigurationChangeListener {
- void configChanged();
- }
-
/** Empty configuration array. */
private static final InternalQueueConfiguration[] EMPTY_CONFIGS = new InternalQueueConfiguration[0];
@@ -57,8 +53,8 @@ public class QueueConfigurationManager {
@Reference
private MainQueueConfiguration mainQueueConfiguration;
- /** Listeners. */
- private final List<QueueConfigurationChangeListener> listeners = new ArrayList<QueueConfigurationChangeListener>();
+ /** Listener - this is the job manager configuration component. */
+ private volatile ConfigurationChangeListener changeListener;
/**
* Add a new queue configuration.
@@ -104,6 +100,7 @@ public class QueueConfigurationManager {
Collections.sort(configurations);
orderedConfigs = configurations.toArray(new InternalQueueConfiguration[configurations.size()]);
}
+ this.updateListener();
}
/**
@@ -174,23 +171,28 @@ public class QueueConfigurationManager {
return result;
}
- public void addListener(final QueueConfigurationChangeListener listener) {
- synchronized ( this.listeners ) {
- this.listeners.add(listener);
- }
+ /**
+ * Add a config listener.
+ * @param listener
+ */
+ public void addListener(final ConfigurationChangeListener listener) {
+ this.changeListener = listener;
}
- public void removeListener(final QueueConfigurationChangeListener listener) {
- synchronized ( this.listeners ) {
- this.listeners.remove(listener);
- }
+ /**
+ * Remove the config listener.
+ */
+ public void removeListener() {
+ this.changeListener = null;
}
- private void updateListeners() {
- synchronized ( listeners ) {
- for(final QueueConfigurationChangeListener l : listeners) {
- l.configChanged();
- }
+ /**
+ * Update the listener.
+ */
+ private void updateListener() {
+ final ConfigurationChangeListener l = this.changeListener;
+ if ( l != null ) {
+ l.configurationChanged(true);
}
}
}
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/notifications/NewJobSender.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/notifications/NewJobSender.java?rev=1633159&r1=1633158&r2=1633159&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/notifications/NewJobSender.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/notifications/NewJobSender.java Mon Oct 20 13:58:56 2014
@@ -26,9 +26,6 @@ import org.apache.felix.scr.annotations.
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.sling.api.SlingConstants;
-import org.apache.sling.api.resource.Resource;
-import org.apache.sling.api.resource.ResourceResolver;
-import org.apache.sling.event.impl.jobs.Utility;
import org.apache.sling.event.impl.jobs.config.JobManagerConfiguration;
import org.apache.sling.event.impl.support.ResourceHelper;
import org.apache.sling.event.jobs.Job;
@@ -98,20 +95,24 @@ public class NewJobSender implements Eve
final String path = (String) event.getProperty(SlingConstants.PROPERTY_PATH);
final String rt = (String) event.getProperty(SlingConstants.PROPERTY_RESOURCE_TYPE);
if ( ResourceHelper.RESOURCE_TYPE_JOB.equals(rt) && this.configuration.isLocalJob(path) ) {
- // read the job
- final ResourceResolver resolver = this.configuration.createResourceResolver();
- try {
- final Resource rsrc = resolver.getResource(path);
- if ( rsrc != null ) {
- final Job job = Utility.readJob(this.logger, rsrc);
- if ( job != null ) {
- logger.debug("Sending job added event for {}", job);
- NotificationUtility.sendNotification(this.eventAdmin, NotificationConstants.TOPIC_JOB_ADDED, job, null);
- }
- }
- } finally {
- resolver.close();
- }
+ // get topic and id from path
+ final int topicStart = this.configuration.getLocalJobsPath().length() + 1;
+ final int topicEnd = path.indexOf('/', topicStart);
+ final String topic = path.substring(topicStart, topicEnd).replace('.', '/');
+ final String jobId = path.substring(topicEnd + 1);
+
+ // only job id and topic are guaranteed
+ final Dictionary<String, Object> properties = new Hashtable<String, Object>();
+ properties.put(NotificationConstants.NOTIFICATION_PROPERTY_JOB_ID, jobId);
+ properties.put(NotificationConstants.NOTIFICATION_PROPERTY_JOB_TOPIC, topic);
+
+ // we also set internally the queue name
+ final String queueName = this.configuration.getQueueConfigurationManager().getQueueInfo(topic).queueName;
+ properties.put(Job.PROPERTY_JOB_QUEUE_NAME, queueName);
+
+ final Event jobEvent = new Event(NotificationConstants.TOPIC_JOB_ADDED, properties);
+ // as this is send within handling an event, we do sync call
+ this.eventAdmin.sendEvent(jobEvent);
}
}
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/notifications/NotificationUtility.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/notifications/NotificationUtility.java?rev=1633159&r1=1633158&r2=1633159&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/notifications/NotificationUtility.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/notifications/NotificationUtility.java Mon Oct 20 13:58:56 2014
@@ -99,11 +99,7 @@ public abstract class NotificationUtilit
if ( time != null ) {
eventProps.put(PROPERTY_TIME, time);
}
- if ( NotificationConstants.TOPIC_JOB_ADDED.equals(eventTopic) ) {
- eventAdmin.sendEvent(new Event(eventTopic, eventProps));
- } else {
- eventAdmin.postEvent(new Event(eventTopic, eventProps));
- }
+ eventAdmin.postEvent(new Event(eventTopic, eventProps));
}
}
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/NotificationConstants.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/NotificationConstants.java?rev=1633159&r1=1633158&r2=1633159&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/NotificationConstants.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/NotificationConstants.java Mon Oct 20 13:58:56 2014
@@ -84,7 +84,6 @@ public abstract class NotificationConsta
* Asynchronous notification event when a job is added.
* The property {@link #NOTIFICATION_PROPERTY_JOB_TOPIC} contains the job topic,
* the property {@link #NOTIFICATION_PROPERTY_JOB_ID} contains the unique job id.
- * The payload of the job is available as additional job specific properties.
* @since 1.6
*/
public static final String TOPIC_JOB_ADDED = "org/apache/sling/event/notification/job/ADDED";
Modified: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ChaosTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ChaosTest.java?rev=1633159&r1=1633158&r2=1633159&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ChaosTest.java (original)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ChaosTest.java Mon Oct 20 13:58:56 2014
@@ -25,11 +25,13 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Dictionary;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.sling.discovery.TopologyEvent;
@@ -266,7 +268,8 @@ public class ChaosTest extends AbstractJ
final TopologyView view = views.get(0);
try {
- final ServiceReference[] refs = this.bc.getServiceReferences(TopologyEventListener.class.getName(), "(objectClass=org.apache.sling.event.impl.jobs.config.JobManagerConfiguration)");
+ final ServiceReference[] refs = this.bc.getServiceReferences(TopologyEventListener.class.getName(),
+ "(objectClass=org.apache.sling.event.impl.jobs.config.JobManagerConfiguration)");
assertNotNull(refs);
assertEquals(1, refs.length);
final TopologyEventListener tel = (TopologyEventListener)bc.getService(refs[0]);
@@ -309,21 +312,27 @@ public class ChaosTest extends AbstractJ
public void testDoChaos() throws Exception {
final JobManager jobManager = this.getJobManager();
- // setup created map
+ // setup added, created and finished map
+ // added and finished are filled by notifications
+ // created is filled by the threads starting jobs
+ final Map<String, AtomicLong> added = new HashMap<String, AtomicLong>();
final Map<String, AtomicLong> created = new HashMap<String, AtomicLong>();
final Map<String, AtomicLong> finished = new HashMap<String, AtomicLong>();
final List<String> topics = new ArrayList<String>();
for(int i=0;i<NUM_ORDERED_TOPICS;i++) {
+ added.put(ORDERED_TOPICS[i], new AtomicLong());
created.put(ORDERED_TOPICS[i], new AtomicLong());
finished.put(ORDERED_TOPICS[i], new AtomicLong());
topics.add(ORDERED_TOPICS[i]);
}
for(int i=0;i<NUM_PARALLEL_TOPICS;i++) {
+ added.put(PARALLEL_TOPICS[i], new AtomicLong());
created.put(PARALLEL_TOPICS[i], new AtomicLong());
finished.put(PARALLEL_TOPICS[i], new AtomicLong());
topics.add(PARALLEL_TOPICS[i]);
}
for(int i=0;i<NUM_ROUND_TOPICS;i++) {
+ added.put(ROUND_TOPICS[i], new AtomicLong());
created.put(ROUND_TOPICS[i], new AtomicLong());
finished.put(ROUND_TOPICS[i], new AtomicLong());
topics.add(ROUND_TOPICS[i]);
@@ -338,9 +347,11 @@ public class ChaosTest extends AbstractJ
@Override
public void handleEvent(final Event event) {
+ final String topic = (String) event.getProperty(NotificationConstants.NOTIFICATION_PROPERTY_JOB_TOPIC);
if ( NotificationConstants.TOPIC_JOB_FINISHED.equals(event.getTopic())) {
- final String topic = (String) event.getProperty(NotificationConstants.NOTIFICATION_PROPERTY_JOB_TOPIC);
finished.get(topic).incrementAndGet();
+ } else if ( NotificationConstants.TOPIC_JOB_ADDED.equals(event.getTopic())) {
+ added.get(topic).incrementAndGet();
}
}
});
@@ -370,8 +381,9 @@ public class ChaosTest extends AbstractJ
}
System.out.println("Waiting for job handling to finish...");
- while ( !topics.isEmpty() ) {
- final Iterator<String> iter = topics.iterator();
+ final Set<String> allTopics = new HashSet<String>(topics);
+ while ( !allTopics.isEmpty() ) {
+ final Iterator<String> iter = allTopics.iterator();
while ( iter.hasNext() ) {
final String topic = iter.next();
if ( finished.get(topic).get() == created.get(topic).get() ) {
@@ -380,6 +392,11 @@ public class ChaosTest extends AbstractJ
}
this.sleep(100);
}
+ System.out.println("Checking notifications...");
+ for(final String topic : topics) {
+ assertEquals("Checking topic " + topic, created.get(topic).get(), added.get(topic).get());
+ }
+
} finally {
eventHandler.unregister();
for(final ServiceRegistration reg : registrations) {