You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2014/10/15 13:30:10 UTC

svn commit: r1631994 - in /sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl: jobs/ jobs/config/ jobs/topics/ topology/

Author: cziegeler
Date: Wed Oct 15 11:30:09 2014
New Revision: 1631994

URL: http://svn.apache.org/r1631994
Log:
SLING-4048 : Avoid keeping jobs in memory. Move topology handling to own handler service

Added:
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/JobTopicTraverser.java   (with props)
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/MaintenanceTask.java   (with props)
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/RestartTask.java   (with props)
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/UpgradeTask.java   (with props)
Modified:
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/MaintenanceTask.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/QueueConfigurationManager.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/TopologyCapabilities.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/TopologyHandler.java

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java?rev=1631994&r1=1631993&r2=1631994&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java Wed Oct 15 11:30:09 2014
@@ -117,10 +117,6 @@ public class JobManagerImpl
     @Reference
     private EventAdmin eventAdmin;
 
-    /** The configuration manager. */
-    @Reference
-    private QueueConfigurationManager queueConfigManager;
-
     @Reference
     private Scheduler scheduler;
 
@@ -137,6 +133,8 @@ public class JobManagerImpl
     @Reference
     private JobManagerConfiguration configuration;
 
+    @Reference
+    private QueueConfigurationManager queueManager;
 
     private volatile TopologyCapabilities topologyCapabilities;
 
@@ -248,7 +246,7 @@ public class JobManagerImpl
         // invoke maintenance task
         final MaintenanceTask task = this.maintenanceTask;
         if ( task != null ) {
-            task.run(this.topologyCapabilities, this.queueConfigManager, this.schedulerRuns - 1);
+            task.run(this.topologyCapabilities, this.schedulerRuns - 1);
         }
         logger.debug("Job manager maintenance: Finished #{}", this.schedulerRuns);
     }
@@ -270,7 +268,11 @@ public class JobManagerImpl
         }
 
         // get the queue configuration
-        final QueueInfo queueInfo = queueConfigManager.getQueueInfo(job.getTopic());
+        final TopologyCapabilities caps = this.topologyCapabilities;
+        final QueueInfo queueInfo = caps != null ? caps.getQueueInfo(job.getTopic()) : null;
+        if ( queueInfo == null ) {
+            return; // TODO
+        }
         final InternalQueueConfiguration config = queueInfo.queueConfiguration;
 
         // Sanity check if queue configuration has changed
