You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2015/05/28 14:30:13 UTC

svn commit: r1682229 - /sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java

Author: cziegeler
Date: Thu May 28 12:30:12 2015
New Revision: 1682229

URL: http://svn.apache.org/r1682229
Log:
SLING-4759 : Queue might be closed while job is waiting for retry

Modified:
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java?rev=1682229&r1=1682228&r2=1682229&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java Thu May 28 12:30:12 2015
@@ -107,6 +107,9 @@ public class JobQueueImpl
     /** A marker for doing a full cache search. */
     private final AtomicBoolean doFullCacheSearch = new AtomicBoolean(false);
 
+    /** A counter for rescheduling. */
+    private final AtomicInteger waitCounter = new AtomicInteger();
+
     /** The job cache. */
     private final QueueJobCache cache;
 
@@ -444,6 +447,7 @@ public class JobQueueImpl
     private boolean canBeClosed() {
         return !this.isSuspended()
             && this.asyncCounter.get() == 0
+            && this.waitCounter.get() == 0
             && this.available.availablePermits() == this.configuration.getMaxParallel();
     }
 
@@ -736,6 +740,7 @@ public class JobQueueImpl
         return "outdated=" + this.isOutdated.get() +
                 ", suspendedSince=" + this.suspendedSince.get() +
                 ", asyncJobs=" + this.asyncCounter.get() +
+                ", waitCount=" + this.waitCounter.get() +
                 ", jobCount=" + String.valueOf(this.configuration.getMaxParallel() - this.available.availablePermits() +
                 (this.configuration.getType() == Type.ORDERED ? ", isSleepingUntil=" + this.isSleepingUntil : ""));
     }
@@ -798,6 +803,7 @@ public class JobQueueImpl
                         if ( handler.removeFromRetryList() ) {
                             requeue(handler);
                         }
+                        waitCounter.decrementAndGet();
                     } finally {
                         if ( configuration.getType() == Type.ORDERED ) {
                             isSleepingUntil = -1;
@@ -807,6 +813,7 @@ public class JobQueueImpl
                     }
                 }
             };
+            this.waitCounter.incrementAndGet();
             if ( !services.scheduler.schedule(t, services.scheduler.AT(fireDate).name(jobName)) ) {
                 // if scheduling fails run the thread directly
                 t.run();