You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2013/06/05 17:55:39 UTC

svn commit: r1489934 - in /sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs: ./ queues/ stats/

Author: cziegeler
Date: Wed Jun  5 15:55:39 2013
New Revision: 1489934

URL: http://svn.apache.org/r1489934
Log:
SLING-2906 :  Queue might be outdated and closed while still processing 

Modified:
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractParallelJobQueue.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsImpl.java

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java?rev=1489934&r1=1489933&r2=1489934&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java Wed Jun  5 15:55:39 2013
@@ -205,8 +205,6 @@ public class JobManagerImpl
         final Iterator<AbstractJobQueue> i = this.queues.values().iterator();
         while ( i.hasNext() ) {
             final AbstractJobQueue jbq = i.next();
-            // update mbeans
-            ((QueuesMBeanImpl)queuesMBean).sendEvent(new QueueStatusEvent(null, jbq));
             jbq.close();
             // update mbeans
             ((QueuesMBeanImpl)queuesMBean).sendEvent(new QueueStatusEvent(null, jbq));
@@ -244,19 +242,14 @@ public class JobManagerImpl
                 while ( i.hasNext() ) {
                     final Map.Entry<String, AbstractJobQueue> current = i.next();
                     final AbstractJobQueue jbq = current.getValue();
-                    if ( jbq.isMarkedForRemoval() ) {
-                        logger.debug("Removing idle Job Queue {}", jbq);
-                        // close
-                        jbq.close();
+                    if ( jbq.tryToClose() ) {
+                        logger.debug("Removing idle job queue {}", jbq);
                         // copy statistics
                         this.baseStatistics.add(jbq);
                         // remove
                         i.remove();
                         // update mbeans
                         ((QueuesMBeanImpl)queuesMBean).sendEvent(new QueueStatusEvent(null, jbq));
-                    } else {
-                        // mark to be removed during next cycle
-                        jbq.markForRemoval();
                     }
                 }
             }
@@ -377,10 +370,7 @@ public class JobManagerImpl
         // remove the queue with the old name
         this.queues.remove(queue.getName());
         // check if we can close or have to rename
-        queue.markForRemoval();
-        if ( queue.isMarkedForRemoval() ) {
-            // close
-            queue.close();
+        if ( queue.tryToClose() ) {
             // copy statistics
             this.baseStatistics.add(queue);
             // update mbeans
@@ -388,7 +378,12 @@ public class JobManagerImpl
         } else {
             queue.outdate();
             // readd with new name
-            this.queues.put(queue.getName(), queue);
+            String newName = queue.getName();
+            int index = 0;
+            while ( this.queues.containsKey(newName) ) {
+                newName = queue.getName() + '$' + String.valueOf(index++);
+            }
+            this.queues.put(newName, queue);
             // update mbeans
             ((QueuesMBeanImpl)queuesMBean).sendEvent(new QueueStatusEvent(queue, queue));
         }

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java?rev=1489934&r1=1489933&r2=1489934&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java Wed Jun  5 15:55:39 2013
@@ -82,9 +82,6 @@ public abstract class AbstractJobQueue
     /** Are we still running? */
     protected volatile boolean running;
 
-    /** Are we marked for removal */
-    private volatile boolean markedForRemoval = false;
-
     /** Is the queue currently waiting(sleeping) */
     protected volatile boolean isWaiting = false;
 
@@ -131,7 +128,6 @@ public abstract class AbstractJobQueue
     public String getStateInfo() {
         synchronized ( this.suspendLock ) {
             return "isWaiting=" + this.isWaiting +
-                    ", markedForRemoval=" + this.markedForRemoval +
                     ", suspendedSince=" + this.suspendedSince +
                     ", asyncJobs=" + this.asyncCounter.get();
         }
@@ -151,7 +147,7 @@ public abstract class AbstractJobQueue
 
                     try {
                         runJobQueue();
-                    } catch (Throwable t) { //NOSONAR
+                    } catch (final Throwable t) { //NOSONAR
                         logger.error("Job queue " + queueName + " stopped with exception: " + t.getMessage() + ". Restarting.", t);
                     }
                 }
@@ -176,7 +172,6 @@ public abstract class AbstractJobQueue
     public void close() {
         this.running = false;
         this.logger.debug("Shutting down job queue {}", queueName);
-        this.logger.debug("Waking up sleeping queue {}", queueName);
         this.resume();
         if ( this.isWaiting ) {
             this.logger.debug("Waking up waiting queue {}", this.queueName);
@@ -195,6 +190,24 @@ public abstract class AbstractJobQueue
     }
 
     /**
+     * Resume the queue, then close it if it can be closed; returns true only if it was closed.
+     */
+    public boolean tryToClose() {
+        // resume the queue as we want to close it!
+        this.resume();
+        // check if possible
+        if ( this.canBeClosed() ) {
+            this.close();
+            return true;
+        }
+        return false;
+    }
+
+    protected boolean canBeClosed() {
+        return this.isEmpty() && !this.isWaiting && !this.isSuspended() && this.asyncCounter.get() == 0;
+    }
+
+    /**
      * Periodically check for started jobs without an acknowledge.
      */
     public void checkForUnprocessedJobs() {
@@ -400,32 +413,6 @@ public abstract class AbstractJobQueue
         notifyFinished(reprocessInfo);
     }
 
-    protected boolean canBeMarkedForRemoval() {
-        return this.isEmpty() && !this.isWaiting && !this.isSuspended() && this.asyncCounter.get() == 0;
-    }
-
-    /**
-     * Mark this queue for removal.
-     */
-    public void markForRemoval() {
-        if ( this.canBeMarkedForRemoval() ) {
-            this.markedForRemoval = true;
-        }
-    }
-
-    /**
-     * Check if this queue is marked for removal
-     */
-    public boolean isMarkedForRemoval() {
-        if ( this.markedForRemoval ) {
-            if ( this.canBeMarkedForRemoval() ) {
-                return true;
-            }
-            this.markedForRemoval = false;
-        }
-        return false;
-    }
-
     /**
      * Get the name of the job queue.
      */
@@ -691,6 +678,7 @@ public abstract class AbstractJobQueue
     public void resume() {
         synchronized ( this.suspendLock ) {
             if ( this.suspendedSince != -1 ) {
+                this.logger.debug("Waking up suspended queue {}", queueName);
                 this.suspendedSince = -1;
                 this.suspendLock.notify();
             }
@@ -704,6 +692,7 @@ public abstract class AbstractJobQueue
     public void suspend() {
         synchronized ( this.suspendLock ) {
             if ( this.suspendedSince == -1 ) {
+                this.logger.debug("Suspending queue {}", queueName);
                 this.suspendedSince = System.currentTimeMillis();
             }
         }

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractParallelJobQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractParallelJobQueue.java?rev=1489934&r1=1489933&r2=1489934&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractParallelJobQueue.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractParallelJobQueue.java Wed Jun  5 15:55:39 2013
@@ -106,8 +106,8 @@ public abstract class AbstractParallelJo
     }
 
     @Override
-    protected boolean canBeMarkedForRemoval() {
-        boolean result = super.canBeMarkedForRemoval();
+    protected boolean canBeClosed() {
+        boolean result = super.canBeClosed();
         if ( result ) {
             result = this.jobCount == 0;
         }

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java?rev=1489934&r1=1489933&r2=1489934&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java Wed Jun  5 15:55:39 2013
@@ -46,6 +46,8 @@ public final class OrderedJobQueue exten
     /** Lock and status object for handling the sleep phase. */
     private final SleepLock sleepLock = new SleepLock();
 
+    private boolean isWaitingForNext = false;
+
     /** The queue - we use a set which is sorted by job creation date. */
     private final Set<JobHandler> queue = new TreeSet<JobHandler>(new Comparator<JobHandler>() {
 
@@ -75,7 +77,16 @@ public final class OrderedJobQueue exten
 
     @Override
     public String getStateInfo() {
-        return super.getStateInfo() + ", isSleepingUntil=" + this.sleepLock.sleepingSince;
+        return super.getStateInfo() + ", isSleepingUntil=" + this.sleepLock.sleepingSince + ", isWaitingForNext=" + this.isWaitingForNext;
+    }
+
+    @Override
+    protected boolean canBeClosed() {
+        boolean result = super.canBeClosed();
+        if ( result ) {
+            result = this.isWaitingForNext;
+        }
+        return result;
     }
 
     @Override
@@ -124,6 +135,7 @@ public final class OrderedJobQueue exten
         synchronized ( this.queue ) {
             this.queue.add(handler);
             this.queue.notify();
+            this.isWaitingForNext = false;
         }
     }
 
@@ -131,6 +143,7 @@ public final class OrderedJobQueue exten
     protected JobHandler take() {
         synchronized ( this.queue ) {
             while ( this.queue.isEmpty() ) {
+                this.isWaitingForNext = true;
                 try {
                     this.queue.wait();
                 } catch (final InterruptedException e) {

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java?rev=1489934&r1=1489933&r2=1489934&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java Wed Jun  5 15:55:39 2013
@@ -39,6 +39,8 @@ public final class ParallelJobQueue exte
     /** The queue. */
     private final BlockingQueue<JobHandler> queue = new LinkedBlockingQueue<JobHandler>();
 
+    private boolean isWaitingForNext = false;
+
     public ParallelJobQueue(final String name,
                            final InternalQueueConfiguration config,
                            final JobConsumerManager jobConsumerManager,
@@ -48,8 +50,23 @@ public final class ParallelJobQueue exte
     }
 
     @Override
+    public String getStateInfo() {
+        return super.getStateInfo() + ", isWaitingForNext=" + this.isWaitingForNext;
+    }
+
+    @Override
+    protected boolean canBeClosed() {
+        boolean result = super.canBeClosed();
+        if ( result ) {
+            result = this.isWaitingForNext;
+        }
+        return result;
+    }
+
+    @Override
     protected void put(final JobHandler event) {
         try {
+            this.isWaitingForNext = false;
             this.queue.put(event);
         } catch (final InterruptedException e) {
             // this should never happen
@@ -60,10 +77,13 @@ public final class ParallelJobQueue exte
     @Override
     protected JobHandler take() {
         try {
+            this.isWaitingForNext = true;
             return this.queue.take();
         } catch (final InterruptedException e) {
             // this should never happen
             this.ignoreException(e);
+        } finally {
+            this.isWaitingForNext = false;
         }
         return null;
     }

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java?rev=1489934&r1=1489933&r2=1489934&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java Wed Jun  5 15:55:39 2013
@@ -66,8 +66,8 @@ public final class TopicRoundRobinJobQue
     }
 
     @Override
-    protected boolean canBeMarkedForRemoval() {
-        boolean result = super.canBeMarkedForRemoval();
+    protected boolean canBeClosed() {
+        boolean result = super.canBeClosed();
         if ( result ) {
             result = this.isWaitingForNext;
         }

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsImpl.java?rev=1489934&r1=1489933&r2=1489934&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsImpl.java Wed Jun  5 15:55:39 2013
@@ -188,7 +188,7 @@ public class StatisticsImpl implements S
     }
 
     /**
-     * New job in the qeue
+     * New job in the queue
      */
     public synchronized void incQueued() {
         this.queuedJobs++;