You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2014/10/16 17:11:18 UTC

svn commit: r1632353 - in /sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology: CheckTopologyTask.java TopologyCapabilities.java TopologyHandler.java

Author: cziegeler
Date: Thu Oct 16 15:11:17 2014
New Revision: 1632353

URL: http://svn.apache.org/r1632353
Log:
SLING-4066 : Reassign unhandled jobs

Modified:
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/CheckTopologyTask.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/TopologyCapabilities.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/TopologyHandler.java

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/CheckTopologyTask.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/CheckTopologyTask.java?rev=1632353&r1=1632352&r2=1632353&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/CheckTopologyTask.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/CheckTopologyTask.java Thu Oct 16 15:11:17 2014
@@ -40,7 +40,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * The check topolgoy task checks for changes in the topology and queue configuration
+ * The check topology task checks for changes in the topology and queue configuration
  * and reassigns jobs.
  * If the leader instance finds a dead instance it reassigns its jobs to live instances.
  * The leader instance also checks for unassigned jobs and tries to assign them.
@@ -67,6 +67,10 @@ public class CheckTopologyTask {
         this.queueConfigManager = queueConfigurationManager;
     }
 
+    /**
+     * Reassign jobs from stopped instance.
+     * @param caps Current topology capabilities.
+     */
     private void reassignJobsFromStoppedInstances(final TopologyCapabilities caps) {
         if ( caps != null && caps.isLeader() && caps.isActive() ) {
             this.logger.debug("Checking for stopped instances...");
@@ -95,6 +99,97 @@ public class CheckTopologyTask {
     }
 
     /**
+     * Reassign stale jobs from this instance
+     * @param caps Current topology capabilities.
+     */
+    private void reassignStableJobs(final TopologyCapabilities caps) {
+        if ( caps != null && caps.isActive() ) {
+            this.logger.debug("Checking for stale jobs...");
+            final ResourceResolver resolver = this.configuration.createResourceResolver();
+            try {
+                final Resource jobsRoot = resolver.getResource(this.configuration.getLocalJobsPath());
+
+                // this resource should exist, but we check anyway
+                if ( jobsRoot != null ) {
+                    // check if this instance supports bridged jobs
+                    final List<InstanceDescription> bridgedTargets = caps.getPotentialTargets("/", null);
+                    boolean flag = false;
+                    for(final InstanceDescription desc : bridgedTargets) {
+                        if ( desc.isLocal() ) {
+                            flag = true;
+                            break;
+                        }
+                    }
+                    final boolean supportsBridged = flag;
+
+                    final Iterator<Resource> topicIter = jobsRoot.listChildren();
+                    while ( caps.isActive() && topicIter.hasNext() ) {
+                        final Resource topicResource = topicIter.next();
+
+                        final String topicName = topicResource.getName().replace('.', '/');
+                        this.logger.debug("Checking topic {}..." , topicName);
+                        final List<InstanceDescription> potentialTargets = caps.getPotentialTargets(topicName, null);
+                        boolean reassign = true;
+                        for(final InstanceDescription desc : potentialTargets) {
+                            if ( desc.isLocal() ) {
+                                reassign = false;
+                                break;
+                            }
+                        }
+                        if ( reassign ) {
+                            final QueueInfo info = this.queueConfigManager.getQueueInfo(topicName);
+                            JobTopicTraverser.traverse(this.logger, topicResource, new JobTopicTraverser.ResourceCallback() {
+
+                                @Override
+                                public boolean handle(final Resource rsrc) {
+                                    try {
+                                        final ValueMap vm = ResourceHelper.getValueMap(rsrc);
+                                        if ( !supportsBridged || vm.get(JobImpl.PROPERTY_BRIDGED_EVENT) == null ) {
+                                            final String targetId = caps.detectTarget(topicName, vm, info);
+
+                                            final Map<String, Object> props = new HashMap<String, Object>(vm);
+                                            props.remove(Job.PROPERTY_JOB_STARTED_TIME);
+
+                                            final String newPath;
+                                            if ( targetId != null ) {
+                                                newPath = configuration.getAssginedJobsPath() + '/' + targetId + '/' + topicResource.getName() + rsrc.getPath().substring(topicResource.getPath().length());
+                                                props.put(Job.PROPERTY_JOB_QUEUE_NAME, info.queueName);
+                                                props.put(Job.PROPERTY_JOB_TARGET_INSTANCE, targetId);
+                                            } else {
+                                                newPath = configuration.getUnassignedJobsPath() + '/' + topicResource.getName() + rsrc.getPath().substring(topicResource.getPath().length());
+                                                props.remove(Job.PROPERTY_JOB_QUEUE_NAME);
+                                                props.remove(Job.PROPERTY_JOB_TARGET_INSTANCE);
+                                            }
+                                            try {
+                                                ResourceHelper.getOrCreateResource(resolver, newPath, props);
+                                                resolver.delete(rsrc);
+                                                resolver.commit();
+                                            } catch ( final PersistenceException pe ) {
+                                                ignoreException(pe);
+                                                resolver.refresh();
+                                                resolver.revert();
+                                            }
+                                        }
+                                    } catch (final InstantiationException ie) {
+                                        // something happened with the resource in the meantime
+                                        ignoreException(ie);
+                                        resolver.refresh();
+                                        resolver.revert();
+                                    }
+                                    return caps.isActive();
+                                }
+                            });
+
+                        }
+                    }
+                }
+            } finally {
+                resolver.close();
+            }
+        }
+    }
+
+    /**
      * Try to assign unassigned jobs as there might be changes in:
      * - queue configurations
      * - topology
@@ -227,6 +322,10 @@ public class CheckTopologyTask {
         if ( topologyChanged ) {
             this.reassignJobsFromStoppedInstances(topologyCapabilities);
         }
+        // check for all topics
+        if ( topologyChanged || configChanged ) {
+            this.reassignStableJobs(topologyCapabilities);
+        }
         // try to assign unassigned jobs
         if ( topologyChanged || configChanged ) {
             this.assignUnassignedJobs(topologyCapabilities);

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/TopologyCapabilities.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/TopologyCapabilities.java?rev=1632353&r1=1632352&r2=1632353&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/TopologyCapabilities.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/TopologyCapabilities.java Thu Oct 16 15:11:17 2014
@@ -190,6 +190,7 @@ public class TopologyCapabilities {
 
     /**
      * Return the potential targets (Sling IDs) sorted by ID
+     * @return A list of instance descriptions. The list might be empty.
      */
     public List<InstanceDescription> getPotentialTargets(final String jobTopic, final Map<String, Object> jobProperties) {
         // calculate potential targets

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/TopologyHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/TopologyHandler.java?rev=1632353&r1=1632352&r2=1632353&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/TopologyHandler.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/TopologyHandler.java Thu Oct 16 15:11:17 2014
@@ -39,8 +39,6 @@ import org.slf4j.LoggerFactory;
 
 /**
  * The topology handler listens for topology events.
- *
- * TODO - config changes should actually do a real stop/start
  */
 @Component(immediate=true)
 @Service(value={TopologyHandler.class, TopologyEventListener.class})
@@ -72,13 +70,12 @@ public class TopologyHandler
         this.queueConfigManager.removeListener(this);
     }
 
-
     @Override
     public void configChanged() {
         final TopologyCapabilities caps = this.topologyCapabilities;
         if ( caps != null ) {
             synchronized ( this.listeners ) {
- //               this.stopProcessing(false);
+                this.stopProcessing(false);
 
                 this.startProcessing(Type.PROPERTIES_CHANGED, caps, true);
             }