You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2011/12/07 15:08:33 UTC

svn commit: r1211442 - /sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java

Author: cziegeler
Date: Wed Dec  7 14:08:32 2011
New Revision: 1211442

URL: http://svn.apache.org/viewvc?rev=1211442&view=rev
Log:
SLING-2323 : OrderedQueue: remove all might leave a stale entry

Modified:
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java?rev=1211442&r1=1211441&r2=1211442&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java Wed Dec  7 14:08:32 2011
@@ -38,13 +38,10 @@ import org.apache.sling.event.jobs.JobUt
 public final class OrderedJobQueue extends AbstractJobQueue {
 
     /** The job event for rescheduling. */
-    private JobEvent jobEvent;
+    private volatile JobEvent jobEvent;
 
-    /** Marker indicating that this queue is currently sleeping. */
-    private volatile long isSleepingUntil = -1;
-
-    /** The sleeping thread. */
-    private volatile Thread sleepingThread;
+    /** Lock and status object for handling the sleep phase. */
+    private final SleepLock sleepLock = new SleepLock();
 
     /** The queue. */
     private final BlockingQueue<JobEvent> queue = new LinkedBlockingQueue<JobEvent>();
@@ -59,7 +56,7 @@ public final class OrderedJobQueue exten
 
     @Override
     public String getStateInfo() {
-        return super.getStateInfo() + ", isSleepingUntil=" + this.isSleepingUntil;
+        return super.getStateInfo() + ", isSleepingUntil=" + this.sleepLock.sleepingSince;
     }
 
     @Override
@@ -86,28 +83,20 @@ public final class OrderedJobQueue exten
         return rescheduleInfo;
     }
 
-    private void setNotSleeping() {
-        this.isSleepingUntil = -1;
-        this.sleepingThread = null;
-    }
-
-    private void setSleeping(final Thread sleepingThread, final long delay) {
-        this.sleepingThread = sleepingThread;
-        this.isSleepingUntil = System.currentTimeMillis() + delay;
-    }
-
-    private void wakeUp() {
-        if ( this.isSleepingUntil != -1 ) {
-            final Thread thread = this.sleepingThread;
-            if ( thread != null ) {
-                thread.interrupt();
+    private void wakeUp(final boolean discardJob) {
+        synchronized ( this.sleepLock ) {
+            if ( this.sleepLock.sleepingSince != -1 ) {
+                if ( discardJob ) {
+                    this.sleepLock.jobEvent = null;
+                }
+                this.sleepLock.notify();
             }
         }
     }
 
     @Override
     public void resume() {
-        this.wakeUp();
+        this.wakeUp(false);
         super.resume();
     }
 
@@ -156,14 +145,23 @@ public final class OrderedJobQueue exten
             delay = (Long)info.event.getProperty(JobUtil.PROPERTY_JOB_RETRY_DELAY);
         }
         if ( delay > 0 ) {
-            this.setSleeping(Thread.currentThread(), delay);
-            try {
+            synchronized ( this.sleepLock ) {
+                this.sleepLock.sleepingSince = System.currentTimeMillis();
+                this.sleepLock.jobEvent = info;
                 this.logger.debug("Job queue {} is sleeping for {}ms.", this.queueName, delay);
-                Thread.sleep(delay);
-            } catch (InterruptedException e) {
-                this.ignoreException(e);
-            } finally {
-                this.setNotSleeping();
+                try {
+                    this.sleepLock.wait(delay);
+                } catch (final InterruptedException e) {
+                    this.ignoreException(e);
+                }
+                this.sleepLock.sleepingSince = -1;
+                final JobEvent result = this.sleepLock.jobEvent;
+                this.sleepLock.jobEvent = null;
+
+                if ( result == null ) {
+                    info.remove();
+                }
+                return result;
             }
         }
         return info;
@@ -179,9 +177,10 @@ public final class OrderedJobQueue exten
 
     @Override
     public synchronized void removeAll() {
-        this.jobEvent = null;
-        this.wakeUp();
+        // remove all remaining jobs first
         super.removeAll();
+        this.jobEvent = null;
+        this.wakeUp(true);
     }
 
     @Override
@@ -194,9 +193,18 @@ public final class OrderedJobQueue exten
     @Override
     public Object getState(final String key) {
         if ( "isSleepingUntil".equals(key) ) {
-            return this.isSleepingUntil;
+            return this.sleepLock.sleepingSince;
         }
         return super.getState(key);
     }
+
+    private static final class SleepLock {
+
+        /** Time (from System.currentTimeMillis()) at which the queue started sleeping, or -1 if it is not sleeping. */
+        public volatile long sleepingSince = -1;
+
+        /** The job event to be returned after sleeping. */
+        public volatile JobEvent jobEvent;
+    }
 }