You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2013/05/03 10:50:11 UTC

svn commit: r1478672 - /sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java

Author: cziegeler
Date: Fri May  3 08:50:11 2013
New Revision: 1478672

URL: http://svn.apache.org/r1478672
Log:
SLING-2829 : Don't close queue if async jobs are running

Modified:
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java?rev=1478672&r1=1478671&r2=1478672&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java Fri May  3 08:50:11 2013
@@ -27,6 +27,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.sling.commons.threads.ThreadPool;
 import org.apache.sling.event.EventUtil;
@@ -99,6 +100,9 @@ public abstract class AbstractJobQueue
     /** Suspend lock. */
     private final Object suspendLock = new Object();
 
+    /** Async counter. */
+    private final AtomicInteger asyncCounter = new AtomicInteger();
+
     /**
      * Start this queue
      * @param name The queue name
@@ -123,7 +127,10 @@ public abstract class AbstractJobQueue
     @Override
     public String getStateInfo() {
         synchronized ( this.suspendLock ) {
-            return "isWaiting=" + this.isWaiting + ", markedForRemoval=" + this.markedForRemoval + ", suspendedSince=" + this.suspendedSince;
+            return "isWaiting=" + this.isWaiting +
+                    ", markedForRemoval=" + this.markedForRemoval +
+                    ", suspendedSince=" + this.suspendedSince +
+                    ", asyncJobs=" + this.asyncCounter.get();
         }
     }
 
@@ -391,7 +398,7 @@ public abstract class AbstractJobQueue
     }
 
     protected boolean canBeMarkedForRemoval() {
-        return this.isEmpty() && !this.isWaiting &&!this.isSuspended();
+        return this.isEmpty() && !this.isWaiting && !this.isSuspended() && this.asyncCounter.get() == 0;
     }
 
     /**
@@ -529,10 +536,10 @@ public abstract class AbstractJobQueue
                                                     if ( !asyncDone.get() ) {
                                                         asyncDone.set(true);
                                                         finishedJob(job.getId(), result, true);
+                                                        asyncCounter.decrementAndGet();
                                                     } else {
                                                         throw new IllegalStateException("Job is already marked as processed");
                                                     }
-                                                    asyncLock.notify();
                                                 }
                                             }
 
@@ -566,6 +573,7 @@ public abstract class AbstractJobQueue
                                     }
                                 }
                                 if ( result == JobConsumer.JobResult.ASYNC ) {
+                                    asyncCounter.incrementAndGet();
                                     notifyFinished(null);
                                 }
                             }