You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@sling.apache.org by cz...@apache.org on 2013/10/08 16:42:46 UTC

svn commit: r1530297 - in /sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs: JobConsumerManager.java queues/AbstractJobQueue.java

Author: cziegeler
Date: Tue Oct  8 14:42:46 2013
New Revision: 1530297

URL: http://svn.apache.org/r1530297
Log:
SLING-3041 : Mark job as failed if async job consumer disappears

Modified:
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java?rev=1530297&r1=1530296&r2=1530297&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java Tue Oct  8 14:42:46 2013
@@ -97,6 +97,8 @@ public class JobConsumerManager {
 
     private BundleContext bundleContext;
 
+    private final Map<String, Object[]> listenerMap = new HashMap<String, Object[]>();
+
     private Dictionary<String, Object> getRegistrationProperties() {
         final Dictionary<String, Object> serviceProps = new Hashtable<String, Object>();
         serviceProps.put(PropertyProvider.PROPERTY_PROPERTIES, TopologyCapabilities.PROPERTY_TOPICS);
@@ -162,6 +164,7 @@ public class JobConsumerManager {
         this.bundleContext = null;
         synchronized ( this.topicToConsumerMap ) {
             this.topicToConsumerMap.clear();
+            this.listenerMap.clear();
         }
     }
 
@@ -188,6 +191,18 @@ public class JobConsumerManager {
         return null;
     }
 
+    public void registerListener(final String key, final JobExecutor consumer, final JobExecutionContext handler) {
+        synchronized ( this.topicToConsumerMap ) {
+            this.listenerMap.put(key, new Object[] {consumer, handler});
+        }
+    }
+
+    public void unregisterListener(final String key) {
+        synchronized ( this.topicToConsumerMap ) {
+            this.listenerMap.remove(key);
+        }
+    }
+
     /**
      * Return the topics information of this instance.
      */
@@ -289,6 +304,17 @@ public class JobConsumerManager {
                         if ( topic.length() > 0 ) {
                             final List<ConsumerInfo> consumers = this.topicToConsumerMap.get(topic);
                             if ( consumers != null ) { // sanity check
+                                for(final ConsumerInfo oldConsumer : consumers) {
+                                    if ( oldConsumer.equals(info) && oldConsumer.executor != null ) {
+                                        // notify listener
+                                        for(final Object[] listenerObjects : this.listenerMap.values()) {
+                                            if ( listenerObjects[0] == oldConsumer.executor ) {
+                                                ((JobExecutionContext)listenerObjects[1]).asyncProcessingFinished(JobStatus.FAILED);
+                                                break;
+                                            }
+                                        }
+                                    }
+                                }
                                 consumers.remove(info);
                                 if ( consumers.size() == 0 ) {
                                     this.topicToConsumerMap.remove(topic);
@@ -358,7 +384,7 @@ public class JobConsumerManager {
 
         public final ServiceReference serviceReference;
         private final boolean isConsumer;
-        private JobExecutor executor;
+        public JobExecutor executor;
         public final int ranking;
         public final long serviceId;
 

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java?rev=1530297&r1=1530296&r2=1530297&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java Tue Oct  8 14:42:46 2013
@@ -544,7 +544,7 @@ public abstract class AbstractJobQueue
 
                                 try {
                                     synchronized ( lock ) {
-                                        result = consumer.process(job, new JobExecutionContext() {
+                                        final JobExecutionContext ctx = new JobExecutionContext() {
 
                                             private boolean hasInit = false;
 
@@ -585,6 +585,7 @@ public abstract class AbstractJobQueue
                                             public void asyncProcessingFinished(final JobStatus status) {
                                                 synchronized ( lock ) {
                                                     if ( isAsync.compareAndSet(true, false) ) {
+                                                        jobConsumerManager.unregisterListener(job.getId());
                                                         finishedJob(job.getId(), status, true);
                                                         asyncCounter.decrementAndGet();
                                                     } else {
@@ -592,8 +593,10 @@ public abstract class AbstractJobQueue
                                                     }
                                                 }
                                             }
-                                        });
+                                        };
+                                        result = consumer.process(job, ctx);
                                         if ( result == null ) { // ASYNC processing
+                                            jobConsumerManager.registerListener(job.getId(), consumer, ctx);
                                             asyncCounter.incrementAndGet();
                                             notifyFinished(null);
                                             isAsync.set(true);