You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2015/05/28 20:21:29 UTC
svn commit: r1682303 - in /sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs: config/JobManagerConfiguration.java queues/JobQueueImpl.java queues/QueueManager.java
Author: cziegeler
Date: Thu May 28 18:21:28 2015
New Revision: 1682303
URL: http://svn.apache.org/r1682303
Log:
SLING-4759 : Queue might be closed while job is waiting for retry
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java?rev=1682303&r1=1682302&r2=1682303&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java Thu May 28 18:21:28 2015
@@ -501,25 +501,29 @@ public class JobManagerConfiguration imp
final CheckTopologyTask mt = new CheckTopologyTask(this);
mt.fullRun(!isConfigChange, isConfigChange);
- // and run checker again in some seconds (if leader)
- // notify listeners afterwards
- final Scheduler local = this.scheduler;
- if ( local != null ) {
- local.schedule(new Runnable() {
+ if ( eventType == Type.TOPOLOGY_INIT ) {
+ notifiyListeners();
+ } else {
+ // and run checker again in some seconds (if leader)
+ // notify listeners afterwards
+ final Scheduler local = this.scheduler;
+ if ( local != null ) {
+ local.schedule(new Runnable() {
- @Override
- public void run() {
- if ( newCaps.isLeader() && newCaps.isActive() ) {
- mt.assignUnassignedJobs();
- }
- // start listeners
- synchronized ( listeners ) {
- if ( topologyCapabilities != null && newCaps.isActive() ) {
- notifiyListeners();
+ @Override
+ public void run() {
+ if ( newCaps.isLeader() && newCaps.isActive() ) {
+ mt.assignUnassignedJobs();
+ }
+ // start listeners
+ if ( newCaps.isActive() ) {
+ synchronized ( listeners ) {
+ notifiyListeners();
+ }
}
}
- }
- }, local.AT(new Date(System.currentTimeMillis() + this.backgroundLoadDelay * 1000)));
+ }, local.AT(new Date(System.currentTimeMillis() + this.backgroundLoadDelay * 1000)));
+ }
}
logger.debug("Job processing started");
}
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java?rev=1682303&r1=1682302&r2=1682303&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java Thu May 28 18:21:28 2015
@@ -143,7 +143,7 @@ public class JobQueueImpl
if ( cache.isEmpty() ) {
return null;
}
- return new JobQueueImpl(name, config, services, topics, cache);
+ return new JobQueueImpl(name, config, services, cache);
}
/**
@@ -152,13 +152,11 @@ public class JobQueueImpl
* @param name The queue name
* @param config The queue configuration
* @param services The queue services
- * @param topics The topics handled by this queue
* @param cache The job cache
*/
private JobQueueImpl(final String name,
final InternalQueueConfiguration config,
final QueueServices services,
- final Set<String> topics,
final QueueJobCache cache) {
if ( config.getOwnThreadPoolSize() > 0 ) {
this.threadPool = new EventingThreadPool(services.threadPoolManager, config.getOwnThreadPoolSize());
@@ -492,7 +490,6 @@ public class JobQueueImpl
*/
public void wakeUpQueue(final Set<String> topics) {
this.cache.handleNewTopics(topics);
- this.startJobs();
}
/**
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java?rev=1682303&r1=1682302&r2=1682303&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java Thu May 28 18:21:28 2015
@@ -175,12 +175,14 @@ public class QueueManager
logger.debug("Queue manager maintenance: Starting #{}", this.schedulerRuns);
// queue maintenance
- for(final JobQueueImpl jbq : this.queues.values() ) {
- jbq.maintain();
+ if ( this.isActive.get() ) {
+ for(final JobQueueImpl jbq : this.queues.values() ) {
+ jbq.maintain();
+ }
}
// full topic scan is done every third run
- if ( schedulerRuns % 3 == 0 ) {
+ if ( schedulerRuns % 3 == 0 && this.isActive.get() ) {
this.fullTopicScan();
}
@@ -244,11 +246,10 @@ public class QueueManager
}
}
if ( queue != null ) {
- if ( isNewQueue ) {
- queue.startJobs();
- } else {
+ if ( !isNewQueue ) {
queue.wakeUpQueue(topics);
}
+ queue.startJobs();
}
}