You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2014/10/23 18:25:03 UTC

svn commit: r1633871 - in /sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs: config/JobManagerConfiguration.java tasks/CheckTopologyTask.java tasks/FindUnfinishedJobsTask.java tasks/UpgradeTask.java

Author: cziegeler
Date: Thu Oct 23 16:25:03 2014
New Revision: 1633871

URL: http://svn.apache.org/r1633871
Log:
SLING-4096 : Jobs might stay unprocessed when topology changes

Modified:
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/CheckTopologyTask.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/FindUnfinishedJobsTask.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/UpgradeTask.java

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java?rev=1633871&r1=1633870&r2=1633871&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java Thu Oct 23 16:25:03 2014
@@ -18,6 +18,7 @@
  */
 package org.apache.sling.event.impl.jobs.config;
 
+import java.sql.Date;
 import java.util.ArrayList;
 import java.util.Calendar;
 import java.util.HashMap;
@@ -38,6 +39,7 @@ import org.apache.sling.api.resource.Per
 import org.apache.sling.api.resource.ResourceResolver;
 import org.apache.sling.api.resource.ResourceResolverFactory;
 import org.apache.sling.commons.osgi.PropertiesUtil;
+import org.apache.sling.commons.scheduler.Scheduler;
 import org.apache.sling.discovery.TopologyEvent;
 import org.apache.sling.discovery.TopologyEvent.Type;
 import org.apache.sling.discovery.TopologyEventListener;
@@ -157,6 +159,9 @@ public class JobManagerConfiguration imp
     @Reference
     private QueueConfigurationManager queueConfigManager;
 
+    @Reference
+    private Scheduler scheduler;
+
     /** The topology capabilities. */
     private volatile TopologyCapabilities topologyCapabilities;
 
@@ -437,7 +442,12 @@ public class JobManagerConfiguration imp
         }
     }
 
+    /**
+     * Stop processing
+     * @param deactivate Whether to deactivate the capabilities
+     */
     private void stopProcessing(final boolean deactivate) {
+        logger.debug("Stopping job processing...");
         boolean notify = this.topologyCapabilities != null;
         // deactivate old capabilities - this stops all background processes
         if ( deactivate && this.topologyCapabilities != null ) {
@@ -449,26 +459,47 @@ public class JobManagerConfiguration imp
             // stop all listeners
             this.notifiyListeners();
         }
+        logger.debug("Job processing stopped");
     }
 
