You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2013/10/08 16:42:46 UTC
svn commit: r1530297 - in
/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs:
JobConsumerManager.java queues/AbstractJobQueue.java
Author: cziegeler
Date: Tue Oct 8 14:42:46 2013
New Revision: 1530297
URL: http://svn.apache.org/r1530297
Log:
SLING-3041 : Mark job as failed if async job consumer disappears
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java?rev=1530297&r1=1530296&r2=1530297&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java Tue Oct 8 14:42:46 2013
@@ -97,6 +97,8 @@ public class JobConsumerManager {
private BundleContext bundleContext;
+ private final Map<String, Object[]> listenerMap = new HashMap<String, Object[]>();
+
private Dictionary<String, Object> getRegistrationProperties() {
final Dictionary<String, Object> serviceProps = new Hashtable<String, Object>();
serviceProps.put(PropertyProvider.PROPERTY_PROPERTIES, TopologyCapabilities.PROPERTY_TOPICS);
@@ -162,6 +164,7 @@ public class JobConsumerManager {
this.bundleContext = null;
synchronized ( this.topicToConsumerMap ) {
this.topicToConsumerMap.clear();
+ this.listenerMap.clear();
}
}
@@ -188,6 +191,18 @@ public class JobConsumerManager {
return null;
}
+ public void registerListener(final String key, final JobExecutor consumer, final JobExecutionContext handler) {
+ synchronized ( this.topicToConsumerMap ) {
+ this.listenerMap.put(key, new Object[] {consumer, handler});
+ }
+ }
+
+ public void unregisterListener(final String key) {
+ synchronized ( this.topicToConsumerMap ) {
+ this.listenerMap.remove(key);
+ }
+ }
+
/**
* Return the topics information of this instance.
*/
@@ -289,6 +304,17 @@ public class JobConsumerManager {
if ( topic.length() > 0 ) {
final List<ConsumerInfo> consumers = this.topicToConsumerMap.get(topic);
if ( consumers != null ) { // sanity check
+ for(final ConsumerInfo oldConsumer : consumers) {
+ if ( oldConsumer.equals(info) && oldConsumer.executor != null ) {
+ // notify listener
+ for(final Object[] listenerObjects : this.listenerMap.values()) {
+ if ( listenerObjects[0] == oldConsumer.executor ) {
+ ((JobExecutionContext)listenerObjects[1]).asyncProcessingFinished(JobStatus.FAILED);
+ break;
+ }
+ }
+ }
+ }
consumers.remove(info);
if ( consumers.size() == 0 ) {
this.topicToConsumerMap.remove(topic);
@@ -358,7 +384,7 @@ public class JobConsumerManager {
public final ServiceReference serviceReference;
private final boolean isConsumer;
- private JobExecutor executor;
+ public JobExecutor executor;
public final int ranking;
public final long serviceId;
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java?rev=1530297&r1=1530296&r2=1530297&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java Tue Oct 8 14:42:46 2013
@@ -544,7 +544,7 @@ public abstract class AbstractJobQueue
try {
synchronized ( lock ) {
- result = consumer.process(job, new JobExecutionContext() {
+ final JobExecutionContext ctx = new JobExecutionContext() {
private boolean hasInit = false;
@@ -585,6 +585,7 @@ public abstract class AbstractJobQueue
public void asyncProcessingFinished(final JobStatus status) {
synchronized ( lock ) {
if ( isAsync.compareAndSet(true, false) ) {
+ jobConsumerManager.unregisterListener(job.getId());
finishedJob(job.getId(), status, true);
asyncCounter.decrementAndGet();
} else {
@@ -592,8 +593,10 @@ public abstract class AbstractJobQueue
}
}
}
- });
+ };
+ result = consumer.process(job, ctx);
if ( result == null ) { // ASYNC processing
+ jobConsumerManager.registerListener(job.getId(), consumer, ctx);
asyncCounter.incrementAndGet();
notifyFinished(null);
isAsync.set(true);