You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2014/10/16 17:11:18 UTC
svn commit: r1632353 - in
/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology:
CheckTopologyTask.java TopologyCapabilities.java TopologyHandler.java
Author: cziegeler
Date: Thu Oct 16 15:11:17 2014
New Revision: 1632353
URL: http://svn.apache.org/r1632353
Log:
SLING-4066 : Reassign unhandled jobs
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/CheckTopologyTask.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/TopologyCapabilities.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/TopologyHandler.java
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/CheckTopologyTask.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/CheckTopologyTask.java?rev=1632353&r1=1632352&r2=1632353&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/CheckTopologyTask.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/CheckTopologyTask.java Thu Oct 16 15:11:17 2014
@@ -40,7 +40,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * The check topolgoy task checks for changes in the topology and queue configuration
+ * The check topology task checks for changes in the topology and queue configuration
* and reassigns jobs.
* If the leader instance finds a dead instance it reassigns its jobs to live instances.
* The leader instance also checks for unassigned jobs and tries to assign them.
@@ -67,6 +67,10 @@ public class CheckTopologyTask {
this.queueConfigManager = queueConfigurationManager;
}
+ /**
+ * Reassign jobs from stopped instances.
+ * @param caps Current topology capabilities.
+ */
private void reassignJobsFromStoppedInstances(final TopologyCapabilities caps) {
if ( caps != null && caps.isLeader() && caps.isActive() ) {
this.logger.debug("Checking for stopped instances...");
@@ -95,6 +99,97 @@ public class CheckTopologyTask {
}
/**
+ * Reassign stale jobs from this instance: local jobs whose topic has no local potential target are moved to another instance or to the unassigned jobs path. NOTE(review): "Stable" in the method name looks like a typo for "stale".
+ * @param caps Current topology capabilities; nothing is done if this is {@code null} or not active.
+ */
+ private void reassignStableJobs(final TopologyCapabilities caps) {
+ if ( caps != null && caps.isActive() ) {
+ this.logger.debug("Checking for stale jobs...");
+ final ResourceResolver resolver = this.configuration.createResourceResolver();
+ try {
+ final Resource jobsRoot = resolver.getResource(this.configuration.getLocalJobsPath());
+
+ // the local jobs root should exist, but we guard against null anyway
+ if ( jobsRoot != null ) {
+ // this instance supports bridged jobs if it is among the potential targets for the topic "/"
+ final List<InstanceDescription> bridgedTargets = caps.getPotentialTargets("/", null);
+ boolean flag = false;
+ for(final InstanceDescription desc : bridgedTargets) {
+ if ( desc.isLocal() ) {
+ flag = true;
+ break;
+ }
+ }
+ final boolean supportsBridged = flag;
+ // each child of the local jobs root is one topic; stop early once the capabilities are no longer active
+ final Iterator<Resource> topicIter = jobsRoot.listChildren();
+ while ( caps.isActive() && topicIter.hasNext() ) {
+ final Resource topicResource = topicIter.next();
+ // the resource name uses '.' where the topic name uses '/'
+ final String topicName = topicResource.getName().replace('.', '/');
+ this.logger.debug("Checking topic {}..." , topicName);
+ final List<InstanceDescription> potentialTargets = caps.getPotentialTargets(topicName, null);
+ boolean reassign = true;
+ for(final InstanceDescription desc : potentialTargets) {
+ if ( desc.isLocal() ) {
+ reassign = false;
+ break;
+ }
+ }
+ if ( reassign ) {
+ final QueueInfo info = this.queueConfigManager.getQueueInfo(topicName);
+ JobTopicTraverser.traverse(this.logger, topicResource, new JobTopicTraverser.ResourceCallback() {
+ // invoked by the traverser for resources below the topic resource; moves a single job
+ @Override
+ public boolean handle(final Resource rsrc) {
+ try {
+ final ValueMap vm = ResourceHelper.getValueMap(rsrc);
+ if ( !supportsBridged || vm.get(JobImpl.PROPERTY_BRIDGED_EVENT) == null ) {
+ final String targetId = caps.detectTarget(topicName, vm, info);
+ // copy the job properties without the started time
+ final Map<String, Object> props = new HashMap<String, Object>(vm);
+ props.remove(Job.PROPERTY_JOB_STARTED_TIME);
+ // keep the job's path relative to the topic resource below the new parent
+ final String newPath;
+ if ( targetId != null ) {
+ newPath = configuration.getAssginedJobsPath() + '/' + targetId + '/' + topicResource.getName() + rsrc.getPath().substring(topicResource.getPath().length());
+ props.put(Job.PROPERTY_JOB_QUEUE_NAME, info.queueName);
+ props.put(Job.PROPERTY_JOB_TARGET_INSTANCE, targetId);
+ } else {
+ newPath = configuration.getUnassignedJobsPath() + '/' + topicResource.getName() + rsrc.getPath().substring(topicResource.getPath().length());
+ props.remove(Job.PROPERTY_JOB_QUEUE_NAME);
+ props.remove(Job.PROPERTY_JOB_TARGET_INSTANCE);
+ }
+ try {
+ ResourceHelper.getOrCreateResource(resolver, newPath, props);
+ resolver.delete(rsrc);
+ resolver.commit();
+ } catch ( final PersistenceException pe ) {
+ ignoreException(pe);
+ resolver.refresh();
+ resolver.revert();
+ }
+ }
+ } catch (final InstantiationException ie) {
+ // something happened with the resource in the meantime; refresh and revert the resolver
+ ignoreException(ie);
+ resolver.refresh();
+ resolver.revert();
+ }
+ return caps.isActive();
+ }
+ });
+
+ }
+ }
+ }
+ } finally {
+ resolver.close();
+ }
+ }
+ }
+
+ /**
* Try to assign unassigned jobs as there might be changes in:
* - queue configurations
* - topology
@@ -227,6 +322,10 @@ public class CheckTopologyTask {
if ( topologyChanged ) {
this.reassignJobsFromStoppedInstances(topologyCapabilities);
}
+ // check for all topics
+ if ( topologyChanged || configChanged ) {
+ this.reassignStableJobs(topologyCapabilities);
+ }
// try to assign unassigned jobs
if ( topologyChanged || configChanged ) {
this.assignUnassignedJobs(topologyCapabilities);
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/TopologyCapabilities.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/TopologyCapabilities.java?rev=1632353&r1=1632352&r2=1632353&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/TopologyCapabilities.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/TopologyCapabilities.java Thu Oct 16 15:11:17 2014
@@ -190,6 +190,7 @@ public class TopologyCapabilities {
/**
* Return the potential targets (Sling IDs) sorted by ID
+ * @return A list of instance descriptions. The list might be empty.
*/
public List<InstanceDescription> getPotentialTargets(final String jobTopic, final Map<String, Object> jobProperties) {
// calculate potential targets
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/TopologyHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/TopologyHandler.java?rev=1632353&r1=1632352&r2=1632353&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/TopologyHandler.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/TopologyHandler.java Thu Oct 16 15:11:17 2014
@@ -39,8 +39,6 @@ import org.slf4j.LoggerFactory;
/**
* The topology handler listens for topology events.
- *
- * TODO - config changes should actually do a real stop/start
*/
@Component(immediate=true)
@Service(value={TopologyHandler.class, TopologyEventListener.class})
@@ -72,13 +70,12 @@ public class TopologyHandler
this.queueConfigManager.removeListener(this);
}
-
@Override
public void configChanged() {
final TopologyCapabilities caps = this.topologyCapabilities;
if ( caps != null ) {
synchronized ( this.listeners ) {
- // this.stopProcessing(false);
+ this.stopProcessing(false);
this.startProcessing(Type.PROPERTIES_CHANGED, caps, true);
}