You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2015/05/28 14:30:13 UTC
svn commit: r1682229 -
/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java
Author: cziegeler
Date: Thu May 28 12:30:12 2015
New Revision: 1682229
URL: http://svn.apache.org/r1682229
Log:
SLING-4759 : Queue might be closed while job is waiting for retry
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java?rev=1682229&r1=1682228&r2=1682229&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java Thu May 28 12:30:12 2015
@@ -107,6 +107,9 @@ public class JobQueueImpl
/** A marker for doing a full cache search. */
private final AtomicBoolean doFullCacheSearch = new AtomicBoolean(false);
+ /** A counter for rescheduling. */
+ private final AtomicInteger waitCounter = new AtomicInteger();
+
/** The job cache. */
private final QueueJobCache cache;
@@ -444,6 +447,7 @@ public class JobQueueImpl
private boolean canBeClosed() {
return !this.isSuspended()
&& this.asyncCounter.get() == 0
+ && this.waitCounter.get() == 0
&& this.available.availablePermits() == this.configuration.getMaxParallel();
}
@@ -736,6 +740,7 @@ public class JobQueueImpl
return "outdated=" + this.isOutdated.get() +
", suspendedSince=" + this.suspendedSince.get() +
", asyncJobs=" + this.asyncCounter.get() +
+ ", waitCount=" + this.waitCounter.get() +
", jobCount=" + String.valueOf(this.configuration.getMaxParallel() - this.available.availablePermits() +
(this.configuration.getType() == Type.ORDERED ? ", isSleepingUntil=" + this.isSleepingUntil : ""));
}
@@ -798,6 +803,7 @@ public class JobQueueImpl
if ( handler.removeFromRetryList() ) {
requeue(handler);
}
+ waitCounter.decrementAndGet();
} finally {
if ( configuration.getType() == Type.ORDERED ) {
isSleepingUntil = -1;
@@ -807,6 +813,7 @@ public class JobQueueImpl
}
}
};
+ this.waitCounter.incrementAndGet();
if ( !services.scheduler.schedule(t, services.scheduler.AT(fireDate).name(jobName)) ) {
// if scheduling fails run the thread directly
t.run();