@@ -288,7 +290,6 @@ public class JobManagerImpl
         } else {
 
             if ( reassign ) {
-                final TopologyCapabilities caps = this.topologyCapabilities;
                 reassignTargetId = (caps == null ? null : caps.detectTarget(job.getTopic(), job.getProperties(), queueInfo));
 
             } else {
@@ -1167,7 +1168,7 @@ public class JobManagerImpl
             final String jobName,
             final Map<String, Object> jobProperties,
             final List<String> errors) {
-        final QueueInfo info = this.queueConfigManager.getQueueInfo(jobTopic);
+        final QueueInfo info = this.queueManager.getQueueInfo(jobTopic);
         if ( info.queueConfiguration.getType() == QueueConfiguration.Type.DROP ) {
             if ( logger.isDebugEnabled() ) {
                 logger.debug("Dropping job due to configuration of queue {} : {}", info.queueName, Utility.toString(jobTopic, jobName, jobProperties));
@@ -1280,7 +1281,7 @@ public class JobManagerImpl
     }
 
     public void reassign(final JobImpl job) {
-        final QueueInfo queueInfo = queueConfigManager.getQueueInfo(job.getTopic());
+        final QueueInfo queueInfo = queueManager.getQueueInfo(job.getTopic());
         final InternalQueueConfiguration config = queueInfo.queueConfiguration;
 
         // Sanity check if queue configuration has changed
@@ -1351,7 +1352,7 @@ public class JobManagerImpl
         final JobImpl job = (JobImpl)this.getJobById(jobId);
         if ( job != null && !this.configuration.isStoragePath(job.getResourcePath()) ) {
             // get the queue configuration
-            final QueueInfo queueInfo = queueConfigManager.getQueueInfo(job.getTopic());
+            final QueueInfo queueInfo = this.queueManager.getQueueInfo(job.getTopic());
             final AbstractJobQueue queue;
             synchronized ( queuesLock ) {
                 queue = this.queues.get(queueInfo.queueName);
@@ -1361,6 +1362,7 @@ public class JobManagerImpl
                 stopped = queue.stopJob(job);
             }
             if ( forward && !stopped ) {
+                // TODO why not remove the resource?
                 // send remote event
                 final Map<String, Object> props = new HashMap<String, Object>();
                 props.put(Utility.PROPERTY_ID, jobId);

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/MaintenanceTask.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/MaintenanceTask.java?rev=1631994&r1=1631993&r2=1631994&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/MaintenanceTask.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/MaintenanceTask.java Wed Oct 15 11:30:09 2014
@@ -18,8 +18,6 @@
  */
 package org.apache.sling.event.impl.jobs;
 
-import java.io.IOException;
-import java.io.ObjectInputStream;
 import java.util.ArrayList;
 import java.util.Calendar;
 import java.util.HashMap;
@@ -32,15 +30,10 @@ import org.apache.sling.api.resource.Res
 import org.apache.sling.api.resource.ResourceResolver;
 import org.apache.sling.api.resource.ResourceUtil;
 import org.apache.sling.api.resource.ValueMap;
-import org.apache.sling.discovery.InstanceDescription;
-import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager;
-import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueInfo;
 import org.apache.sling.event.impl.support.BatchResourceRemover;
-import org.apache.sling.event.impl.support.Environment;
 import org.apache.sling.event.impl.support.ResourceHelper;
 import org.apache.sling.event.impl.topology.TopologyCapabilities;
 import org.apache.sling.event.jobs.Job;
-import org.apache.sling.event.jobs.QueueConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,14 +50,6 @@ public class MaintenanceTask {
     /** Job manager configuration. */
     private final JobManagerConfiguration configuration;
 
-    /** Change count for queue configurations .*/
-    private volatile long queueConfigChangeCount = -1;
-
-    /** Change count for topology changes .*/
-    private volatile long topologyChangeCount = -1;
-
-    private boolean checkedForPreviousVersion = false;
-
     /**
      * Constructor
      */
@@ -72,241 +57,11 @@ public class MaintenanceTask {
         this.configuration = config;
     }
 
-    private void reassignJobs(final TopologyCapabilities caps,
-            final QueueConfigurationManager queueManager) {
-        if ( caps != null && caps.isLeader() ) {
-            this.logger.debug("Checking for stopped instances...");
-            final ResourceResolver resolver = this.configuration.createResourceResolver();
-            try {
-                final Resource jobsRoot = resolver.getResource(this.configuration.getAssginedJobsPath());
-                this.logger.debug("Got jobs root {}", jobsRoot);
-
-                // this resource should exist, but we check anyway
-                if ( jobsRoot != null ) {
-                    final Iterator<Resource> instanceIter = jobsRoot.listChildren();
-                    while ( caps.isActive() && instanceIter.hasNext() ) {
-                        final Resource instanceResource = instanceIter.next();
-
-                        final String instanceId = instanceResource.getName();
-                        if ( !caps.isActive(instanceId) ) {
-                            logger.debug("Found stopped instance {}", instanceId);
-                            assignJobs(caps, queueManager, instanceResource, true);
-                        }
-                    }
-                }
-            } finally {
-                resolver.close();
-            }
-        }
-    }
-
-    /**
-     * Try to assign unassigned jobs as there might be changes in:
-     * - queue configurations
-     * - topology
-     * - capabilities
-     */
-    private void assignUnassignedJobs(final TopologyCapabilities caps,
-            final QueueConfigurationManager queueManager) {
-        if ( caps != null && caps.isLeader() ) {
-            logger.debug("Checking unassigned jobs...");
-            final ResourceResolver resolver = this.configuration.createResourceResolver();
-            try {
-                final Resource unassignedRoot = resolver.getResource(this.configuration.getUnassignedJobsPath());
-                logger.debug("Got unassigned root {}", unassignedRoot);
-
-                // this resource should exist, but we check anyway
-                if ( unassignedRoot != null ) {
-                    assignJobs(caps, queueManager, unassignedRoot, false);
-                }
-            } finally {
-                resolver.close();
-            }
-        }
-    }
-
-    /**
-     * Try to assign all jobs from the jobs root.
-     * The jobs are stored by topic
-     */
-    private void assignJobs(final TopologyCapabilities caps,
-            final QueueConfigurationManager queueManager,
-            final Resource jobsRoot,
-            final boolean unassign) {
-        final ResourceResolver resolver = jobsRoot.getResourceResolver();
-
-        final Iterator<Resource> topicIter = jobsRoot.listChildren();
-        while ( caps.isActive() && topicIter.hasNext() ) {
-            final Resource topicResource = topicIter.next();
-
-            final String topicName = topicResource.getName().replace('.', '/');
-            logger.debug("Found topic {}", topicName);
-
-            final String checkTopic;
-            if ( topicName.equals(JobImpl.PROPERTY_BRIDGED_EVENT) ) {
-                checkTopic = "/";
-            } else {
-                checkTopic = topicName;
-            }
-
-            // first check if there is an instance for these topics
-            final List<InstanceDescription> potentialTargets = caps.getPotentialTargets(checkTopic, null);
-            if ( potentialTargets != null && potentialTargets.size() > 0 ) {
-                final QueueInfo info = queueManager.getQueueInfo(topicName);
-                logger.debug("Found queue {} for {}", info.queueConfiguration, topicName);
-
-                // if queue is configured to drop, we drop
-                if ( info.queueConfiguration.getType() ==  QueueConfiguration.Type.DROP) {
-                    final Iterator<Resource> i = topicResource.listChildren();
-                    while ( caps.isActive() && i.hasNext() ) {
-                        final Resource rsrc = i.next();
-                        try {
-                            resolver.delete(rsrc);
-                            resolver.commit();
-                        } catch ( final PersistenceException pe ) {
-                            this.ignoreException(pe);
-                            resolver.refresh();
-                        }
-                    }
-                } else if ( info.queueConfiguration.getType() != QueueConfiguration.Type.IGNORE ) {
-                    // if the queue is not configured to ignore, we can reschedule
-                    for(final Resource yearResource : topicResource.getChildren() ) {
-                        for(final Resource monthResource : yearResource.getChildren() ) {
-                            for(final Resource dayResource : monthResource.getChildren() ) {
-                                for(final Resource hourResource : dayResource.getChildren() ) {
-                                    for(final Resource minuteResource : hourResource.getChildren() ) {
-                                        for(final Resource rsrc : minuteResource.getChildren() ) {
-
-                                            if ( !caps.isActive() ) {
-                                                return;
-                                            }
-
-                                            try {
-                                                final ValueMap vm = ResourceHelper.getValueMap(rsrc);
-                                                final String targetId = caps.detectTarget(topicName, vm, info);
-
-                                                if ( targetId != null ) {
-                                                    final String newPath = this.configuration.getAssginedJobsPath() + '/' + targetId + '/' + topicResource.getName() + rsrc.getPath().substring(topicResource.getPath().length());
-                                                    final Map<String, Object> props = new HashMap<String, Object>(vm);
-                                                    props.put(Job.PROPERTY_JOB_QUEUE_NAME, info.queueName);
-                                                    props.put(Job.PROPERTY_JOB_TARGET_INSTANCE, targetId);
-                                                    props.remove(Job.PROPERTY_JOB_STARTED_TIME);
-                                                    try {
-                                                        ResourceHelper.getOrCreateResource(resolver, newPath, props);
-                                                        resolver.delete(rsrc);
-                                                        resolver.commit();
-                                                    } catch ( final PersistenceException pe ) {
-                                                        this.ignoreException(pe);
-                                                        resolver.refresh();
-                                                    }
-                                                }
-                                            } catch (final InstantiationException ie) {
-                                                // something happened with the resource in the meantime
-                                                this.ignoreException(ie);
-                                                resolver.refresh();
-                                            }
-                                        }
-                                    }
-                                }
-                            }
-                        }
-                    }
-                }
-            }
-            if ( caps.isActive() && unassign ) {
-                // we have to move everything to the unassigned area
-                for(final Resource yearResource : topicResource.getChildren() ) {
-                    for(final Resource monthResource : yearResource.getChildren() ) {
-                        for(final Resource dayResource : monthResource.getChildren() ) {
-                            for(final Resource hourResource : dayResource.getChildren() ) {
-                                for(final Resource minuteResource : hourResource.getChildren() ) {
-                                    for(final Resource rsrc : minuteResource.getChildren() ) {
-
-                                        if ( !caps.isActive() ) {
-                                            return;
-                                        }
-
-                                        try {
-                                            final ValueMap vm = ResourceHelper.getValueMap(rsrc);
-                                            final String newPath = this.configuration.getUnassignedJobsPath() + '/' + topicResource.getName() + rsrc.getPath().substring(topicResource.getPath().length());
-                                            final Map<String, Object> props = new HashMap<String, Object>(vm);
-                                            props.remove(Job.PROPERTY_JOB_QUEUE_NAME);
-                                            props.remove(Job.PROPERTY_JOB_TARGET_INSTANCE);
-                                            props.remove(Job.PROPERTY_JOB_STARTED_TIME);
-
-                                            try {
-                                                ResourceHelper.getOrCreateResource(resolver, newPath, props);
-                                                resolver.delete(rsrc);
-                                                resolver.commit();
-                                            } catch ( final PersistenceException pe ) {
-                                                this.ignoreException(pe);
-                                                resolver.refresh();
-                                            }
-                                        } catch (final InstantiationException ie) {
-                                            // something happened with the resource in the meantime
-                                            this.ignoreException(ie);
-                                            resolver.refresh();
-                                        }
-                                    }
-                                }
-                            }
-                        }
-                    }
-                }
-            }
-        }
-    }
-
-    /**
-     * Check if the topology has changed.
-     */
-    private boolean topologyHasChanged(final TopologyCapabilities topologyCapabilities) {
-        boolean topologyChanged = false;
-        if ( topologyCapabilities != null ) {
-            if ( this.topologyChangeCount != topologyCapabilities.getChangeCount() ) {
-                this.topologyChangeCount = topologyCapabilities.getChangeCount();
-                topologyChanged = true;
-            }
-        }
-        return topologyChanged;
-    }
-
-    private boolean queueConfigurationHasChanged(final TopologyCapabilities topologyCapabilities,
-            final QueueConfigurationManager queueManager) {
-        boolean configChanged = false;
-        if ( topologyCapabilities != null ) {
-            final int queueChangeCount = queueManager.getChangeCount();
-            if ( this.queueConfigChangeCount < queueChangeCount ) {
-                configChanged = true;
-                this.queueConfigChangeCount = queueChangeCount;
-            }
-        }
-        return configChanged;
-    }
-
     /**
      * One maintenance run
      */
     public void run(final TopologyCapabilities topologyCapabilities,
-            final QueueConfigurationManager queueManager,
             final long cleanUpCounter) {
-        // check topology and config change during each invocation
-        final boolean topologyChanged = this.topologyHasChanged(topologyCapabilities);
-        final boolean configChanged = this.queueConfigurationHasChanged(topologyCapabilities, queueManager);
-
-        // if topology changed, reschedule assigned jobs for stopped instances
-        if ( topologyChanged ) {
-            this.reassignJobs(topologyCapabilities, queueManager);
-        }
-        // try to assign unassigned jobs
-        if ( topologyChanged || configChanged ) {
-            this.assignUnassignedJobs(topologyCapabilities, queueManager);
-        }
-
-        if ( topologyChanged && !this.checkedForPreviousVersion && topologyCapabilities != null && topologyCapabilities.isLeader() ) {
-            this.processJobsFromPreviousVersions(topologyCapabilities, queueManager);
-        }
-
         if ( topologyCapabilities != null ) {
             // Clean up
             final String cleanUpAssignedPath;;
@@ -641,138 +396,4 @@ public class MaintenanceTask {
             this.logger.debug("Ignored exception " + e.getMessage(), e);
         }
     }
-
-    /**
-     * Handle jobs from previous versions (<= 3.1.4) by moving them to the unassigned area
-     */
-    private void processJobsFromPreviousVersions(final TopologyCapabilities caps,
-            final QueueConfigurationManager queueManager) {
-        final ResourceResolver resolver = this.configuration.createResourceResolver();
-        try {
-            this.processJobsFromPreviousVersions(caps, queueManager, resolver.getResource(this.configuration.getPreviousVersionAnonPath()));
-            this.processJobsFromPreviousVersions(caps, queueManager, resolver.getResource(this.configuration.getPreviousVersionIdentifiedPath()));
-            this.checkedForPreviousVersion = true;
-        } catch ( final PersistenceException pe ) {
-            this.logger.warn("Problems moving jobs from previous version.", pe);
-        } finally {
-            resolver.close();
-        }
-    }
-
-    /**
-     * Recursively find jobs and move them
-     */
-    private void processJobsFromPreviousVersions(final TopologyCapabilities caps,
-            final QueueConfigurationManager queueManager,
-            final Resource rsrc) throws PersistenceException {
-        if ( rsrc != null && caps.isActive() ) {
-            if ( rsrc.isResourceType(ResourceHelper.RESOURCE_TYPE_JOB) ) {
-                this.moveJobFromPreviousVersion(caps, queueManager, rsrc);
-            } else {
-                for(final Resource child : rsrc.getChildren()) {
-                    this.processJobsFromPreviousVersions(caps, queueManager, child);
-                }
-                if ( caps.isActive() ) {
-                    rsrc.getResourceResolver().delete(rsrc);
-                    rsrc.getResourceResolver().commit();
-                    rsrc.getResourceResolver().refresh();
-                }
-            }
-        }
-    }
-
-    /**
-     * Move a single job
-     */
-    private void moveJobFromPreviousVersion(final TopologyCapabilities caps,
-            final QueueConfigurationManager queueManager,
-            final Resource jobResource)
-    throws PersistenceException {
-        final ResourceResolver resolver = jobResource.getResourceResolver();
-
-        try {
-            final ValueMap vm = ResourceHelper.getValueMap(jobResource);
-            // check for binary properties
-            Map<String, Object> binaryProperties = new HashMap<String, Object>();
-            final ObjectInputStream ois = vm.get("slingevent:properties", ObjectInputStream.class);
-            if ( ois != null ) {
-                try {
-                    int length = ois.readInt();
-                    for(int i=0;i<length;i++) {
-                        final String key = (String)ois.readObject();
-                        final Object value = ois.readObject();
-                        binaryProperties.put(key, value);
-                    }
-                } catch (final ClassNotFoundException cnfe) {
-                    throw new PersistenceException("Class not found.", cnfe);
-                } catch (final java.io.InvalidClassException ice) {
-                    throw new PersistenceException("Invalid class.", ice);
-                } catch (final IOException ioe) {
-                    throw new PersistenceException("Unable to deserialize job properties.", ioe);
-                } finally {
-                    try {
-                        ois.close();
-                    } catch (final IOException ioe) {
-                        this.ignoreException(ioe);
-                    }
-                }
-            }
-
-            final Map<String, Object> properties = ResourceHelper.cloneValueMap(vm);
-
-            properties.put(JobImpl.PROPERTY_BRIDGED_EVENT, true);
-            final String topic = (String)properties.remove("slingevent:topic");
-            properties.put(ResourceHelper.PROPERTY_JOB_TOPIC, topic);
-
-            properties.remove(Job.PROPERTY_JOB_QUEUE_NAME);
-            properties.remove(Job.PROPERTY_JOB_TARGET_INSTANCE);
-            // and binary properties
-            properties.putAll(binaryProperties);
-            properties.remove("slingevent:properties");
-
-            if ( !properties.containsKey(Job.PROPERTY_JOB_RETRIES) ) {
-                properties.put(Job.PROPERTY_JOB_RETRIES, 10); // we put a dummy value here; this gets updated by the queue
-            }
-            if ( !properties.containsKey(Job.PROPERTY_JOB_RETRY_COUNT) ) {
-                properties.put(Job.PROPERTY_JOB_RETRY_COUNT, 0);
-            }
-
-            final List<InstanceDescription> potentialTargets = caps.getPotentialTargets("/", null);
-            String targetId = null;
-            if ( potentialTargets != null && potentialTargets.size() > 0 ) {
-                final QueueInfo info = queueManager.getQueueInfo(topic);
-                logger.debug("Found queue {} for {}", info.queueConfiguration, topic);
-                // if queue is configured to drop, we drop
-                if ( info.queueConfiguration.getType() ==  QueueConfiguration.Type.DROP) {
-                    resolver.delete(jobResource);
-                    resolver.commit();
-                    return;
-                }
-                if ( info.queueConfiguration.getType() != QueueConfiguration.Type.IGNORE ) {
-                    targetId = caps.detectTarget(topic, vm, info);
-                    if ( targetId != null ) {
-                        properties.put(Job.PROPERTY_JOB_QUEUE_NAME, info.queueName);
-                        properties.put(Job.PROPERTY_JOB_TARGET_INSTANCE, targetId);
-                        properties.put(Job.PROPERTY_JOB_RETRIES, info.queueConfiguration.getMaxRetries());
-                    }
-                }
-            }
-
-            properties.put(Job.PROPERTY_JOB_CREATED_INSTANCE, "old:" + Environment.APPLICATION_ID);
-            properties.put(ResourceResolver.PROPERTY_RESOURCE_TYPE, ResourceHelper.RESOURCE_TYPE_JOB);
-
-            final String jobId = this.configuration.getUniqueId(topic);
-            properties.put(ResourceHelper.PROPERTY_JOB_ID, jobId);
-            properties.remove(Job.PROPERTY_JOB_STARTED_TIME);
-
-            final String newPath = this.configuration.getUniquePath(targetId, topic, jobId, vm);
-            this.logger.debug("Moving 'old' job from {} to {}", jobResource.getPath(), newPath);
-
-            ResourceHelper.getOrCreateResource(resolver, newPath, properties);
-            resolver.delete(jobResource);
-            resolver.commit();
-        } catch (final InstantiationException ie) {
-            throw new PersistenceException("Exception while reading reasource: " + ie.getMessage(), ie.getCause());
-        }
-    }
 }

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/QueueConfigurationManager.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/QueueConfigurationManager.java?rev=1631994&r1=1631993&r2=1631994&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/QueueConfigurationManager.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/QueueConfigurationManager.java Wed Oct 15 11:30:09 2014
@@ -31,7 +31,9 @@ import org.apache.sling.api.resource.Log
 import org.apache.sling.api.resource.PersistenceException;
 import org.apache.sling.event.impl.support.ResourceHelper;
 import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceReference;
 import org.osgi.util.tracker.ServiceTracker;
+import org.osgi.util.tracker.ServiceTrackerCustomizer;
 
 
 /**
@@ -41,6 +43,10 @@ import org.osgi.util.tracker.ServiceTrac
 @Service(value=QueueConfigurationManager.class)
 public class QueueConfigurationManager {
 
+    public interface QueueConfigurationChangeListener {
+        void configChanged();
+    }
+
     /** Configurations - ordered by service ranking. */
     private volatile InternalQueueConfiguration[] orderedConfigs = new InternalQueueConfiguration[0];
 
@@ -53,6 +59,9 @@ public class QueueConfigurationManager {
     @Reference
     private MainQueueConfiguration mainQueueConfiguration;
 
+    /** Listeners. */
+    private final List<QueueConfigurationChangeListener> listeners = new ArrayList<QueueConfigurationChangeListener>();
+
     /**
      * Activate this component.
      * Create the service tracker and start it.
@@ -61,7 +70,24 @@ public class QueueConfigurationManager {
     protected void activate(final BundleContext bundleContext)
     throws LoginException, PersistenceException {
         this.configTracker = new ServiceTracker(bundleContext,
-                InternalQueueConfiguration.class.getName(), null);
+                InternalQueueConfiguration.class.getName(), new ServiceTrackerCustomizer() {
+
+                    @Override
+                    public void removedService(final ServiceReference reference, final Object service) {
+                        bundleContext.ungetService(reference);
+                        updateListeners();
+                    }
+
+                    @Override
+                    public void modifiedService(ServiceReference reference, Object service) {
+                        // nothing to do
+                    }
+
+                    @Override
+                    public Object addingService(final ServiceReference reference) {
+                        return bundleContext.getService(reference);
+                    }
+                });
         this.configTracker.open();
     }
 
@@ -145,4 +171,24 @@ public class QueueConfigurationManager {
     public int getChangeCount() {
         return this.configTracker.getTrackingCount();
     }
+
+    public void addListener(final QueueConfigurationChangeListener listener) {
+        synchronized ( this.listeners ) {
+            this.listeners.add(listener);
+        }
+    }
+
+    public void removeListener(final QueueConfigurationChangeListener listener) {
+        synchronized ( this.listeners ) {
+            this.listeners.remove(listener);
+        }
+    }
+
+    private void updateListeners() {
+        synchronized ( listeners ) {
+            for(final QueueConfigurationChangeListener l : listeners) {
+                l.configChanged();
+            }
+        }
+    }
 }

Added: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/JobTopicTraverser.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/JobTopicTraverser.java?rev=1631994&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/JobTopicTraverser.java (added)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/JobTopicTraverser.java Wed Oct 15 11:30:09 2014
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.topics;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.event.impl.jobs.JobImpl;
+import org.apache.sling.event.impl.jobs.Utility;
+import org.slf4j.Logger;
+
+public class JobTopicTraverser {
+
+    public interface Handler {
+        boolean handle(final JobImpl job);
+    }
+
+    public static void traverse(final Logger logger,
+            final Resource topicResource,
+            final Handler handler) {
+        logger.debug("Processing topic {}", topicResource.getName());
+        // now years
+        for(final Resource yearResource: Utility.getSortedChildren(logger, "year", topicResource)) {
+            final int year = Integer.valueOf(yearResource.getName());
+            logger.debug("Processing year {}", year);
+
+            // now months
+            for(final Resource monthResource: Utility.getSortedChildren(logger, "month", yearResource)) {
+                final int month = Integer.valueOf(monthResource.getName());
+                logger.debug("Processing month {}", month);
+
+                // now days
+                for(final Resource dayResource: Utility.getSortedChildren(logger, "day", monthResource)) {
+                    final int day = Integer.valueOf(dayResource.getName());
+                    logger.debug("Processing day {}", day);
+
+                    // now hours
+                    for(final Resource hourResource: Utility.getSortedChildren(logger, "hour", dayResource)) {
+                        final int hour = Integer.valueOf(hourResource.getName());
+                        logger.debug("Processing hour {}", hour);
+
+                        // now minutes
+                        for(final Resource minuteResource: Utility.getSortedChildren(logger, "minute", hourResource)) {
+                            final int minute = Integer.valueOf(minuteResource.getName());
+                            logger.debug("Processing minute {}", minute);
+
+                            // now jobs
+                            final List<JobImpl> jobs = new ArrayList<JobImpl>();
+                            final Iterator<Resource> jobIter = minuteResource.listChildren();
+                            while ( jobIter.hasNext() ) {
+                                final Resource jobResource = jobIter.next();
+
+                                final JobImpl job = Utility.readJob(logger, jobResource);
+                                if ( job != null ) {
+                                    logger.debug("Found job {}", jobResource.getName());
+                                    jobs.add(job);
+                                }
+                            }
+
+                            Collections.sort(jobs);
+
+                            for(final JobImpl job : jobs) {
+                                if ( !handler.handle(job) ) {
+                                    return;
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+        }
+    }
+}

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/JobTopicTraverser.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/JobTopicTraverser.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/JobTopicTraverser.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/MaintenanceTask.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/MaintenanceTask.java?rev=1631994&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/MaintenanceTask.java (added)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/MaintenanceTask.java Wed Oct 15 11:30:09 2014
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.topology;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.ValueMap;
+import org.apache.sling.discovery.InstanceDescription;
+import org.apache.sling.event.impl.jobs.JobImpl;
+import org.apache.sling.event.impl.jobs.JobManagerConfiguration;
+import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager;
+import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueInfo;
+import org.apache.sling.event.impl.support.ResourceHelper;
+import org.apache.sling.event.jobs.Job;
+import org.apache.sling.event.jobs.QueueConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Maintenance task...
+ *
+ * In the default configuration, this task runs every minute
+ */
+public class MaintenanceTask {
+
+    /** Logger. */
+    private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+    /** Job manager configuration. */
+    private final JobManagerConfiguration configuration;
+
+    /**
+     * Constructor
+     */
+    public MaintenanceTask(final JobManagerConfiguration config) {
+        this.configuration = config;
+    }
+
+    private void reassignJobs(final TopologyCapabilities caps,
+            final QueueConfigurationManager queueManager) {
+        if ( caps != null && caps.isLeader() ) {
+            this.logger.debug("Checking for stopped instances...");
+            final ResourceResolver resolver = this.configuration.createResourceResolver();
+            try {
+                final Resource jobsRoot = resolver.getResource(this.configuration.getAssginedJobsPath());
+                this.logger.debug("Got jobs root {}", jobsRoot);
+
+                // this resource should exist, but we check anyway
+                if ( jobsRoot != null ) {
+                    final Iterator<Resource> instanceIter = jobsRoot.listChildren();
+                    while ( caps.isActive() && instanceIter.hasNext() ) {
+                        final Resource instanceResource = instanceIter.next();
+
+                        final String instanceId = instanceResource.getName();
+                        if ( !caps.isActive(instanceId) ) {
+                            logger.debug("Found stopped instance {}", instanceId);
+                            assignJobs(caps, queueManager, instanceResource, true);
+                        }
+                    }
+                }
+            } finally {
+                resolver.close();
+            }
+        }
+    }
+
+    /**
+     * Try to assign unassigned jobs as there might be changes in:
+     * - queue configurations
+     * - topology
+     * - capabilities
+     */
+    private void assignUnassignedJobs(final TopologyCapabilities caps,
+            final QueueConfigurationManager queueManager) {
+        if ( caps != null && caps.isLeader() ) {
+            logger.debug("Checking unassigned jobs...");
+            final ResourceResolver resolver = this.configuration.createResourceResolver();
+            try {
+                final Resource unassignedRoot = resolver.getResource(this.configuration.getUnassignedJobsPath());
+                logger.debug("Got unassigned root {}", unassignedRoot);
+
+                // this resource should exist, but we check anyway
+                if ( unassignedRoot != null ) {
+                    assignJobs(caps, queueManager, unassignedRoot, false);
+                }
+            } finally {
+                resolver.close();
+            }
+        }
+    }
+
+    /**
+     * Try to assign all jobs from the jobs root.
+     * The jobs are stored by topic
+     */
+    private void assignJobs(final TopologyCapabilities caps,
+            final QueueConfigurationManager queueManager,
+            final Resource jobsRoot,
+            final boolean unassign) {
+        final ResourceResolver resolver = jobsRoot.getResourceResolver();
+
+        final Iterator<Resource> topicIter = jobsRoot.listChildren();
+        while ( caps.isActive() && topicIter.hasNext() ) {
+            final Resource topicResource = topicIter.next();
+
+            final String topicName = topicResource.getName().replace('.', '/');
+            logger.debug("Found topic {}", topicName);
+
+            final String checkTopic;
+            if ( topicName.equals(JobImpl.PROPERTY_BRIDGED_EVENT) ) {
+                checkTopic = "/";
+            } else {
+                checkTopic = topicName;
+            }
+
+            // first check if there is an instance for these topics
+            final List<InstanceDescription> potentialTargets = caps.getPotentialTargets(checkTopic, null);
+            if ( potentialTargets != null && potentialTargets.size() > 0 ) {
+                final QueueInfo info = queueManager.getQueueInfo(topicName);
+                logger.debug("Found queue {} for {}", info.queueConfiguration, topicName);
+
+                // if queue is configured to drop, we drop
+                if ( info.queueConfiguration.getType() ==  QueueConfiguration.Type.DROP) {
+                    final Iterator<Resource> i = topicResource.listChildren();
+                    while ( caps.isActive() && i.hasNext() ) {
+                        final Resource rsrc = i.next();
+                        try {
+                            resolver.delete(rsrc);
+                            resolver.commit();
+                        } catch ( final PersistenceException pe ) {
+                            this.ignoreException(pe);
+                            resolver.refresh();
+                        }
+                    }
+                } else if ( info.queueConfiguration.getType() != QueueConfiguration.Type.IGNORE ) {
+                    // if the queue is not configured to ignore, we can reschedule
+                    for(final Resource yearResource : topicResource.getChildren() ) {
+                        for(final Resource monthResource : yearResource.getChildren() ) {
+                            for(final Resource dayResource : monthResource.getChildren() ) {
+                                for(final Resource hourResource : dayResource.getChildren() ) {
+                                    for(final Resource minuteResource : hourResource.getChildren() ) {
+                                        for(final Resource rsrc : minuteResource.getChildren() ) {
+
+                                            if ( !caps.isActive() ) {
+                                                return;
+                                            }
+
+                                            try {
+                                                final ValueMap vm = ResourceHelper.getValueMap(rsrc);
+                                                final String targetId = caps.detectTarget(topicName, vm, info);
+
+                                                if ( targetId != null ) {
+                                                    final String newPath = this.configuration.getAssginedJobsPath() + '/' + targetId + '/' + topicResource.getName() + rsrc.getPath().substring(topicResource.getPath().length());
+                                                    final Map<String, Object> props = new HashMap<String, Object>(vm);
+                                                    props.put(Job.PROPERTY_JOB_QUEUE_NAME, info.queueName);
+                                                    props.put(Job.PROPERTY_JOB_TARGET_INSTANCE, targetId);
+                                                    props.remove(Job.PROPERTY_JOB_STARTED_TIME);
+                                                    try {
+                                                        ResourceHelper.getOrCreateResource(resolver, newPath, props);
+                                                        resolver.delete(rsrc);
+                                                        resolver.commit();
+                                                    } catch ( final PersistenceException pe ) {
+                                                        this.ignoreException(pe);
+                                                        resolver.refresh();
+                                                    }
+                                                }
+                                            } catch (final InstantiationException ie) {
+                                                // something happened with the resource in the meantime
+                                                this.ignoreException(ie);
+                                                resolver.refresh();
+                                            }
+                                        }
+                                    }
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+            if ( caps.isActive() && unassign ) {
+                // we have to move everything to the unassigned area
+                for(final Resource yearResource : topicResource.getChildren() ) {
+                    for(final Resource monthResource : yearResource.getChildren() ) {
+                        for(final Resource dayResource : monthResource.getChildren() ) {
+                            for(final Resource hourResource : dayResource.getChildren() ) {
+                                for(final Resource minuteResource : hourResource.getChildren() ) {
+                                    for(final Resource rsrc : minuteResource.getChildren() ) {
+
+                                        if ( !caps.isActive() ) {
+                                            return;
+                                        }
+
+                                        try {
+                                            final ValueMap vm = ResourceHelper.getValueMap(rsrc);
+                                            final String newPath = this.configuration.getUnassignedJobsPath() + '/' + topicResource.getName() + rsrc.getPath().substring(topicResource.getPath().length());
+                                            final Map<String, Object> props = new HashMap<String, Object>(vm);
+                                            props.remove(Job.PROPERTY_JOB_QUEUE_NAME);
+                                            props.remove(Job.PROPERTY_JOB_TARGET_INSTANCE);
+                                            props.remove(Job.PROPERTY_JOB_STARTED_TIME);
+
+                                            try {
+                                                ResourceHelper.getOrCreateResource(resolver, newPath, props);
+                                                resolver.delete(rsrc);
+                                                resolver.commit();
+                                            } catch ( final PersistenceException pe ) {
+                                                this.ignoreException(pe);
+                                                resolver.refresh();
+                                            }
+                                        } catch (final InstantiationException ie) {
+                                            // something happened with the resource in the meantime
+                                            this.ignoreException(ie);
+                                            resolver.refresh();
+                                        }
+                                    }
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * One maintenance run
+     */
+    public void run(final TopologyCapabilities topologyCapabilities,
+            final QueueConfigurationManager queueManager,
+            final boolean topologyChanged,
+            final boolean configChanged) {
+        // if topology changed, reschedule assigned jobs for stopped instances
+        if ( topologyChanged ) {
+            this.reassignJobs(topologyCapabilities, queueManager);
+        }
+        // try to assign unassigned jobs
+        if ( topologyChanged || configChanged ) {
+            this.assignUnassignedJobs(topologyCapabilities, queueManager);
+        }
+    }
+
+    /**
+     * Helper method which just logs the exception in debug mode.
+     * @param e
+     */
+    private void ignoreException(final Exception e) {
+        if ( this.logger.isDebugEnabled() ) {
+            this.logger.debug("Ignored exception " + e.getMessage(), e);
+        }
+    }
+}

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/MaintenanceTask.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/MaintenanceTask.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/MaintenanceTask.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/RestartTask.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/RestartTask.java?rev=1631994&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/RestartTask.java (added)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/RestartTask.java Wed Oct 15 11:30:09 2014
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.topology;
+
+import java.util.Iterator;
+
+import org.apache.sling.api.resource.ModifiableValueMap;
+import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.event.impl.jobs.JobImpl;
+import org.apache.sling.event.impl.jobs.JobManagerConfiguration;
+import org.apache.sling.event.impl.jobs.topics.JobTopicTraverser;
+import org.apache.sling.event.jobs.Job;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RestartTask {
+
+    /** Logger. */
+    private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+    public void run(final JobManagerConfiguration configuration) {
+        this.initialScan(configuration);
+    }
+
+    /**
+     * Scan the resource tree for unfinished jobs from previous runs
+     */
+    private void initialScan(final JobManagerConfiguration configuration) {
+        logger.debug("Scanning repository for unfinished jobs...");
+        final ResourceResolver resolver = configuration.createResourceResolver();
+        try {
+            final Resource baseResource = resolver.getResource(configuration.getLocalJobsPath());
+
+            // sanity check - should never be null
+            if ( baseResource != null ) {
+                final Iterator<Resource> topicIter = baseResource.listChildren();
+                while ( topicIter.hasNext() ) {
+                    final Resource topicResource = topicIter.next();
+                    logger.debug("Found topic {}", topicResource.getName());
+
+                    // init topic
+                    initTopic(topicResource);
+                }
+            }
+        } finally {
+            resolver.close();
+        }
+    }
+
+    /**
+     * Initialize a topic and update all jobs from that topic.
+     * Reset started time and increase retry count of unfinished jobs
+     * @param topicResource The topic resource
+     */
+    private void initTopic(final Resource topicResource) {
+        logger.debug("Initializing topic {}...", topicResource.getName());
+
+        JobTopicTraverser.traverse(logger, topicResource, new JobTopicTraverser.Handler() {
+
+            @Override
+            public boolean handle(final JobImpl job) {
+                if ( job.getProcessingStarted() != null ) {
+                    job.retry();
+                    try {
+                        final Resource jobResource = topicResource.getResourceResolver().getResource(job.getResourcePath());
+                        // sanity check
+                        if ( jobResource != null ) {
+                            final ModifiableValueMap mvm = jobResource.adaptTo(ModifiableValueMap.class);
+                            mvm.remove(Job.PROPERTY_JOB_STARTED_TIME);
+                            mvm.put(Job.PROPERTY_JOB_RETRY_COUNT, job.getRetryCount());
+                            jobResource.getResourceResolver().commit();
+                        }
+                    } catch ( final PersistenceException ignore) {
+                        logger.error("Unable to update unfinished job " + job, ignore);
+                    }
+                }
+                return true;
+            }
+        });
+        logger.debug("Topic {} initialized", topicResource.getName());
+    }
+}

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/RestartTask.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/RestartTask.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/RestartTask.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/TopologyCapabilities.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/TopologyCapabilities.java?rev=1631994&r1=1631993&r2=1631994&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/TopologyCapabilities.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/TopologyCapabilities.java Wed Oct 15 11:30:09 2014
@@ -30,6 +30,8 @@ import org.apache.sling.discovery.Instan
 import org.apache.sling.discovery.TopologyView;
 import org.apache.sling.event.impl.jobs.JobImpl;
 import org.apache.sling.event.impl.jobs.JobManagerConfiguration;
+import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
+import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager;
 import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueInfo;
 import org.apache.sling.event.impl.support.Environment;
 import org.apache.sling.event.jobs.QueueConfiguration;
@@ -73,6 +75,9 @@ public class TopologyCapabilities {
     /** JobManagerConfiguration. */
     private final JobManagerConfiguration jobManagerConfiguration;
 
+    /** Queue config manager. */
+    private final QueueConfigurationManager queueManager;
+
     public static final class InstanceDescriptionComparator implements Comparator<InstanceDescription> {
 
         private final String localClusterId;
@@ -120,8 +125,11 @@ public class TopologyCapabilities {
         return allInstances;
     }
 
-    public TopologyCapabilities(final TopologyView view, final JobManagerConfiguration config) {
+    public TopologyCapabilities(final TopologyView view,
+            final QueueConfigurationManager queueManager,
+            final JobManagerConfiguration config) {
         this.jobManagerConfiguration = config;
+        this.queueManager = queueManager;
         this.instanceComparator = new InstanceDescriptionComparator(view.getLocalInstance().getClusterView().getId());
         this.isLeader = view.getLocalInstance().isLeader();
         this.allInstances = getAllInstancesMap(view);
@@ -156,11 +164,11 @@ public class TopologyCapabilities {
     public boolean isActive() {
         return this.active;
     }
-
+/*
     public long getChangeCount() {
         return this.changeCount;
     }
-
+*/
     public boolean isActive(final String instanceId) {
         return this.allInstances.containsKey(instanceId);
     }
@@ -280,4 +288,17 @@ public class TopologyCapabilities {
         return this.instanceCapabilities;
     }
 
+    public QueueInfo getQueueInfo(final String topic) {
+        if ( this.active ) {
+            return this.queueManager.getQueueInfo(topic);
+        }
+        return null;
+    }
+
+    public InternalQueueConfiguration[] getQueueConfigurations() {
+        if ( this.active ) {
+            return this.queueManager.getConfigurations();
+        }
+        return null;
+    }
 }

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/TopologyHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/TopologyHandler.java?rev=1631994&r1=1631993&r2=1631994&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/TopologyHandler.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/TopologyHandler.java Wed Oct 15 11:30:09 2014
@@ -22,25 +22,30 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.Service;
 import org.apache.sling.discovery.TopologyEvent;
 import org.apache.sling.discovery.TopologyEvent.Type;
 import org.apache.sling.discovery.TopologyEventListener;
-import org.apache.sling.discovery.TopologyView;
 import org.apache.sling.event.impl.jobs.JobManagerConfiguration;
+import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager;
+import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueConfigurationChangeListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
 /**
- * The topology handler listens for topology events
+ * The topology handler listens for topology events.
+ *
+ * TODO - config changes should actually do a real stop/start
  */
 @Component(immediate=true)
 @Service(value={TopologyHandler.class, TopologyEventListener.class})
 public class TopologyHandler
-    implements TopologyEventListener {
+    implements TopologyEventListener, QueueConfigurationChangeListener {
 
     /** Logger. */
     private final Logger logger = LoggerFactory.getLogger(this.getClass());
@@ -51,20 +56,75 @@ public class TopologyHandler
     @Reference
     private JobManagerConfiguration configuration;
 
+    @Reference
+    private QueueConfigurationManager queueManager;
+
     /** The topology capabilities. */
     private volatile TopologyCapabilities topologyCapabilities;
 
-    private void stopProcessing() {
+    @Activate
+    protected void activate() {
+        this.queueManager.addListener(this);
+    }
+
+    @Deactivate
+    protected void dectivate() {
+        this.queueManager.removeListener(this);
+    }
+
+
+    @Override
+    public void configChanged() {
+        final TopologyCapabilities caps = this.topologyCapabilities;
+        if ( caps != null ) {
+            synchronized ( this.listeners ) {
+ //               this.stopProcessing(false);
+
+                this.startProcessing(Type.PROPERTIES_CHANGED, caps, true);
+            }
+        }
+    }
+
+    private void stopProcessing(final boolean deactivate) {
+        boolean notify = this.topologyCapabilities != null;
         // deactivate old capabilities - this stops all background processes
-        if ( this.topologyCapabilities != null ) {
+        if ( deactivate && this.topologyCapabilities != null ) {
             this.topologyCapabilities.deactivate();
         }
         this.topologyCapabilities = null;
+
+        if ( notify ) {
+            // stop all listeners
+            this.notifiyListeners();
+        }
     }
 
-    private void startProcessing(final TopologyView view) {
+    private void startProcessing(final Type eventType, final TopologyCapabilities newCaps, final boolean isConfigChange) {
         // create new capabilities and update view
-        this.topologyCapabilities = new TopologyCapabilities(view, this.configuration);
+        this.topologyCapabilities = newCaps;
+
+        // before we propagate the new topology we do some maintenance
+        if ( eventType == Type.TOPOLOGY_INIT ) {
+            final UpgradeTask task = new UpgradeTask();
+            task.run(this.configuration, this.topologyCapabilities, queueManager);
+
+            final RestartTask rt = new RestartTask();
+            rt.run(this.configuration);
+        }
+
+        final MaintenanceTask mt = new MaintenanceTask(this.configuration);
+        mt.run(topologyCapabilities, queueManager, !isConfigChange, isConfigChange);
+
+        if ( !isConfigChange ) {
+            // start listeners
+            this.notifiyListeners();
+        }
+    }
+
+    private void notifiyListeners() {
+        for(final TopologyAware l : this.listeners) {
+            l.topologyChanged(this.topologyCapabilities);
+        }
     }
 
     /**
@@ -86,22 +146,15 @@ public class TopologyHandler
         synchronized ( this.listeners ) {
 
             if ( event.getType() == Type.TOPOLOGY_CHANGING ) {
-               this.stopProcessing();
+               this.stopProcessing(true);
 
-               for(final TopologyAware l : this.listeners) {
-                   l.topologyChanged(this.topologyCapabilities);
-               }
             } else if ( event.getType() == Type.TOPOLOGY_INIT
                 || event.getType() == Type.TOPOLOGY_CHANGED
                 || event.getType() == Type.PROPERTIES_CHANGED ) {
 
-                this.stopProcessing();
-
-                this.startProcessing(event.getNewView());
+                this.stopProcessing(true);
 
-                for(final TopologyAware l : this.listeners) {
-                    l.topologyChanged(this.topologyCapabilities);
-                }
+                this.startProcessing(event.getType(), new TopologyCapabilities(event.getNewView(), this.queueManager, this.configuration), false);
             }
 
         }

Added: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/UpgradeTask.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/UpgradeTask.java?rev=1631994&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/UpgradeTask.java (added)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/UpgradeTask.java Wed Oct 15 11:30:09 2014
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.topology;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.ValueMap;
+import org.apache.sling.discovery.InstanceDescription;
+import org.apache.sling.event.impl.jobs.JobImpl;
+import org.apache.sling.event.impl.jobs.JobManagerConfiguration;
+import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager;
+import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueInfo;
+import org.apache.sling.event.impl.support.Environment;
+import org.apache.sling.event.impl.support.ResourceHelper;
+import org.apache.sling.event.jobs.Job;
+import org.apache.sling.event.jobs.QueueConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Upgrade task
+ *
+ * Upgrade jobs from earlier versions to the new format.
+ */
+public class UpgradeTask {
+
+    /** Logger. */
+    private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+    /**
+     * Upgrade
+     */
+    public void run(final JobManagerConfiguration configuration,
+            final TopologyCapabilities topologyCapabilities,
+            final QueueConfigurationManager queueManager) {
+        if ( topologyCapabilities.isLeader() ) {
+            this.processJobsFromPreviousVersions(configuration, topologyCapabilities, queueManager);
+        }
+    }
+
+    /**
+     * Handle jobs from previous versions (<= 3.1.4) by moving them to the unassigned area
+     */
+    private void processJobsFromPreviousVersions(final JobManagerConfiguration configuration,
+            final TopologyCapabilities caps,
+            final QueueConfigurationManager queueManager) {
+        final ResourceResolver resolver = configuration.createResourceResolver();
+        try {
+            this.processJobsFromPreviousVersions(configuration, caps, queueManager, resolver.getResource(configuration.getPreviousVersionAnonPath()));
+            this.processJobsFromPreviousVersions(configuration, caps, queueManager, resolver.getResource(configuration.getPreviousVersionIdentifiedPath()));
+        } catch ( final PersistenceException pe ) {
+            this.logger.warn("Problems moving jobs from previous version.", pe);
+        } finally {
+            resolver.close();
+        }
+    }
+
+    /**
+     * Recursively find jobs and move them
+     */
+    private void processJobsFromPreviousVersions(final JobManagerConfiguration configuration,
+            final TopologyCapabilities caps,
+            final QueueConfigurationManager queueManager,
+            final Resource rsrc) throws PersistenceException {
+        if ( rsrc != null && caps.isActive() ) {
+            if ( rsrc.isResourceType(ResourceHelper.RESOURCE_TYPE_JOB) ) {
+                this.moveJobFromPreviousVersion(configuration, caps, queueManager, rsrc);
+            } else {
+                for(final Resource child : rsrc.getChildren()) {
+                    this.processJobsFromPreviousVersions(configuration, caps, queueManager, child);
+                }
+                if ( caps.isActive() ) {
+                    rsrc.getResourceResolver().delete(rsrc);
+                    rsrc.getResourceResolver().commit();
+                    rsrc.getResourceResolver().refresh();
+                }
+            }
+        }
+    }
+
+    /**
+     * Move a single job
+     */
+    private void moveJobFromPreviousVersion(final JobManagerConfiguration configuration,
+            final TopologyCapabilities caps,
+            final QueueConfigurationManager queueManager,
+            final Resource jobResource)
+    throws PersistenceException {
+        final ResourceResolver resolver = jobResource.getResourceResolver();
+
+        try {
+            final ValueMap vm = ResourceHelper.getValueMap(jobResource);
+            // check for binary properties
+            Map<String, Object> binaryProperties = new HashMap<String, Object>();
+            final ObjectInputStream ois = vm.get("slingevent:properties", ObjectInputStream.class);
+            if ( ois != null ) {
+                try {
+                    int length = ois.readInt();
+                    for(int i=0;i<length;i++) {
+                        final String key = (String)ois.readObject();
+                        final Object value = ois.readObject();
+                        binaryProperties.put(key, value);
+                    }
+                } catch (final ClassNotFoundException cnfe) {
+                    throw new PersistenceException("Class not found.", cnfe);
+                } catch (final java.io.InvalidClassException ice) {
+                    throw new PersistenceException("Invalid class.", ice);
+                } catch (final IOException ioe) {
+                    throw new PersistenceException("Unable to deserialize job properties.", ioe);
+                } finally {
+                    try {
+                        ois.close();
+                    } catch (final IOException ioe) {
+                        throw new PersistenceException("Unable to deserialize job properties.", ioe);
+                    }
+                }
+            }
+
+            final Map<String, Object> properties = ResourceHelper.cloneValueMap(vm);
+
+            properties.put(JobImpl.PROPERTY_BRIDGED_EVENT, true);
+            final String topic = (String)properties.remove("slingevent:topic");
+            properties.put(ResourceHelper.PROPERTY_JOB_TOPIC, topic);
+
+            properties.remove(Job.PROPERTY_JOB_QUEUE_NAME);
+            properties.remove(Job.PROPERTY_JOB_TARGET_INSTANCE);
+            // and binary properties
+            properties.putAll(binaryProperties);
+            properties.remove("slingevent:properties");
+
+            if ( !properties.containsKey(Job.PROPERTY_JOB_RETRIES) ) {
+                properties.put(Job.PROPERTY_JOB_RETRIES, 10); // we put a dummy value here; this gets updated by the queue
+            }
+            if ( !properties.containsKey(Job.PROPERTY_JOB_RETRY_COUNT) ) {
+                properties.put(Job.PROPERTY_JOB_RETRY_COUNT, 0);
+            }
+
+            final List<InstanceDescription> potentialTargets = caps.getPotentialTargets("/", null);
+            String targetId = null;
+            if ( potentialTargets != null && potentialTargets.size() > 0 ) {
+                final QueueInfo info = queueManager.getQueueInfo(topic);
+                logger.debug("Found queue {} for {}", info.queueConfiguration, topic);
+                // if queue is configured to drop, we drop
+                if ( info.queueConfiguration.getType() ==  QueueConfiguration.Type.DROP) {
+                    resolver.delete(jobResource);
+                    resolver.commit();
+                    return;
+                }
+                if ( info.queueConfiguration.getType() != QueueConfiguration.Type.IGNORE ) {
+                    targetId = caps.detectTarget(topic, vm, info);
+                    if ( targetId != null ) {
+                        properties.put(Job.PROPERTY_JOB_QUEUE_NAME, info.queueName);
+                        properties.put(Job.PROPERTY_JOB_TARGET_INSTANCE, targetId);
+                        properties.put(Job.PROPERTY_JOB_RETRIES, info.queueConfiguration.getMaxRetries());
+                    }
+                }
+            }
+
+            properties.put(Job.PROPERTY_JOB_CREATED_INSTANCE, "old:" + Environment.APPLICATION_ID);
+            properties.put(ResourceResolver.PROPERTY_RESOURCE_TYPE, ResourceHelper.RESOURCE_TYPE_JOB);
+
+            final String jobId = configuration.getUniqueId(topic);
+            properties.put(ResourceHelper.PROPERTY_JOB_ID, jobId);
+            properties.remove(Job.PROPERTY_JOB_STARTED_TIME);
+
+            final String newPath = configuration.getUniquePath(targetId, topic, jobId, vm);
+            this.logger.debug("Moving 'old' job from {} to {}", jobResource.getPath(), newPath);
+
+            ResourceHelper.getOrCreateResource(resolver, newPath, properties);
+            resolver.delete(jobResource);
+            resolver.commit();
+        } catch (final InstantiationException ie) {
+            throw new PersistenceException("Exception while reading reasource: " + ie.getMessage(), ie.getCause());
+        }
+    }
+}

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/UpgradeTask.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/UpgradeTask.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/UpgradeTask.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain