You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2014/10/20 15:58:56 UTC

svn commit: r1633159 - in /sling/trunk/bundles/extensions/event/src: main/java/org/apache/sling/event/impl/jobs/ main/java/org/apache/sling/event/impl/jobs/config/ main/java/org/apache/sling/event/impl/jobs/notifications/ main/java/org/apache/sling/eve...

Author: cziegeler
Date: Mon Oct 20 13:58:56 2014
New Revision: 1633159

URL: http://svn.apache.org/r1633159
Log:
SLING-4065 : Add notification when a job is added. Calculate event properties from path instead of reading the job

Modified:
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobExecutionResultImpl.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/QueueConfigurationManager.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/notifications/NewJobSender.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/notifications/NotificationUtility.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/NotificationConstants.java
    sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ChaosTest.java

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobExecutionResultImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobExecutionResultImpl.java?rev=1633159&r1=1633158&r2=1633159&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobExecutionResultImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobExecutionResultImpl.java Mon Oct 20 13:58:56 2014
@@ -20,18 +20,33 @@ package org.apache.sling.event.impl.jobs
 
 import org.apache.sling.event.jobs.consumer.JobExecutionResult;
 
+/**
+ * The job execution result.
+ */
 public class JobExecutionResultImpl implements JobExecutionResult {
 
+    /** Constant object for the success case. */
     public static final JobExecutionResultImpl SUCCEEDED = new JobExecutionResultImpl(InternalJobState.SUCCEEDED, null, null);
+    /** Constant object for the cancelled case. */
     public static final JobExecutionResultImpl CANCELLED = new JobExecutionResultImpl(InternalJobState.CANCELLED, null, null);
+    /** Constant object for the failed case. */
     public static final JobExecutionResultImpl FAILED = new JobExecutionResultImpl(InternalJobState.FAILED, null, null);
 
+    /** The state of the execution. */
     private final InternalJobState state;
 
+    /** Optional message. */
     private final String message;
 
+    /** Optional retry delay. */
     private final Long retryDelayInMs;
 
+    /**
+     * Create a new result
+     * @param state The result state
+     * @param message Optional Message
+     * @param retryDelayInMs Optional retry delay
+     */
     public JobExecutionResultImpl(final InternalJobState state,
             final String message,
             final Long retryDelayInMs) {
@@ -40,6 +55,10 @@ public class JobExecutionResultImpl impl
         this.retryDelayInMs = retryDelayInMs;
     }
 
+    /**
+     * Get the internal state
+     * @return The state.
+     */
     public InternalJobState getState() {
         return this.state;
     }

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java?rev=1633159&r1=1633158&r2=1633159&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java Mon Oct 20 13:58:56 2014
@@ -42,7 +42,6 @@ import org.apache.sling.discovery.Topolo
 import org.apache.sling.discovery.TopologyEvent.Type;
 import org.apache.sling.discovery.TopologyEventListener;
 import org.apache.sling.event.impl.EnvironmentComponent;
-import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueConfigurationChangeListener;
 import org.apache.sling.event.impl.jobs.tasks.CheckTopologyTask;
 import org.apache.sling.event.impl.jobs.tasks.FindUnfinishedJobsTask;
 import org.apache.sling.event.impl.jobs.tasks.UpgradeTask;
@@ -74,7 +73,7 @@ import org.slf4j.LoggerFactory;
     @Property(name=JobManagerConfiguration.PROPERTY_BACKGROUND_LOAD_DELAY,
               longValue=JobManagerConfiguration.DEFAULT_BACKGROUND_LOAD_DELAY, propertyPrivate=true),
 })
