You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2014/10/23 18:25:03 UTC
svn commit: r1633871 - in
/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs:
config/JobManagerConfiguration.java tasks/CheckTopologyTask.java
tasks/FindUnfinishedJobsTask.java tasks/UpgradeTask.java
Author: cziegeler
Date: Thu Oct 23 16:25:03 2014
New Revision: 1633871
URL: http://svn.apache.org/r1633871
Log:
SLING-4096 : Jobs might stay unprocessed when topology changes
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/CheckTopologyTask.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/FindUnfinishedJobsTask.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/UpgradeTask.java
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java?rev=1633871&r1=1633870&r2=1633871&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java Thu Oct 23 16:25:03 2014
@@ -18,6 +18,7 @@
*/
package org.apache.sling.event.impl.jobs.config;
+import java.sql.Date;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.HashMap;
@@ -38,6 +39,7 @@ import org.apache.sling.api.resource.Per
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.api.resource.ResourceResolverFactory;
import org.apache.sling.commons.osgi.PropertiesUtil;
+import org.apache.sling.commons.scheduler.Scheduler;
import org.apache.sling.discovery.TopologyEvent;
import org.apache.sling.discovery.TopologyEvent.Type;
import org.apache.sling.discovery.TopologyEventListener;
@@ -157,6 +159,9 @@ public class JobManagerConfiguration imp
@Reference
private QueueConfigurationManager queueConfigManager;
+ @Reference
+ private Scheduler scheduler;
+
/** The topology capabilities. */
private volatile TopologyCapabilities topologyCapabilities;
@@ -437,7 +442,12 @@ public class JobManagerConfiguration imp
}
}
+ /**
+ * Stop processing
+ * @param deactivate Whether to deactivate the capabilities
+ */
private void stopProcessing(final boolean deactivate) {
+ logger.debug("Stopping job processing...");
boolean notify = this.topologyCapabilities != null;
// deactivate old capabilities - this stops all background processes
if ( deactivate && this.topologyCapabilities != null ) {
@@ -449,26 +459,47 @@ public class JobManagerConfiguration imp
// stop all listeners
this.notifiyListeners();
}
+ logger.debug("Job processing stopped");
}
+ /**
+ * Start processing
+ * @param eventType The event type
+ * @param newCaps The new capabilities
+ * @param isConfigChange If a configuration change occurred.
+ */
private void startProcessing(final Type eventType, final TopologyCapabilities newCaps, final boolean isConfigChange) {
+ logger.debug("Starting job processing...");
// create new capabilities and update view
this.topologyCapabilities = newCaps;
// before we propagate the new topology we do some maintenance
if ( eventType == Type.TOPOLOGY_INIT ) {
- final UpgradeTask task = new UpgradeTask();
- task.run(this, this.topologyCapabilities);
+ final UpgradeTask task = new UpgradeTask(this);
+ task.run();
- final FindUnfinishedJobsTask rt = new FindUnfinishedJobsTask();
- rt.run(this);
+ final FindUnfinishedJobsTask rt = new FindUnfinishedJobsTask(this);
+ rt.run();
}
+ // we run the checker task twice, now and shortly after the topology has changed.
final CheckTopologyTask mt = new CheckTopologyTask(this);
- mt.run(topologyCapabilities, !isConfigChange, isConfigChange);
+ mt.fullRun(!isConfigChange, isConfigChange);
// start listeners
this.notifiyListeners();
+
+ // and run checker again in 15 seconds (if leader)
+ if ( this.topologyCapabilities.isLeader() ) {
+ scheduler.schedule(new Runnable() {
+
+ @Override
+ public void run() {
+ mt.assignUnassignedJobs();
+ }
+ }, scheduler.AT(new Date(System.currentTimeMillis() + 15000)));
+ }
+ logger.debug("Job processing started");
}
private void notifiyListeners() {
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/CheckTopologyTask.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/CheckTopologyTask.java?rev=1633871&r1=1633870&r2=1633871&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/CheckTopologyTask.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/CheckTopologyTask.java Thu Oct 23 16:25:03 2014
@@ -55,19 +55,23 @@ public class CheckTopologyTask {
/** Job manager configuration. */
private final JobManagerConfiguration configuration;
+ /** The capabilities. */
+ private final TopologyCapabilities caps;
+
/**
* Constructor
+ * @param config The configuration
*/
public CheckTopologyTask(final JobManagerConfiguration config) {
this.configuration = config;
+ this.caps = this.configuration.getTopologyCapabilities();
}
/**
* Reassign jobs from stopped instance.
- * @param caps Current topology capabilities.
*/
- private void reassignJobsFromStoppedInstances(final TopologyCapabilities caps) {
- if ( caps != null && caps.isLeader() && caps.isActive() ) {
+ private void reassignJobsFromStoppedInstances() {
+ if ( caps.isLeader() && caps.isActive() ) {
this.logger.debug("Checking for stopped instances...");
final ResourceResolver resolver = this.configuration.createResourceResolver();
try {
@@ -83,7 +87,7 @@ public class CheckTopologyTask {
final String instanceId = instanceResource.getName();
if ( !caps.isActive(instanceId) ) {
logger.debug("Found stopped instance {}", instanceId);
- assignJobs(caps, instanceResource, true);
+ assignJobs(instanceResource, true);
}
}
}
@@ -95,10 +99,9 @@ public class CheckTopologyTask {
/**
* Reassign stale jobs from this instance
- * @param caps Current topology capabilities.
*/
- private void reassignStableJobs(final TopologyCapabilities caps) {
- if ( caps != null && caps.isActive() ) {
+ private void reassignStaleJobs() {
+ if ( caps.isActive() ) {
this.logger.debug("Checking for stale jobs...");
final ResourceResolver resolver = this.configuration.createResourceResolver();
try {
@@ -160,14 +163,14 @@ public class CheckTopologyTask {
resolver.delete(rsrc);
resolver.commit();
} catch ( final PersistenceException pe ) {
- ignoreException(pe);
+ logger.warn("Unable to move stale job from " + rsrc.getPath() + " to " + newPath, pe);
resolver.refresh();
resolver.revert();
}
}
} catch (final InstantiationException ie) {
// something happened with the resource in the meantime
- ignoreException(ie);
+ logger.warn("Unable to move stale job from " + rsrc.getPath(), ie);
resolver.refresh();
resolver.revert();
}
@@ -190,8 +193,8 @@ public class CheckTopologyTask {
* - topology
* - capabilities
*/
- private void assignUnassignedJobs(final TopologyCapabilities caps) {
- if ( caps != null && caps.isLeader() ) {
+ public void assignUnassignedJobs() {
+ if ( caps.isLeader() && caps.isActive() ) {
logger.debug("Checking unassigned jobs...");
final ResourceResolver resolver = this.configuration.createResourceResolver();
try {
@@ -200,7 +203,7 @@ public class CheckTopologyTask {
// this resource should exist, but we check anyway
if ( unassignedRoot != null ) {
- assignJobs(caps, unassignedRoot, false);
+ assignJobs(unassignedRoot, false);
}
} finally {
resolver.close();
@@ -214,12 +217,10 @@ public class CheckTopologyTask {
/**
* Try to assign all jobs from the jobs root.
* The jobs are stored by topic
- * @param caps The topology capabilities
* @param jobsRoot The root of the jobs
* @param unassign Whether to unassign the job if no instance is found.
*/
- private void assignJobs(final TopologyCapabilities caps,
- final Resource jobsRoot,
+ private void assignJobs(final Resource jobsRoot,
final boolean unassign) {
final ResourceResolver resolver = jobsRoot.getResourceResolver();
@@ -255,14 +256,14 @@ public class CheckTopologyTask {
resolver.delete(rsrc);
resolver.commit();
} catch ( final PersistenceException pe ) {
- ignoreException(pe);
+ logger.warn("Unable to move unassigned job from " + rsrc.getPath() + " to " + newPath, pe);
resolver.refresh();
resolver.revert();
}
}
} catch (final InstantiationException ie) {
// something happened with the resource in the meantime
- ignoreException(ie);
+ logger.warn("Unable to move unassigned job from " + rsrc.getPath(), ie);
resolver.refresh();
resolver.revert();
}
@@ -290,13 +291,13 @@ public class CheckTopologyTask {
resolver.delete(rsrc);
resolver.commit();
} catch ( final PersistenceException pe ) {
- ignoreException(pe);
+ logger.warn("Unable to unassigned job from " + rsrc.getPath() + " to " + newPath, pe);
resolver.refresh();
resolver.revert();
}
} catch (final InstantiationException ie) {
// something happened with the resource in the meantime
- ignoreException(ie);
+ logger.warn("Unable to unassigned job from " + rsrc.getPath(), ie);
resolver.refresh();
resolver.revert();
}
@@ -310,30 +311,16 @@ public class CheckTopologyTask {
/**
* One maintenance run
*/
- public void run(final TopologyCapabilities topologyCapabilities,
- final boolean topologyChanged,
- final boolean configChanged) {
+ public void fullRun(final boolean topologyChanged,
+ final boolean configChanged) {
// if topology changed, reschedule assigned jobs for stopped instances
if ( topologyChanged ) {
- this.reassignJobsFromStoppedInstances(topologyCapabilities);
+ this.reassignJobsFromStoppedInstances();
}
// check for all topics
- if ( topologyChanged || configChanged ) {
- this.reassignStableJobs(topologyCapabilities);
- }
- // try to assign unassigned jobs
- if ( topologyChanged || configChanged ) {
- this.assignUnassignedJobs(topologyCapabilities);
- }
- }
+ this.reassignStaleJobs();
- /**
- * Helper method which just logs the exception in debug mode.
- * @param e
- */
- private void ignoreException(final Exception e) {
- if ( this.logger.isDebugEnabled() ) {
- this.logger.debug("Ignored exception " + e.getMessage(), e);
- }
+ // try to assign unassigned jobs
+ this.assignUnassignedJobs();
}
}
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/FindUnfinishedJobsTask.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/FindUnfinishedJobsTask.java?rev=1633871&r1=1633870&r2=1633871&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/FindUnfinishedJobsTask.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/FindUnfinishedJobsTask.java Thu Oct 23 16:25:03 2014
@@ -40,14 +40,25 @@ public class FindUnfinishedJobsTask {
/** Logger. */
private final Logger logger = LoggerFactory.getLogger(this.getClass());
- public void run(final JobManagerConfiguration configuration) {
- this.initialScan(configuration);
+ /** Job manager configuration. */
+ private final JobManagerConfiguration configuration;
+
+ /**
+ * Constructor
+ * @param config The configuration
+ */
+ public FindUnfinishedJobsTask(final JobManagerConfiguration config) {
+ this.configuration = config;
+ }
+
+ public void run() {
+ this.initialScan();
}
/**
* Scan the resource tree for unfinished jobs from previous runs
*/
- private void initialScan(final JobManagerConfiguration configuration) {
+ private void initialScan() {
logger.debug("Scanning repository for unfinished jobs...");
final ResourceResolver resolver = configuration.createResourceResolver();
try {
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/UpgradeTask.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/UpgradeTask.java?rev=1633871&r1=1633870&r2=1633871&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/UpgradeTask.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/UpgradeTask.java Thu Oct 23 16:25:03 2014
@@ -50,15 +50,29 @@ public class UpgradeTask {
/** Logger. */
private final Logger logger = LoggerFactory.getLogger(this.getClass());
+ /** Job manager configuration. */
+ private final JobManagerConfiguration configuration;
+
+ /** The capabilities. */
+ private final TopologyCapabilities caps;
+
+ /**
+ * Constructor
+ * @param config The configuration
+ */
+ public UpgradeTask(final JobManagerConfiguration config) {
+ this.configuration = config;
+ this.caps = this.configuration.getTopologyCapabilities();
+ }
+
/**
* Upgrade
*/
- public void run(final JobManagerConfiguration configuration,
- final TopologyCapabilities topologyCapabilities) {
- if ( topologyCapabilities.isLeader() ) {
- this.processJobsFromPreviousVersions(configuration, topologyCapabilities);
+ public void run() {
+ if ( caps.isLeader() ) {
+ this.processJobsFromPreviousVersions();
}
- this.upgradeBridgedJobs(configuration, topologyCapabilities);
+ this.upgradeBridgedJobs();
}
/**
@@ -66,19 +80,18 @@ public class UpgradeTask {
* In previous versions, bridged jobs were stored under a special topic.
* This has changed, the jobs are now stored with their real topic.
*/
- private void upgradeBridgedJobs(final JobManagerConfiguration configuration,
- final TopologyCapabilities caps) {
+ private void upgradeBridgedJobs() {
final String path = configuration.getLocalJobsPath() + '/' + JobImpl.PROPERTY_BRIDGED_EVENT;
final ResourceResolver resolver = configuration.createResourceResolver();
try {
final Resource rootResource = resolver.getResource(path);
if ( rootResource != null ) {
- upgradeBridgedJobs(configuration, rootResource, caps);
+ upgradeBridgedJobs(rootResource);
}
if ( caps.isLeader() ) {
final Resource unassignedRoot = resolver.getResource(configuration.getUnassignedJobsPath() + '/' + JobImpl.PROPERTY_BRIDGED_EVENT);
if ( unassignedRoot != null ) {
- upgradeBridgedJobs(configuration, unassignedRoot, caps);
+ upgradeBridgedJobs(unassignedRoot);
}
}
} finally {
@@ -87,14 +100,10 @@ public class UpgradeTask {
}
/**
- * Upgrade bridge jobs
+ * Upgrade bridged jobs
* @param rootResource The root resource (topic resource)
- * @param topologyCapabilities The capabilities
- * @param queueManager The queue manager
*/
- private void upgradeBridgedJobs(final JobManagerConfiguration configuration,
- final Resource topicResource,
- final TopologyCapabilities caps) {
+ private void upgradeBridgedJobs(final Resource topicResource) {
final String topicName = topicResource.getName().replace('.', '/');
final QueueInfo info = configuration.getQueueConfigurationManager().getQueueInfo(topicName);
JobTopicTraverser.traverse(logger, topicResource, new JobTopicTraverser.ResourceCallback() {
@@ -139,12 +148,11 @@ public class UpgradeTask {
/**
* Handle jobs from previous versions (<= 3.1.4) by moving them to the unassigned area
*/
- private void processJobsFromPreviousVersions(final JobManagerConfiguration configuration,
- final TopologyCapabilities caps) {
+ private void processJobsFromPreviousVersions() {
final ResourceResolver resolver = configuration.createResourceResolver();
try {
- this.processJobsFromPreviousVersions(configuration, caps, resolver.getResource(configuration.getPreviousVersionAnonPath()));
- this.processJobsFromPreviousVersions(configuration, caps, resolver.getResource(configuration.getPreviousVersionIdentifiedPath()));
+ this.processJobsFromPreviousVersions(resolver.getResource(configuration.getPreviousVersionAnonPath()));
+ this.processJobsFromPreviousVersions(resolver.getResource(configuration.getPreviousVersionIdentifiedPath()));
} catch ( final PersistenceException pe ) {
this.logger.warn("Problems moving jobs from previous version.", pe);
} finally {
@@ -155,15 +163,13 @@ public class UpgradeTask {
/**
* Recursively find jobs and move them
*/
- private void processJobsFromPreviousVersions(final JobManagerConfiguration configuration,
- final TopologyCapabilities caps,
- final Resource rsrc) throws PersistenceException {
+ private void processJobsFromPreviousVersions(final Resource rsrc) throws PersistenceException {
if ( rsrc != null && caps.isActive() ) {
if ( rsrc.isResourceType(ResourceHelper.RESOURCE_TYPE_JOB) ) {
- this.moveJobFromPreviousVersion(configuration, caps, rsrc);
+ this.moveJobFromPreviousVersion(rsrc);
} else {
for(final Resource child : rsrc.getChildren()) {
- this.processJobsFromPreviousVersions(configuration, caps, child);
+ this.processJobsFromPreviousVersions(child);
}
if ( caps.isActive() ) {
rsrc.getResourceResolver().delete(rsrc);
@@ -177,9 +183,7 @@ public class UpgradeTask {
/**
* Move a single job
*/
- private void moveJobFromPreviousVersion(final JobManagerConfiguration configuration,
- final TopologyCapabilities caps,
- final Resource jobResource)
+ private void moveJobFromPreviousVersion(final Resource jobResource)
throws PersistenceException {
final ResourceResolver resolver = jobResource.getResourceResolver();