You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2015/05/06 17:24:23 UTC
svn commit: r1678026 - in
/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues:
JobQueueImpl.java QueueJobCache.java
Author: cziegeler
Date: Wed May 6 15:24:23 2015
New Revision: 1678026
URL: http://svn.apache.org/r1678026
Log:
SLING-4679 : Periodically perform a full topic scan
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java?rev=1678026&r1=1678025&r2=1678026&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java Wed May 6 15:24:23 2015
@@ -119,6 +119,7 @@ public class JobQueueImpl
/** Lock for close/start. */
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+ /** Time (in ms) until which the queue is sleeping; only set for ordered queues when a job is rescheduled. */
private volatile long isSleepingUntil = -1;
/**
@@ -198,15 +199,9 @@ public class JobQueueImpl
/**
* Start the job queue.
- */
- public void startJobs() {
- startJobs(false);
- }
- /**
- * Start the job queue.
* This method might be called concurrently, therefore we use a guard
*/
- private void startJobs(boolean justOne) {
+ public void startJobs() {
if ( this.startJobsGuard.compareAndSet(false, true) ) {
// we start as many jobs in parallel as possible
while ( this.running && !this.isOutdated.get() && !this.isSuspended() && this.available.tryAcquire() ) {
@@ -245,12 +240,9 @@ public class JobQueueImpl
currentThread.setName(oldName);
}
// and try to launch another job
- startJobs(true);
+ startJobs();
}
});
- if ( justOne ) {
- break;
- }
} else {
// no job available, stop loop
break;
@@ -794,23 +786,31 @@ public class JobQueueImpl
handler.addToRetryList();
final Date fireDate = new Date();
fireDate.setTime(System.currentTimeMillis() + delay);
- this.isSleepingUntil = fireDate.getTime();
+ if ( this.configuration.getType() == Type.ORDERED ) {
+ this.isSleepingUntil = fireDate.getTime();
+ }
final String jobName = "Waiting:" + queueName + ":" + handler.hashCode();
final Runnable t = new Runnable() {
@Override
public void run() {
- isSleepingUntil = -1;
- if ( handler.removeFromRetryList() ) {
- requeue(handler);
- }
- if ( configuration.getType() == Type.ORDERED ) {
- cache.setIsBlocked(false);
+ try {
+ if ( handler.removeFromRetryList() ) {
+ requeue(handler);
+ }
+ } finally {
+ if ( configuration.getType() == Type.ORDERED ) {
+ isSleepingUntil = -1;
+ cache.setIsBlocked(false);
+ startJobs();
+ }
}
- startJobs();
}
};
- services.scheduler.schedule(t, services.scheduler.AT(fireDate).name(jobName));
+ if ( !services.scheduler.schedule(t, services.scheduler.AT(fireDate).name(jobName)) ) {
+ // if scheduling fails, run the task directly in the current thread
+ t.run();
+ }
} else {
// put directly into queue
this.requeue(handler);
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java?rev=1678026&r1=1678025&r2=1678026&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java Wed May 6 15:24:23 2015
@@ -88,7 +88,6 @@ public class QueueJobCache {
this.configuration = configuration;
this.queueType = queueType;
this.topics = new ConcurrentSkipListSet<String>(topics);
- this.topicsWithNewJobs.addAll(topics);
this.fillCache(queueName, statisticsManager);
}