You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2015/05/06 17:24:23 UTC

svn commit: r1678026 - in /sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues: JobQueueImpl.java QueueJobCache.java

Author: cziegeler
Date: Wed May  6 15:24:23 2015
New Revision: 1678026

URL: http://svn.apache.org/r1678026
Log:
SLING-4679 : Periodically perform a full topic scan

Modified:
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java?rev=1678026&r1=1678025&r2=1678026&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java Wed May  6 15:24:23 2015
@@ -119,6 +119,7 @@ public class JobQueueImpl
     /** Lock for close/start. */
     private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
 
+    /** Time until which the queue is sleeping; only set for ordered queues when a job is rescheduled. */
     private volatile long isSleepingUntil = -1;
 
     /**
@@ -198,15 +199,9 @@ public class JobQueueImpl
 
     /**
      * Start the job queue.
-     */
-    public void startJobs() {
-        startJobs(false);
-    }
-    /**
-     * Start the job queue.
      * This method might be called concurrently, therefore we use a guard
      */
-    private void startJobs(boolean justOne) {
+    public void startJobs() {
         if ( this.startJobsGuard.compareAndSet(false, true) ) {
             // we start as many jobs in parallel as possible
             while ( this.running && !this.isOutdated.get() && !this.isSuspended() && this.available.tryAcquire() ) {
@@ -245,12 +240,9 @@ public class JobQueueImpl
                                     currentThread.setName(oldName);
                                 }
                                 // and try to launch another job
-                                startJobs(true);
+                                startJobs();
                             }
                         });
-                        if ( justOne ) {
-                            break;
-                        }
                     } else {
                         // no job available, stop loop
                         break;
@@ -794,23 +786,31 @@ public class JobQueueImpl
             handler.addToRetryList();
             final Date fireDate = new Date();
             fireDate.setTime(System.currentTimeMillis() + delay);
-            this.isSleepingUntil = fireDate.getTime();
+            if ( this.configuration.getType() == Type.ORDERED ) {
+                this.isSleepingUntil = fireDate.getTime();
+            }
 
             final String jobName = "Waiting:" + queueName + ":" + handler.hashCode();
             final Runnable t = new Runnable() {
                 @Override
                 public void run() {
-                    isSleepingUntil = -1;
-                    if ( handler.removeFromRetryList() ) {
-                        requeue(handler);
-                    }
-                    if ( configuration.getType() == Type.ORDERED ) {
-                        cache.setIsBlocked(false);
+                    try {
+                        if ( handler.removeFromRetryList() ) {
+                            requeue(handler);
+                        }
+                    } finally {
+                        if ( configuration.getType() == Type.ORDERED ) {
+                            isSleepingUntil = -1;
+                            cache.setIsBlocked(false);
+                            startJobs();
+                        }
                     }
-                    startJobs();
                 }
             };
-            services.scheduler.schedule(t, services.scheduler.AT(fireDate).name(jobName));
+            if ( !services.scheduler.schedule(t, services.scheduler.AT(fireDate).name(jobName)) ) {
+                // if scheduling fails, run the task directly in the current thread
+                t.run();
+            }
         } else {
             // put directly into queue
             this.requeue(handler);

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java?rev=1678026&r1=1678025&r2=1678026&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java Wed May  6 15:24:23 2015
@@ -88,7 +88,6 @@ public class QueueJobCache {
         this.configuration = configuration;
         this.queueType = queueType;
         this.topics = new ConcurrentSkipListSet<String>(topics);
-        this.topicsWithNewJobs.addAll(topics);
         this.fillCache(queueName, statisticsManager);
     }