You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2011/12/07 15:08:33 UTC
svn commit: r1211442 -
/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java
Author: cziegeler
Date: Wed Dec 7 14:08:32 2011
New Revision: 1211442
URL: http://svn.apache.org/viewvc?rev=1211442&view=rev
Log:
SLING-2323 : OrderedQueue: remove all might leave a stale entry
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java?rev=1211442&r1=1211441&r2=1211442&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java Wed Dec 7 14:08:32 2011
@@ -38,13 +38,10 @@ import org.apache.sling.event.jobs.JobUt
public final class OrderedJobQueue extends AbstractJobQueue {
/** The job event for rescheduling. */
- private JobEvent jobEvent;
+ private volatile JobEvent jobEvent;
- /** Marker indicating that this queue is currently sleeping. */
- private volatile long isSleepingUntil = -1;
-
- /** The sleeping thread. */
- private volatile Thread sleepingThread;
+ /** Lock and status object for handling the sleep phase. */
+ private final SleepLock sleepLock = new SleepLock();
/** The queue. */
private final BlockingQueue<JobEvent> queue = new LinkedBlockingQueue<JobEvent>();
@@ -59,7 +56,7 @@ public final class OrderedJobQueue exten
@Override
public String getStateInfo() {
- return super.getStateInfo() + ", isSleepingUntil=" + this.isSleepingUntil;
+ return super.getStateInfo() + ", isSleepingUntil=" + this.sleepLock.sleepingSince;
}
@Override
@@ -86,28 +83,20 @@ public final class OrderedJobQueue exten
return rescheduleInfo;
}
- private void setNotSleeping() {
- this.isSleepingUntil = -1;
- this.sleepingThread = null;
- }
-
- private void setSleeping(final Thread sleepingThread, final long delay) {
- this.sleepingThread = sleepingThread;
- this.isSleepingUntil = System.currentTimeMillis() + delay;
- }
-
- private void wakeUp() {
- if ( this.isSleepingUntil != -1 ) {
- final Thread thread = this.sleepingThread;
- if ( thread != null ) {
- thread.interrupt();
+ private void wakeUp(final boolean discardJob) {
+ synchronized ( this.sleepLock ) {
+ if ( this.sleepLock.sleepingSince != -1 ) {
+ if ( discardJob ) {
+ this.sleepLock.jobEvent = null;
+ }
+ this.sleepLock.notify();
}
}
}
@Override
public void resume() {
- this.wakeUp();
+ this.wakeUp(false);
super.resume();
}
@@ -156,14 +145,23 @@ public final class OrderedJobQueue exten
delay = (Long)info.event.getProperty(JobUtil.PROPERTY_JOB_RETRY_DELAY);
}
if ( delay > 0 ) {
- this.setSleeping(Thread.currentThread(), delay);
- try {
+ synchronized ( this.sleepLock ) {
+ this.sleepLock.sleepingSince = System.currentTimeMillis();
+ this.sleepLock.jobEvent = info;
this.logger.debug("Job queue {} is sleeping for {}ms.", this.queueName, delay);
- Thread.sleep(delay);
- } catch (InterruptedException e) {
- this.ignoreException(e);
- } finally {
- this.setNotSleeping();
+ try {
+ this.sleepLock.wait(delay);
+ } catch (final InterruptedException e) {
+ this.ignoreException(e);
+ }
+ this.sleepLock.sleepingSince = -1;
+ final JobEvent result = this.sleepLock.jobEvent;
+ this.sleepLock.jobEvent = null;
+
+ if ( result == null ) {
+ info.remove();
+ }
+ return result;
}
}
return info;
@@ -179,9 +177,10 @@ public final class OrderedJobQueue exten
@Override
public synchronized void removeAll() {
- this.jobEvent = null;
- this.wakeUp();
+ // remove all remaining jobs first
super.removeAll();
+ this.jobEvent = null;
+ this.wakeUp(true);
}
@Override
@@ -194,9 +193,18 @@ public final class OrderedJobQueue exten
@Override
public Object getState(final String key) {
if ( "isSleepingUntil".equals(key) ) {
- return this.isSleepingUntil;
+ return this.sleepLock.sleepingSince;
}
return super.getState(key);
}
+
+ private static final class SleepLock {
+
+ /** Marker indicating that this queue is currently sleeping. */
+ public volatile long sleepingSince = -1;
+
+ /** The job event to be returned after sleeping. */
+ public volatile JobEvent jobEvent;
+ }
}