-public class JobManagerConfiguration implements TopologyEventListener, QueueConfigurationChangeListener {
+public class JobManagerConfiguration implements TopologyEventListener, ConfigurationChangeListener {
 
     /** Logger. */
     private final Logger logger = LoggerFactory.getLogger("org.apache.sling.event.impl.jobs");
@@ -220,7 +219,7 @@ public class JobManagerConfiguration imp
      */
     @Deactivate
     protected void deactivate() {
-        this.queueConfigManager.removeListener(this);
+        this.queueConfigManager.removeListener();
     }
 
     /**
@@ -422,8 +421,12 @@ public class JobManagerConfiguration imp
         return (slash ? this.scheduledJobsPathWithSlash : this.scheduledJobsPath);
     }
 
+    /**
+     * This method is invoked by the queue configuration manager
+     * whenever the queue configuration changes.
+     */
     @Override
-    public void configChanged() {
+    public void configurationChanged(final boolean active) {
         final TopologyCapabilities caps = this.topologyCapabilities;
         if ( caps != null ) {
             synchronized ( this.listeners ) {

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/QueueConfigurationManager.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/QueueConfigurationManager.java?rev=1633159&r1=1633158&r2=1633159&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/QueueConfigurationManager.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/QueueConfigurationManager.java Mon Oct 20 13:58:56 2014
@@ -40,10 +40,6 @@ import org.apache.sling.event.impl.suppo
            bind="bindConfig", unbind="unbindConfig", updated="updateConfig")
 public class QueueConfigurationManager {
 
-    public interface QueueConfigurationChangeListener {
-        void configChanged();
-    }
-
     /** Empty configuration array. */
     private static final InternalQueueConfiguration[] EMPTY_CONFIGS = new InternalQueueConfiguration[0];
 
@@ -57,8 +53,8 @@ public class QueueConfigurationManager {
     @Reference
     private MainQueueConfiguration mainQueueConfiguration;
 
-    /** Listeners. */
-    private final List<QueueConfigurationChangeListener> listeners = new ArrayList<QueueConfigurationChangeListener>();
+    /** Listener - this is the job manager configuration component. */
+    private volatile ConfigurationChangeListener changeListener;
 
     /**
      * Add a new queue configuration.
@@ -104,6 +100,7 @@ public class QueueConfigurationManager {
             Collections.sort(configurations);
             orderedConfigs = configurations.toArray(new InternalQueueConfiguration[configurations.size()]);
         }
+        this.updateListener();
     }
 
     /**
@@ -174,23 +171,28 @@ public class QueueConfigurationManager {
         return result;
     }
 
-    public void addListener(final QueueConfigurationChangeListener listener) {
-        synchronized ( this.listeners ) {
-            this.listeners.add(listener);
-        }
+    /**
+     * Add a config listener.
+     * @param listener
+     */
+    public void addListener(final ConfigurationChangeListener listener) {
+        this.changeListener = listener;
     }
 
-    public void removeListener(final QueueConfigurationChangeListener listener) {
-        synchronized ( this.listeners ) {
-            this.listeners.remove(listener);
-        }
+    /**
+     * Remove the config listener.
+     */
+    public void removeListener() {
+        this.changeListener = null;
     }
 
-    private void updateListeners() {
-        synchronized ( listeners ) {
-            for(final QueueConfigurationChangeListener l : listeners) {
-                l.configChanged();
-            }
+    /**
+     * Update the listener.
+     */
+    private void updateListener() {
+        final ConfigurationChangeListener l = this.changeListener;
+        if ( l != null ) {
+            l.configurationChanged(true);
         }
     }
 }

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/notifications/NewJobSender.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/notifications/NewJobSender.java?rev=1633159&r1=1633158&r2=1633159&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/notifications/NewJobSender.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/notifications/NewJobSender.java Mon Oct 20 13:58:56 2014
@@ -26,9 +26,6 @@ import org.apache.felix.scr.annotations.
 import org.apache.felix.scr.annotations.Deactivate;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.sling.api.SlingConstants;
-import org.apache.sling.api.resource.Resource;
-import org.apache.sling.api.resource.ResourceResolver;
-import org.apache.sling.event.impl.jobs.Utility;
 import org.apache.sling.event.impl.jobs.config.JobManagerConfiguration;
 import org.apache.sling.event.impl.support.ResourceHelper;
 import org.apache.sling.event.jobs.Job;
@@ -98,20 +95,24 @@ public class NewJobSender implements Eve
         final String path = (String) event.getProperty(SlingConstants.PROPERTY_PATH);
         final String rt = (String) event.getProperty(SlingConstants.PROPERTY_RESOURCE_TYPE);
         if ( ResourceHelper.RESOURCE_TYPE_JOB.equals(rt) && this.configuration.isLocalJob(path) ) {
-            // read the job
-            final ResourceResolver resolver = this.configuration.createResourceResolver();
-            try {
-                final Resource rsrc = resolver.getResource(path);
-                if ( rsrc != null ) {
-                    final Job job = Utility.readJob(this.logger, rsrc);
-                    if ( job != null ) {
-                        logger.debug("Sending job added event for {}", job);
-                        NotificationUtility.sendNotification(this.eventAdmin, NotificationConstants.TOPIC_JOB_ADDED, job, null);
-                    }
-                }
-            } finally {
-                resolver.close();
-            }
+            // get topic and id from path
+            final int topicStart = this.configuration.getLocalJobsPath().length() + 1;
+            final int topicEnd = path.indexOf('/', topicStart);
+            final String topic = path.substring(topicStart, topicEnd).replace('.', '/');
+            final String jobId = path.substring(topicEnd + 1);
+
+            // only job id and topic are guaranteed
+            final Dictionary<String, Object> properties = new Hashtable<String, Object>();
+            properties.put(NotificationConstants.NOTIFICATION_PROPERTY_JOB_ID, jobId);
+            properties.put(NotificationConstants.NOTIFICATION_PROPERTY_JOB_TOPIC, topic);
+
+            // we also set internally the queue name
+            final String queueName = this.configuration.getQueueConfigurationManager().getQueueInfo(topic).queueName;
+            properties.put(Job.PROPERTY_JOB_QUEUE_NAME, queueName);
+
+            final Event jobEvent = new Event(NotificationConstants.TOPIC_JOB_ADDED, properties);
+            // as this is send within handling an event, we do sync call
+            this.eventAdmin.sendEvent(jobEvent);
         }
     }
 

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/notifications/NotificationUtility.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/notifications/NotificationUtility.java?rev=1633159&r1=1633158&r2=1633159&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/notifications/NotificationUtility.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/notifications/NotificationUtility.java Mon Oct 20 13:58:56 2014
@@ -99,11 +99,7 @@ public abstract class NotificationUtilit
         if ( time != null ) {
             eventProps.put(PROPERTY_TIME, time);
         }
-        if ( NotificationConstants.TOPIC_JOB_ADDED.equals(eventTopic) ) {
-            eventAdmin.sendEvent(new Event(eventTopic, eventProps));
-        } else {
-            eventAdmin.postEvent(new Event(eventTopic, eventProps));
-        }
+        eventAdmin.postEvent(new Event(eventTopic, eventProps));
     }
 
 }

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/NotificationConstants.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/NotificationConstants.java?rev=1633159&r1=1633158&r2=1633159&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/NotificationConstants.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/NotificationConstants.java Mon Oct 20 13:58:56 2014
@@ -84,7 +84,6 @@ public abstract class NotificationConsta
      * Asynchronous notification event when a job is added.
      * The property {@link #NOTIFICATION_PROPERTY_JOB_TOPIC} contains the job topic,
      * the property {@link #NOTIFICATION_PROPERTY_JOB_ID} contains the unique job id.
-     * The payload of the job is available as additional job specific properties.
      * @since 1.6
      */
     public static final String TOPIC_JOB_ADDED = "org/apache/sling/event/notification/job/ADDED";

Modified: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ChaosTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ChaosTest.java?rev=1633159&r1=1633158&r2=1633159&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ChaosTest.java (original)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ChaosTest.java Mon Oct 20 13:58:56 2014
@@ -25,11 +25,13 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Dictionary;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Hashtable;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.sling.discovery.TopologyEvent;
@@ -266,7 +268,8 @@ public class ChaosTest extends AbstractJ
         final TopologyView view = views.get(0);
 
         try {
-            final ServiceReference[] refs = this.bc.getServiceReferences(TopologyEventListener.class.getName(), "(objectClass=org.apache.sling.event.impl.jobs.config.JobManagerConfiguration)");
+            final ServiceReference[] refs = this.bc.getServiceReferences(TopologyEventListener.class.getName(),
+                    "(objectClass=org.apache.sling.event.impl.jobs.config.JobManagerConfiguration)");
             assertNotNull(refs);
             assertEquals(1, refs.length);
             final TopologyEventListener tel = (TopologyEventListener)bc.getService(refs[0]);
@@ -309,21 +312,27 @@ public class ChaosTest extends AbstractJ
     public void testDoChaos() throws Exception {
         final JobManager jobManager = this.getJobManager();
 
-        // setup created map
+        // setup added, created and finished map
+        // added and finished are filled by notifications
+        // created is filled by the threads starting jobs
+        final Map<String, AtomicLong> added = new HashMap<String, AtomicLong>();
         final Map<String, AtomicLong> created = new HashMap<String, AtomicLong>();
         final Map<String, AtomicLong> finished = new HashMap<String, AtomicLong>();
         final List<String> topics = new ArrayList<String>();
         for(int i=0;i<NUM_ORDERED_TOPICS;i++) {
+            added.put(ORDERED_TOPICS[i], new AtomicLong());
             created.put(ORDERED_TOPICS[i], new AtomicLong());
             finished.put(ORDERED_TOPICS[i], new AtomicLong());
             topics.add(ORDERED_TOPICS[i]);
         }
         for(int i=0;i<NUM_PARALLEL_TOPICS;i++) {
+            added.put(PARALLEL_TOPICS[i], new AtomicLong());
             created.put(PARALLEL_TOPICS[i], new AtomicLong());
             finished.put(PARALLEL_TOPICS[i], new AtomicLong());
             topics.add(PARALLEL_TOPICS[i]);
         }
         for(int i=0;i<NUM_ROUND_TOPICS;i++) {
+            added.put(ROUND_TOPICS[i], new AtomicLong());
             created.put(ROUND_TOPICS[i], new AtomicLong());
             finished.put(ROUND_TOPICS[i], new AtomicLong());
             topics.add(ROUND_TOPICS[i]);
@@ -338,9 +347,11 @@ public class ChaosTest extends AbstractJ
 
                     @Override
                     public void handleEvent(final Event event) {
+                        final String topic = (String) event.getProperty(NotificationConstants.NOTIFICATION_PROPERTY_JOB_TOPIC);
                         if ( NotificationConstants.TOPIC_JOB_FINISHED.equals(event.getTopic())) {
-                            final String topic = (String) event.getProperty(NotificationConstants.NOTIFICATION_PROPERTY_JOB_TOPIC);
                             finished.get(topic).incrementAndGet();
+                        } else if ( NotificationConstants.TOPIC_JOB_ADDED.equals(event.getTopic())) {
+                            added.get(topic).incrementAndGet();
                         }
                     }
                 });
@@ -370,8 +381,9 @@ public class ChaosTest extends AbstractJ
             }
 
             System.out.println("Waiting for job handling to finish...");
-            while ( !topics.isEmpty() ) {
-                final Iterator<String> iter = topics.iterator();
+            final Set<String> allTopics = new HashSet<String>(topics);
+            while ( !allTopics.isEmpty() ) {
+                final Iterator<String> iter = allTopics.iterator();
                 while ( iter.hasNext() ) {
                     final String topic = iter.next();
                     if ( finished.get(topic).get() == created.get(topic).get() ) {
@@ -380,6 +392,11 @@ public class ChaosTest extends AbstractJ
                 }
                 this.sleep(100);
             }
+            System.out.println("Checking notifications...");
+            for(final String topic : topics) {
+                assertEquals("Checking topic " + topic, created.get(topic).get(), added.get(topic).get());
+            }
+
         } finally {
             eventHandler.unregister();
             for(final ServiceRegistration reg : registrations) {