You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2015/05/28 20:21:29 UTC

svn commit: r1682303 - in /sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs: config/JobManagerConfiguration.java queues/JobQueueImpl.java queues/QueueManager.java

Author: cziegeler
Date: Thu May 28 18:21:28 2015
New Revision: 1682303

URL: http://svn.apache.org/r1682303
Log:
SLING-4759 : Queue might be closed while job is waiting for retry

Modified:
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java?rev=1682303&r1=1682302&r2=1682303&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java Thu May 28 18:21:28 2015
@@ -501,25 +501,29 @@ public class JobManagerConfiguration imp
         final CheckTopologyTask mt = new CheckTopologyTask(this);
         mt.fullRun(!isConfigChange, isConfigChange);
 
-        // and run checker again in some seconds (if leader)
-        // notify listeners afterwards
-        final Scheduler local = this.scheduler;
-        if ( local != null ) {
-            local.schedule(new Runnable() {
+        if ( eventType == Type.TOPOLOGY_INIT ) {
+            notifiyListeners();
+        } else {
+            // and run checker again in some seconds (if leader)
+            // notify listeners afterwards
+            final Scheduler local = this.scheduler;
+            if ( local != null ) {
+                local.schedule(new Runnable() {
 
-                    @Override
-                    public void run() {
-                        if ( newCaps.isLeader() && newCaps.isActive() ) {
-                            mt.assignUnassignedJobs();
-                        }
-                        // start listeners
-                        synchronized ( listeners ) {
-                            if ( topologyCapabilities != null && newCaps.isActive() ) {
-                                notifiyListeners();
+                        @Override
+                        public void run() {
+                            if ( newCaps.isLeader() && newCaps.isActive() ) {
+                                mt.assignUnassignedJobs();
+                            }
+                            // start listeners
+                            if ( newCaps.isActive() ) {
+                                synchronized ( listeners ) {
+                                    notifiyListeners();
+                                }
                             }
                         }
-                    }
-                }, local.AT(new Date(System.currentTimeMillis() + this.backgroundLoadDelay * 1000)));
+                    }, local.AT(new Date(System.currentTimeMillis() + this.backgroundLoadDelay * 1000)));
+            }
         }
         logger.debug("Job processing started");
     }

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java?rev=1682303&r1=1682302&r2=1682303&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java Thu May 28 18:21:28 2015
@@ -143,7 +143,7 @@ public class JobQueueImpl
         if ( cache.isEmpty() ) {
             return null;
         }
-        return new JobQueueImpl(name, config, services, topics, cache);
+        return new JobQueueImpl(name, config, services, cache);
     }
 
     /**
@@ -152,13 +152,11 @@ public class JobQueueImpl
      * @param name The queue name
      * @param config The queue configuration
      * @param services The queue services
-     * @param topics The topics handled by this queue
      * @param cache The job cache
      */
     private JobQueueImpl(final String name,
                         final InternalQueueConfiguration config,
                         final QueueServices services,
-                        final Set<String> topics,
                         final QueueJobCache cache) {
         if ( config.getOwnThreadPoolSize() > 0 ) {
             this.threadPool = new EventingThreadPool(services.threadPoolManager, config.getOwnThreadPoolSize());
@@ -492,7 +490,6 @@ public class JobQueueImpl
      */
     public void wakeUpQueue(final Set<String> topics) {
         this.cache.handleNewTopics(topics);
-        this.startJobs();
     }
 
     /**

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java?rev=1682303&r1=1682302&r2=1682303&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java Thu May 28 18:21:28 2015
@@ -175,12 +175,14 @@ public class QueueManager
         logger.debug("Queue manager maintenance: Starting #{}", this.schedulerRuns);
 
         // queue maintenance
-        for(final JobQueueImpl jbq : this.queues.values() ) {
-            jbq.maintain();
+        if ( this.isActive.get() ) {
+            for(final JobQueueImpl jbq : this.queues.values() ) {
+                jbq.maintain();
+            }
         }
 
         // full topic scan is done every third run
-        if ( schedulerRuns % 3 == 0 ) {
+        if ( schedulerRuns % 3 == 0 && this.isActive.get() ) {
             this.fullTopicScan();
         }
 
@@ -244,11 +246,10 @@ public class QueueManager
             }
         }
         if ( queue != null ) {
-            if ( isNewQueue ) {
-                queue.startJobs();
-            } else {
+            if ( !isNewQueue ) {
                 queue.wakeUpQueue(topics);
             }
+            queue.startJobs();
         }
     }