+    /**
+     * Start processing
+     * @param eventType The event type
+     * @param newCaps The new capabilities
+     * @param isConfigChange If a configuration change occurred.
+     */
     private void startProcessing(final Type eventType, final TopologyCapabilities newCaps, final boolean isConfigChange) {
+        logger.debug("Starting job processing...");
         // create new capabilities and update view
         this.topologyCapabilities = newCaps;
 
         // before we propagate the new topology we do some maintenance
         if ( eventType == Type.TOPOLOGY_INIT ) {
-            final UpgradeTask task = new UpgradeTask();
-            task.run(this, this.topologyCapabilities);
+            final UpgradeTask task = new UpgradeTask(this);
+            task.run();
 
-            final FindUnfinishedJobsTask rt = new FindUnfinishedJobsTask();
-            rt.run(this);
+            final FindUnfinishedJobsTask rt = new FindUnfinishedJobsTask(this);
+            rt.run();
         }
 
+        // we run the checker task twice, now and shortly after the topology has changed.
         final CheckTopologyTask mt = new CheckTopologyTask(this);
-        mt.run(topologyCapabilities, !isConfigChange, isConfigChange);
+        mt.fullRun(!isConfigChange, isConfigChange);
 
         // start listeners
         this.notifiyListeners();
+
+        // and run checker again in 15 seconds (if leader)
+        if ( this.topologyCapabilities.isLeader() ) {
+            scheduler.schedule(new Runnable() {
+
+                    @Override
+                    public void run() {
+                        mt.assignUnassignedJobs();
+                    }
+                }, scheduler.AT(new Date(System.currentTimeMillis() + 15000)));
+        }
+        logger.debug("Job processing started");
     }
 
     private void notifiyListeners() {

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/CheckTopologyTask.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/CheckTopologyTask.java?rev=1633871&r1=1633870&r2=1633871&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/CheckTopologyTask.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/CheckTopologyTask.java Thu Oct 23 16:25:03 2014
@@ -55,19 +55,23 @@ public class CheckTopologyTask {
     /** Job manager configuration. */
     private final JobManagerConfiguration configuration;
 
+    /** The capabilities. */
+    private final TopologyCapabilities caps;
+
     /**
      * Constructor
+     * @param config The configuration
      */
     public CheckTopologyTask(final JobManagerConfiguration config) {
         this.configuration = config;
+        this.caps = this.configuration.getTopologyCapabilities();
     }
 
     /**
      * Reassign jobs from stopped instance.
-     * @param caps Current topology capabilities.
      */
-    private void reassignJobsFromStoppedInstances(final TopologyCapabilities caps) {
-        if ( caps != null && caps.isLeader() && caps.isActive() ) {
+    private void reassignJobsFromStoppedInstances() {
+        if ( caps.isLeader() && caps.isActive() ) {
             this.logger.debug("Checking for stopped instances...");
             final ResourceResolver resolver = this.configuration.createResourceResolver();
             try {
@@ -83,7 +87,7 @@ public class CheckTopologyTask {
                         final String instanceId = instanceResource.getName();
                         if ( !caps.isActive(instanceId) ) {
                             logger.debug("Found stopped instance {}", instanceId);
-                            assignJobs(caps, instanceResource, true);
+                            assignJobs(instanceResource, true);
                         }
                     }
                 }
@@ -95,10 +99,9 @@ public class CheckTopologyTask {
 
     /**
      * Reassign stale jobs from this instance
-     * @param caps Current topology capabilities.
      */
-    private void reassignStableJobs(final TopologyCapabilities caps) {
-        if ( caps != null && caps.isActive() ) {
+    private void reassignStaleJobs() {
+        if ( caps.isActive() ) {
             this.logger.debug("Checking for stale jobs...");
             final ResourceResolver resolver = this.configuration.createResourceResolver();
             try {
@@ -160,14 +163,14 @@ public class CheckTopologyTask {
                                                 resolver.delete(rsrc);
                                                 resolver.commit();
                                             } catch ( final PersistenceException pe ) {
-                                                ignoreException(pe);
+                                                logger.warn("Unable to move stale job from " + rsrc.getPath() + " to " + newPath, pe);
                                                 resolver.refresh();
                                                 resolver.revert();
                                             }
                                         }
                                     } catch (final InstantiationException ie) {
                                         // something happened with the resource in the meantime
-                                        ignoreException(ie);
+                                        logger.warn("Unable to move stale job from " + rsrc.getPath(), ie);
                                         resolver.refresh();
                                         resolver.revert();
                                     }
@@ -190,8 +193,8 @@ public class CheckTopologyTask {
      * - topology
      * - capabilities
      */
-    private void assignUnassignedJobs(final TopologyCapabilities caps) {
-        if ( caps != null && caps.isLeader() ) {
+    public void assignUnassignedJobs() {
+        if ( caps.isLeader() && caps.isActive() ) {
             logger.debug("Checking unassigned jobs...");
             final ResourceResolver resolver = this.configuration.createResourceResolver();
             try {
@@ -200,7 +203,7 @@ public class CheckTopologyTask {
 
                 // this resource should exist, but we check anyway
                 if ( unassignedRoot != null ) {
-                    assignJobs(caps, unassignedRoot, false);
+                    assignJobs(unassignedRoot, false);
                 }
             } finally {
                 resolver.close();
@@ -214,12 +217,10 @@ public class CheckTopologyTask {
     /**
      * Try to assign all jobs from the jobs root.
      * The jobs are stored by topic
-     * @param caps The topology capabilities
      * @param jobsRoot The root of the jobs
      * @param unassign Whether to unassign the job if no instance is found.
      */
-    private void assignJobs(final TopologyCapabilities caps,
-            final Resource jobsRoot,
+    private void assignJobs(final Resource jobsRoot,
             final boolean unassign) {
         final ResourceResolver resolver = jobsRoot.getResourceResolver();
 
@@ -255,14 +256,14 @@ public class CheckTopologyTask {
                                     resolver.delete(rsrc);
                                     resolver.commit();
                                 } catch ( final PersistenceException pe ) {
-                                    ignoreException(pe);
+                                    logger.warn("Unable to move unassigned job from " + rsrc.getPath() + " to " + newPath, pe);
                                     resolver.refresh();
                                     resolver.revert();
                                 }
                             }
                         } catch (final InstantiationException ie) {
                             // something happened with the resource in the meantime
-                            ignoreException(ie);
+                            logger.warn("Unable to move unassigned job from " + rsrc.getPath(), ie);
                             resolver.refresh();
                             resolver.revert();
                         }
@@ -290,13 +291,13 @@ public class CheckTopologyTask {
                                 resolver.delete(rsrc);
                                 resolver.commit();
                             } catch ( final PersistenceException pe ) {
-                                ignoreException(pe);
+                                logger.warn("Unable to unassigned job from " + rsrc.getPath() + " to " + newPath, pe);
                                 resolver.refresh();
                                 resolver.revert();
                             }
                         } catch (final InstantiationException ie) {
                             // something happened with the resource in the meantime
-                            ignoreException(ie);
+                            logger.warn("Unable to unassigned job from " + rsrc.getPath(), ie);
                             resolver.refresh();
                             resolver.revert();
                         }
@@ -310,30 +311,16 @@ public class CheckTopologyTask {
     /**
      * One maintenance run
      */
-    public void run(final TopologyCapabilities topologyCapabilities,
-            final boolean topologyChanged,
-            final boolean configChanged) {
+    public void fullRun(final boolean topologyChanged,
+                        final boolean configChanged) {
         // if topology changed, reschedule assigned jobs for stopped instances
         if ( topologyChanged ) {
-            this.reassignJobsFromStoppedInstances(topologyCapabilities);
+            this.reassignJobsFromStoppedInstances();
         }
         // check for all topics
-        if ( topologyChanged || configChanged ) {
-            this.reassignStableJobs(topologyCapabilities);
-        }
-        // try to assign unassigned jobs
-        if ( topologyChanged || configChanged ) {
-            this.assignUnassignedJobs(topologyCapabilities);
-        }
-    }
+        this.reassignStaleJobs();
 
-    /**
-     * Helper method which just logs the exception in debug mode.
-     * @param e
-     */
-    private void ignoreException(final Exception e) {
-        if ( this.logger.isDebugEnabled() ) {
-            this.logger.debug("Ignored exception " + e.getMessage(), e);
-        }
+        // try to assign unassigned jobs
+        this.assignUnassignedJobs();
     }
 }

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/FindUnfinishedJobsTask.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/FindUnfinishedJobsTask.java?rev=1633871&r1=1633870&r2=1633871&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/FindUnfinishedJobsTask.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/FindUnfinishedJobsTask.java Thu Oct 23 16:25:03 2014
@@ -40,14 +40,25 @@ public class FindUnfinishedJobsTask {
     /** Logger. */
     private final Logger logger = LoggerFactory.getLogger(this.getClass());
 
-    public void run(final JobManagerConfiguration configuration) {
-        this.initialScan(configuration);
+    /** Job manager configuration. */
+    private final JobManagerConfiguration configuration;
+
+    /**
+     * Constructor
+     * @param config The configuration
+     */
+    public FindUnfinishedJobsTask(final JobManagerConfiguration config) {
+        this.configuration = config;
+    }
+
+    public void run() {
+        this.initialScan();
     }
 
     /**
      * Scan the resource tree for unfinished jobs from previous runs
      */
-    private void initialScan(final JobManagerConfiguration configuration) {
+    private void initialScan() {
         logger.debug("Scanning repository for unfinished jobs...");
         final ResourceResolver resolver = configuration.createResourceResolver();
         try {

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/UpgradeTask.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/UpgradeTask.java?rev=1633871&r1=1633870&r2=1633871&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/UpgradeTask.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/UpgradeTask.java Thu Oct 23 16:25:03 2014
@@ -50,15 +50,29 @@ public class UpgradeTask {
     /** Logger. */
     private final Logger logger = LoggerFactory.getLogger(this.getClass());
 
+    /** Job manager configuration. */
+    private final JobManagerConfiguration configuration;
+
+    /** The capabilities. */
+    private final TopologyCapabilities caps;
+
+    /**
+     * Constructor
+     * @param config The configuration
+     */
+    public UpgradeTask(final JobManagerConfiguration config) {
+        this.configuration = config;
+        this.caps = this.configuration.getTopologyCapabilities();
+    }
+
     /**
      * Upgrade
      */
-    public void run(final JobManagerConfiguration configuration,
-            final TopologyCapabilities topologyCapabilities) {
-        if ( topologyCapabilities.isLeader() ) {
-            this.processJobsFromPreviousVersions(configuration, topologyCapabilities);
+    public void run() {
+        if ( caps.isLeader() ) {
+            this.processJobsFromPreviousVersions();
         }
-        this.upgradeBridgedJobs(configuration, topologyCapabilities);
+        this.upgradeBridgedJobs();
     }
 
     /**
@@ -66,19 +80,18 @@ public class UpgradeTask {
      * In previous versions, bridged jobs were stored under a special topic.
      * This has changed, the jobs are now stored with their real topic.
      */
-    private void upgradeBridgedJobs(final JobManagerConfiguration configuration,
-            final TopologyCapabilities caps) {
+    private void upgradeBridgedJobs() {
         final String path = configuration.getLocalJobsPath() + '/' + JobImpl.PROPERTY_BRIDGED_EVENT;
         final ResourceResolver resolver = configuration.createResourceResolver();
         try {
             final Resource rootResource = resolver.getResource(path);
             if ( rootResource != null ) {
-                upgradeBridgedJobs(configuration, rootResource, caps);
+                upgradeBridgedJobs(rootResource);
             }
             if ( caps.isLeader() ) {
                 final Resource unassignedRoot = resolver.getResource(configuration.getUnassignedJobsPath() + '/' + JobImpl.PROPERTY_BRIDGED_EVENT);
                 if ( unassignedRoot != null ) {
-                    upgradeBridgedJobs(configuration, unassignedRoot, caps);
+                    upgradeBridgedJobs(unassignedRoot);
                 }
             }
         } finally {
@@ -87,14 +100,10 @@ public class UpgradeTask {
     }
 
     /**
-     * Upgrade bridge jobs
+     * Upgrade bridged jobs
      * @param rootResource  The root resource (topic resource)
-     * @param topologyCapabilities The capabilities
-     * @param queueManager The queue manager
      */
-    private void upgradeBridgedJobs(final JobManagerConfiguration configuration,
-            final Resource topicResource,
-            final TopologyCapabilities caps) {
+    private void upgradeBridgedJobs(final Resource topicResource) {
         final String topicName = topicResource.getName().replace('.', '/');
         final QueueInfo info = configuration.getQueueConfigurationManager().getQueueInfo(topicName);
         JobTopicTraverser.traverse(logger, topicResource, new JobTopicTraverser.ResourceCallback() {
@@ -139,12 +148,11 @@ public class UpgradeTask {
     /**
      * Handle jobs from previous versions (<= 3.1.4) by moving them to the unassigned area
      */
-    private void processJobsFromPreviousVersions(final JobManagerConfiguration configuration,
-            final TopologyCapabilities caps) {
+    private void processJobsFromPreviousVersions() {
         final ResourceResolver resolver = configuration.createResourceResolver();
         try {
-            this.processJobsFromPreviousVersions(configuration, caps, resolver.getResource(configuration.getPreviousVersionAnonPath()));
-            this.processJobsFromPreviousVersions(configuration, caps, resolver.getResource(configuration.getPreviousVersionIdentifiedPath()));
+            this.processJobsFromPreviousVersions(resolver.getResource(configuration.getPreviousVersionAnonPath()));
+            this.processJobsFromPreviousVersions(resolver.getResource(configuration.getPreviousVersionIdentifiedPath()));
         } catch ( final PersistenceException pe ) {
             this.logger.warn("Problems moving jobs from previous version.", pe);
         } finally {
@@ -155,15 +163,13 @@ public class UpgradeTask {
     /**
      * Recursively find jobs and move them
      */
-    private void processJobsFromPreviousVersions(final JobManagerConfiguration configuration,
-            final TopologyCapabilities caps,
-            final Resource rsrc) throws PersistenceException {
+    private void processJobsFromPreviousVersions(final Resource rsrc) throws PersistenceException {
         if ( rsrc != null && caps.isActive() ) {
             if ( rsrc.isResourceType(ResourceHelper.RESOURCE_TYPE_JOB) ) {
-                this.moveJobFromPreviousVersion(configuration, caps, rsrc);
+                this.moveJobFromPreviousVersion(rsrc);
             } else {
                 for(final Resource child : rsrc.getChildren()) {
-                    this.processJobsFromPreviousVersions(configuration, caps, child);
+                    this.processJobsFromPreviousVersions(child);
                 }
                 if ( caps.isActive() ) {
                     rsrc.getResourceResolver().delete(rsrc);
@@ -177,9 +183,7 @@ public class UpgradeTask {
     /**
      * Move a single job
      */
-    private void moveJobFromPreviousVersion(final JobManagerConfiguration configuration,
-            final TopologyCapabilities caps,
-            final Resource jobResource)
+    private void moveJobFromPreviousVersion(final Resource jobResource)
     throws PersistenceException {
         final ResourceResolver resolver = jobResource.getResourceResolver();