You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2013/04/22 13:42:55 UTC

svn commit: r1470462 [3/7] - in /sling/trunk/bundles/extensions/event: ./ src/main/java/org/apache/sling/event/ src/main/java/org/apache/sling/event/impl/ src/main/java/org/apache/sling/event/impl/dea/ src/main/java/org/apache/sling/event/impl/jobs/ sr...

Added: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/MaintenanceTask.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/MaintenanceTask.java?rev=1470462&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/MaintenanceTask.java (added)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/MaintenanceTask.java Mon Apr 22 11:42:53 2013
@@ -0,0 +1,707 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs;
+
+import java.util.Calendar;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.jackrabbit.util.ISO8601;
+import org.apache.jackrabbit.util.ISO9075;
+import org.apache.sling.api.resource.LoginException;
+import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.api.resource.QuerySyntaxException;
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.api.resource.ResourceUtil;
+import org.apache.sling.api.resource.ValueMap;
+import org.apache.sling.discovery.InstanceDescription;
+import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager;
+import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueInfo;
+import org.apache.sling.event.impl.support.Environment;
+import org.apache.sling.event.impl.support.ResourceHelper;
+import org.apache.sling.event.jobs.Job;
+import org.apache.sling.event.jobs.JobUtil;
+import org.apache.sling.event.jobs.JobUtil.JobPriority;
+import org.apache.sling.event.jobs.QueueConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Maintenance task...
+ *
+ * In the default configuration, this task runs every minute
+ */
+public class MaintenanceTask {
+
+    /** Logger. */
+    private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+    /** Resource resolver factory. */
+    private final ResourceResolverFactory resourceResolverFactory;
+
+    /** Job manager configuration. */
+    private final JobManagerConfiguration configuration;
+
+    /** Change count for queue configurations .*/
+    private volatile long queueConfigChangeCount = -1;
+
+    /** Change count for topology changes .*/
+    private volatile long topologyChangeCount = -1;
+
+    private boolean checkedForPreviousVersion = false;
+
+    /**
+     * Constructor
+     */
+    public MaintenanceTask(final JobManagerConfiguration config, final ResourceResolverFactory factory) {
+        this.resourceResolverFactory = factory;
+        this.configuration = config;
+    }
+
+    private void reassignJobs(final TopologyCapabilities caps,
+            final QueueConfigurationManager queueManager) {
+        if ( caps != null && caps.isLeader() ) {
+            this.logger.debug("Checking for stopped instances...");
+            ResourceResolver resolver = null;
+            try {
+                resolver = this.resourceResolverFactory.getAdministrativeResourceResolver(null);
+                final Resource jobsRoot = resolver.getResource(this.configuration.getAssginedJobsPath());
+                this.logger.debug("Got jobs root {}", jobsRoot);
+
+                // this resource should exist, but we check anyway
+                if ( jobsRoot != null ) {
+                    final Iterator<Resource> instanceIter = jobsRoot.listChildren();
+                    while ( caps.isActive() && instanceIter.hasNext() ) {
+                        final Resource instanceResource = instanceIter.next();
+
+                        final String instanceId = instanceResource.getName();
+                        if ( !caps.isActive(instanceId) ) {
+                            logger.debug("Found stopped instance {}", instanceId);
+                            assignJobs(caps, queueManager, instanceResource, true);
+                        }
+                    }
+                }
+            } catch ( final LoginException le ) {
+                this.ignoreException(le);
+            } finally {
+                if ( resolver != null ) {
+                    resolver.close();
+                }
+            }
+        }
+    }
+
+    /**
+     * Try to assign unassigned jobs as there might be changes in:
+     * - queue configurations
+     * - topology
+     * - capabilities
+     */
+    private void assignUnassignedJobs(final TopologyCapabilities caps,
+            final QueueConfigurationManager queueManager) {
+        if ( caps != null && caps.isLeader() ) {
+            logger.debug("Checking unassigned jobs...");
+            ResourceResolver resolver = null;
+            try {
+                resolver = this.resourceResolverFactory.getAdministrativeResourceResolver(null);
+                final Resource unassignedRoot = resolver.getResource(this.configuration.getUnassignedJobsPath());
+                logger.debug("Got unassigned root {}", unassignedRoot);
+
+                // this resource should exist, but we check anyway
+                if ( unassignedRoot != null ) {
+                    assignJobs(caps, queueManager, unassignedRoot, false);
+                }
+            } catch ( final LoginException le ) {
+                this.ignoreException(le);
+            } finally {
+                if ( resolver != null ) {
+                    resolver.close();
+                }
+            }
+        }
+    }
+
+    /**
+     * Try to assign all jobs from the jobs root.
+     * The jobs are stored by topic
+     */
+    private void assignJobs(final TopologyCapabilities caps,
+            final QueueConfigurationManager queueManager,
+            final Resource jobsRoot,
+            final boolean unassign) {
+        final ResourceResolver resolver = jobsRoot.getResourceResolver();
+
+        final Iterator<Resource> topicIter = jobsRoot.listChildren();
+        while ( caps.isActive() && topicIter.hasNext() ) {
+            final Resource topicResource = topicIter.next();
+
+            final String topicName = topicResource.getName().replace('.', '/');
+            logger.debug("Found topic {}", topicName);
+
+            final String checkTopic;
+            if ( topicName.equals(JobImpl.PROPERTY_BRIDGED_EVENT) ) {
+                checkTopic = "/";
+            } else {
+                checkTopic = topicName;
+            }
+
+            // first check if there is an instance for these topics
+            final List<InstanceDescription> potentialTargets = caps.getPotentialTargets(checkTopic, null);
+            if ( potentialTargets != null && potentialTargets.size() > 0 ) {
+                final QueueInfo info = queueManager.getQueueInfo(topicName);
+                logger.debug("Found queue {} for {}", info.queueConfiguration, topicName);
+                // if queue is configured to drop, we drop
+                if ( info.queueConfiguration.getType() ==  QueueConfiguration.Type.DROP) {
+                    final Iterator<Resource> i = topicResource.listChildren();
+                    while ( caps.isActive() && i.hasNext() ) {
+                        final Resource rsrc = i.next();
+                        try {
+                            resolver.delete(rsrc);
+                            resolver.commit();
+                        } catch ( final PersistenceException pe ) {
+                            this.ignoreException(pe);
+                            resolver.refresh();
+                        }
+                    }
+                } else if ( info.queueConfiguration.getType() != QueueConfiguration.Type.IGNORE ) {
+                    // if the queue is not configured to ignore, we can reschedule
+                    for(final Resource yearResource : topicResource.getChildren() ) {
+                        for(final Resource monthResource : yearResource.getChildren() ) {
+                            for(final Resource dayResource : monthResource.getChildren() ) {
+                                for(final Resource hourResource : dayResource.getChildren() ) {
+                                    for(final Resource minuteResource : hourResource.getChildren() ) {
+                                        for(final Resource rsrc : minuteResource.getChildren() ) {
+
+                                            if ( !caps.isActive() ) {
+                                                return;
+                                            }
+
+                                            final ValueMap vm = ResourceUtil.getValueMap(rsrc);
+                                            final String targetId = caps.detectTarget(topicName, vm, info);
+
+                                            if ( targetId != null ) {
+                                                final String newPath = this.configuration.getAssginedJobsPath() + '/' + targetId + '/' + topicResource.getName() + rsrc.getPath().substring(topicResource.getPath().length());
+                                                final Map<String, Object> props = new HashMap<String, Object>(vm);
+                                                props.put(Job.PROPERTY_JOB_QUEUE_NAME, info.queueName);
+                                                props.put(Job.PROPERTY_JOB_TARGET_INSTANCE, targetId);
+                                                props.remove(Job.PROPERTY_JOB_STARTED_TIME);
+                                                try {
+                                                    ResourceHelper.getOrCreateResource(resolver, newPath, props);
+                                                    resolver.delete(rsrc);
+                                                    resolver.commit();
+                                                } catch ( final PersistenceException pe ) {
+                                                    this.ignoreException(pe);
+                                                    resolver.refresh();
+                                                }
+                                            }
+                                        }
+                                    }
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+            if ( caps.isActive() && unassign ) {
+                // we have to move everything to the unassigned area
+                for(final Resource yearResource : topicResource.getChildren() ) {
+                    for(final Resource monthResource : yearResource.getChildren() ) {
+                        for(final Resource dayResource : monthResource.getChildren() ) {
+                            for(final Resource hourResource : dayResource.getChildren() ) {
+                                for(final Resource minuteResource : hourResource.getChildren() ) {
+                                    for(final Resource rsrc : minuteResource.getChildren() ) {
+
+                                        if ( !caps.isActive() ) {
+                                            return;
+                                        }
+
+                                        final ValueMap vm = ResourceUtil.getValueMap(rsrc);
+                                        final String newPath = this.configuration.getUnassignedJobsPath() + '/' + topicResource.getName() + rsrc.getPath().substring(topicResource.getPath().length());
+                                        final Map<String, Object> props = new HashMap<String, Object>(vm);
+                                        props.remove(Job.PROPERTY_JOB_QUEUE_NAME);
+                                        props.remove(Job.PROPERTY_JOB_TARGET_INSTANCE);
+                                        props.remove(Job.PROPERTY_JOB_STARTED_TIME);
+
+                                        try {
+                                            ResourceHelper.getOrCreateResource(resolver, newPath, props);
+                                            resolver.delete(rsrc);
+                                            resolver.commit();
+                                        } catch ( final PersistenceException pe ) {
+                                            this.ignoreException(pe);
+                                            resolver.refresh();
+                                        }
+                                    }
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Check if the topology has changed.
+     */
+    private boolean topologyHasChanged(final TopologyCapabilities topologyCapabilities) {
+        boolean topologyChanged = false;
+        if ( topologyCapabilities != null ) {
+            if ( this.topologyChangeCount < topologyCapabilities.getChangeCount() ) {
+                this.topologyChangeCount = topologyCapabilities.getChangeCount();
+                topologyChanged = true;
+            }
+        }
+        return topologyChanged;
+    }
+
+    private boolean queueConfigurationHasChanged(final TopologyCapabilities topologyCapabilities,
+            final QueueConfigurationManager queueManager) {
+        boolean configChanged = false;
+        if ( topologyCapabilities != null ) {
+            final int queueChangeCount = queueManager.getChangeCount();
+            if ( this.queueConfigChangeCount < queueChangeCount ) {
+                configChanged = true;
+                this.queueConfigChangeCount = queueChangeCount;
+            }
+        }
+        return configChanged;
+    }
+
+    /**
+     * One maintenance run
+     */
+    public void run(final TopologyCapabilities topologyCapabilities,
+            final QueueConfigurationManager queueManager,
+            final long cleanUpCounter) {
+        // check topology and config change during each invocation
+        final boolean topologyChanged = this.topologyHasChanged(topologyCapabilities);
+        final boolean configChanged = this.queueConfigurationHasChanged(topologyCapabilities, queueManager);
+
+        // if topology changed, reschedule assigned jobs for stopped instances
+        if ( topologyChanged ) {
+            this.reassignJobs(topologyCapabilities, queueManager);
+        }
+        // try to assign unassigned jobs
+        if ( topologyChanged || configChanged ) {
+            this.assignUnassignedJobs(topologyCapabilities, queueManager);
+        }
+
+        if ( topologyChanged && !this.checkedForPreviousVersion && topologyCapabilities != null && topologyCapabilities.isLeader() ) {
+            this.processJobsFromPreviousVersions(topologyCapabilities, queueManager);
+        }
+
+        // Clean up
+        final String cleanUpAssignedPath;;
+        if ( topologyCapabilities != null && topologyCapabilities.isLeader() ) {
+            cleanUpAssignedPath = this.configuration.getUnassignedJobsPath();
+        } else {
+            cleanUpAssignedPath = null;
+        }
+
+        if ( cleanUpCounter % 60 == 0 ) { // full clean up is done every hour
+            this.fullEmptyFolderCleanup(topologyCapabilities, this.configuration.getLocalJobsPath());
+            if ( cleanUpAssignedPath != null ) {
+                this.fullEmptyFolderCleanup(topologyCapabilities, cleanUpAssignedPath);
+            }
+        } else if ( cleanUpCounter % 5 == 0 ) { // simple clean up every 5 minutes
+            this.simpleEmptyFolderCleanup(topologyCapabilities, this.configuration.getLocalJobsPath());
+            if ( cleanUpAssignedPath != null ) {
+                this.simpleEmptyFolderCleanup(topologyCapabilities, cleanUpAssignedPath);
+            }
+        }
+
+        // lock cleanup is done every 3 minutes
+        if ( cleanUpCounter % 3 == 0 ) {
+            this.lockCleanup(topologyCapabilities);
+        }
+    }
+
+    /**
+     * Clean up the locks
+     * All locks older than three minutes are removed
+     */
+    private void lockCleanup(final TopologyCapabilities caps) {
+        if ( caps != null && caps.isLeader() ) {
+            this.logger.debug("Cleaning up job resource tree: removing obsolete locks");
+            ResourceResolver resolver = null;
+            try {
+                resolver = this.resourceResolverFactory.getAdministrativeResourceResolver(null);
+                final Calendar startDate = Calendar.getInstance();
+                startDate.add(Calendar.MINUTE, -3);
+
+                final StringBuilder buf = new StringBuilder(64);
+
+                buf.append("//element(*)[@");
+                buf.append(ISO9075.encode(Utility.PROPERTY_LOCK_CREATED));
+                buf.append(" < xs:dateTime('");
+                buf.append(ISO8601.format(startDate));
+                buf.append("')]");
+                final Iterator<Resource> result = resolver.findResources(buf.toString(), "xpath");
+
+                while ( caps.isActive() && result.hasNext() ) {
+                    final Resource lockResource = result.next();
+                    // sanity check for the path
+                    if ( this.configuration.isLock(lockResource.getPath()) ) {
+                        try {
+                            resolver.delete(lockResource);
+                            resolver.commit();
+                        } catch ( final PersistenceException pe) {
+                            this.ignoreException(pe);
+                            resolver.refresh();
+                        }
+                    }
+                }
+            } catch (final QuerySyntaxException qse) {
+                this.ignoreException(qse);
+            } catch (final LoginException le) {
+                this.ignoreException(le);
+            } finally {
+                if ( resolver != null ) {
+                    resolver.close();
+                }
+            }
+        }
+    }
+
+    /**
+     * Simple empty folder removes empty folders for the last five minutes
+     * from an hour ago!
+     * If folder for minute 59 is removed, we check the hour folder as well.
+     */
+    private void simpleEmptyFolderCleanup(final TopologyCapabilities caps, final String basePath) {
+        this.logger.debug("Cleaning up job resource tree: looking for empty folders");
+        ResourceResolver resolver = null;
+        try {
+            resolver = this.resourceResolverFactory.getAdministrativeResourceResolver(null);
+            final Calendar cleanUpDate = Calendar.getInstance();
+            // go back ten minutes
+            cleanUpDate.add(Calendar.HOUR, -1);
+
+            final Resource baseResource = resolver.getResource(basePath);
+            // sanity check - should never be null
+            if ( baseResource != null ) {
+                final Iterator<Resource> topicIter = baseResource.listChildren();
+                while ( caps.isActive() && topicIter.hasNext() ) {
+                    final Resource topicResource = topicIter.next();
+
+                    for(int i = 0; i < 5; i++) {
+                        if ( caps.isActive() ) {
+                            final StringBuilder sb = new StringBuilder(topicResource.getPath());
+                            sb.append('/');
+                            sb.append(cleanUpDate.get(Calendar.YEAR));
+                            sb.append('/');
+                            sb.append(cleanUpDate.get(Calendar.MONTH) + 1);
+                            sb.append('/');
+                            sb.append(cleanUpDate.get(Calendar.DAY_OF_MONTH));
+                            sb.append('/');
+                            sb.append(cleanUpDate.get(Calendar.HOUR_OF_DAY));
+                            final String path = sb.toString();
+
+                            final Resource dateResource = resolver.getResource(path);
+                            if ( dateResource != null && !dateResource.listChildren().hasNext() ) {
+                                resolver.delete(dateResource);
+                                resolver.commit();
+                            }
+                            // check hour folder
+                            if ( path.endsWith("59") ) {
+                                final String hourPath = path.substring(0, path.length() - 3);
+                                final Resource hourResource = resolver.getResource(hourPath);
+                                if ( hourResource != null && !hourResource.listChildren().hasNext() ) {
+                                    resolver.delete(hourResource);
+                                    resolver.commit();
+                                }
+                            }
+                            cleanUpDate.add(Calendar.MINUTE, -1);
+                        }
+                    }
+                }
+            }
+
+        } catch (final PersistenceException pe) {
+            // in the case of an error, we just log this as a warning
+            this.logger.warn("Exception during job resource tree cleanup.", pe);
+        } catch (final LoginException ignore) {
+            this.ignoreException(ignore);
+        } finally {
+            if ( resolver != null ) {
+                resolver.close();
+            }
+        }
+    }
+
+    /**
+     * Full cleanup - this scans all directories!
+     */
+    private void fullEmptyFolderCleanup(final TopologyCapabilities caps, final String basePath) {
+        this.logger.debug("Cleaning up job resource tree: removing ALL empty folders");
+        ResourceResolver resolver = null;
+        try {
+            resolver = this.resourceResolverFactory.getAdministrativeResourceResolver(null);
+
+            final Resource baseResource = resolver.getResource(basePath);
+            // sanity check - should never be null
+            if ( baseResource != null ) {
+                final Calendar now = Calendar.getInstance();
+
+                final Iterator<Resource> topicIter = baseResource.listChildren();
+                while ( caps.isActive() && topicIter.hasNext() ) {
+                    final Resource topicResource = topicIter.next();
+
+                    // now years
+                    final Iterator<Resource> yearIter = topicResource.listChildren();
+                    while ( caps.isActive() && yearIter.hasNext() ) {
+                        final Resource yearResource = yearIter.next();
+                        final int year = Integer.valueOf(yearResource.getName());
+                        final boolean oldYear = year < now.get(Calendar.YEAR);
+
+                        // months
+                        final Iterator<Resource> monthIter = yearResource.listChildren();
+                        while ( caps.isActive() && monthIter.hasNext() ) {
+                            final Resource monthResource = monthIter.next();
+                            final int month = Integer.valueOf(monthResource.getName());
+                            final boolean oldMonth = oldYear || month < (now.get(Calendar.MONTH) + 1);
+
+                            // days
+                            final Iterator<Resource> dayIter = monthResource.listChildren();
+                            while ( caps.isActive() && dayIter.hasNext() ) {
+                                final Resource dayResource = dayIter.next();
+                                final int day = Integer.valueOf(dayResource.getName());
+                                final boolean oldDay = oldMonth || day < now.get(Calendar.DAY_OF_MONTH);
+
+                                // hours
+                                final Iterator<Resource> hourIter = dayResource.listChildren();
+                                while ( caps.isActive() && hourIter.hasNext() ) {
+                                    final Resource hourResource = hourIter.next();
+                                    final int hour = Integer.valueOf(hourResource.getName());
+                                    final boolean oldHour = (oldDay && (oldMonth || now.get(Calendar.HOUR_OF_DAY) > 0)) || hour < (now.get(Calendar.HOUR_OF_DAY) -1);
+
+                                    // we only remove minutes if the hour is old
+                                    if ( oldHour ) {
+                                        final Iterator<Resource> minuteIter = hourResource.listChildren();
+                                        while ( caps.isActive() && minuteIter.hasNext() ) {
+                                            final Resource minuteResource = minuteIter.next();
+
+                                            // check if we can delete the minute
+                                            if ( !minuteResource.listChildren().hasNext() ) {
+                                                resolver.delete(minuteResource);
+                                                resolver.commit();
+                                            }
+                                        }
+                                    }
+
+                                    // check if we can delete the hour
+                                    if ( caps.isActive() && oldHour && !hourResource.listChildren().hasNext()) {
+                                        resolver.delete(hourResource);
+                                        resolver.commit();
+                                    }
+                                }
+                                // check if we can delete the day
+                                if ( caps.isActive() && oldDay && !dayResource.listChildren().hasNext()) {
+                                    resolver.delete(dayResource);
+                                    resolver.commit();
+                                }
+                            }
+
+                            // check if we can delete the month
+                            if ( caps.isActive() && oldMonth && !monthResource.listChildren().hasNext() ) {
+                                resolver.delete(monthResource);
+                                resolver.commit();
+                            }
+                        }
+
+                        // check if we can delete the year
+                        if ( caps.isActive() && oldYear && !yearResource.listChildren().hasNext() ) {
+                            resolver.delete(yearResource);
+                            resolver.commit();
+                        }
+                    }
+                }
+            }
+
+        } catch (final PersistenceException pe) {
+            // in the case of an error, we just log this as a warning
+            this.logger.warn("Exception during job resource tree cleanup.", pe);
+        } catch (final LoginException ignore) {
+            this.ignoreException(ignore);
+        } finally {
+            if ( resolver != null ) {
+                resolver.close();
+            }
+        }
+    }
+
+    public void reassignJob(final JobImpl job, final String targetId) {
+        ResourceResolver resolver = null;
+        try {
+            resolver = this.resourceResolverFactory.getAdministrativeResourceResolver(null);
+            final Resource jobResource = resolver.getResource(job.getResourcePath());
+            if ( jobResource != null ) {
+                final ValueMap vm = ResourceUtil.getValueMap(jobResource);
+                final String newPath = this.configuration.getUniquePath(targetId, job.getTopic(), job.getId(), job.getProperties());
+
+                final Map<String, Object> props = new HashMap<String, Object>(vm);
+                props.remove(Job.PROPERTY_JOB_QUEUE_NAME);
+                if ( targetId == null ) {
+                    props.remove(Job.PROPERTY_JOB_TARGET_INSTANCE);
+                } else {
+                    props.put(Job.PROPERTY_JOB_TARGET_INSTANCE, targetId);
+                }
+                props.remove(Job.PROPERTY_JOB_STARTED_TIME);
+
+                try {
+                    ResourceHelper.getOrCreateResource(resolver, newPath, props);
+                    resolver.delete(jobResource);
+                    resolver.commit();
+                } catch ( final PersistenceException pe ) {
+                    this.ignoreException(pe);
+                    resolver.refresh();
+                }
+            }
+        } catch (final LoginException ignore) {
+            this.ignoreException(ignore);
+        } finally {
+            if ( resolver != null ) {
+                resolver.close();
+            }
+        }
+    }
+
+    /**
+     * Helper method which just logs the exception in debug mode.
+     * @param e
+     */
+    private void ignoreException(final Exception e) {
+        if ( this.logger.isDebugEnabled() ) {
+            this.logger.debug("Ignored exception " + e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Handle jobs from previous versions (<= 3.1.4) by moving them to the unassigned area
+     */
+    private void processJobsFromPreviousVersions(final TopologyCapabilities caps,
+            final QueueConfigurationManager queueManager) {
+        ResourceResolver resolver = null;
+        try {
+            resolver = this.resourceResolverFactory.getAdministrativeResourceResolver(null);
+
+            this.processJobsFromPreviousVersions(caps, queueManager, resolver.getResource(this.configuration.getPreviousVersionAnonPath()));
+            this.processJobsFromPreviousVersions(caps, queueManager, resolver.getResource(this.configuration.getPreviousVersionIdentifiedPath()));
+            this.checkedForPreviousVersion = true;
+        } catch ( final PersistenceException pe ) {
+            this.logger.warn("Problems moving jobs from previous version.", pe);
+        } catch ( final LoginException le ) {
+            this.ignoreException(le);
+        } finally {
+            if ( resolver != null ) {
+                resolver.close();
+            }
+        }
+    }
+
+    /**
+     * Recursively find jobs and move them
+     */
+    private void processJobsFromPreviousVersions(final TopologyCapabilities caps,
+            final QueueConfigurationManager queueManager,
+            final Resource rsrc) throws PersistenceException {
+        if ( rsrc != null && caps.isActive() ) {
+            if ( rsrc.isResourceType(ResourceHelper.RESOURCE_TYPE_JOB) ) {
+                this.moveJobFromPreviousVersion(caps, queueManager, rsrc);
+            } else {
+                for(final Resource child : rsrc.getChildren()) {
+                    this.processJobsFromPreviousVersions(caps, queueManager, child);
+                }
+                if ( caps.isActive() ) {
+                    rsrc.getResourceResolver().delete(rsrc);
+                    rsrc.getResourceResolver().commit();
+                    rsrc.getResourceResolver().refresh();
+                }
+            }
+        }
+    }
+
+    /**
+     * Move a single job
+     */
+    private void moveJobFromPreviousVersion(final TopologyCapabilities caps,
+            final QueueConfigurationManager queueManager,
+            final Resource jobResource)
+    throws PersistenceException {
+        final ResourceResolver resolver = jobResource.getResourceResolver();
+
+        final ValueMap vm = ResourceUtil.getValueMap(jobResource);
+        final Map<String, Object> properties = ResourceHelper.cloneValueMap(vm);
+
+        properties.put(JobImpl.PROPERTY_BRIDGED_EVENT, true);
+        final String topic = (String)properties.remove("slingevent:topic");
+        properties.put(JobUtil.PROPERTY_JOB_TOPIC, topic);
+
+        properties.remove(Job.PROPERTY_JOB_QUEUE_NAME);
+        properties.remove(Job.PROPERTY_JOB_TARGET_INSTANCE);
+
+        if ( !properties.containsKey(Job.PROPERTY_JOB_RETRIES) ) {
+            properties.put(Job.PROPERTY_JOB_RETRIES, 10); // we put a dummy value here; this gets updated by the queue
+        }
+        if ( !properties.containsKey(Job.PROPERTY_JOB_RETRY_COUNT) ) {
+            properties.put(Job.PROPERTY_JOB_RETRY_COUNT, 0);
+        }
+        properties.put(Job.PROPERTY_JOB_PRIORITY, JobPriority.NORM.name());
+
+        final List<InstanceDescription> potentialTargets = caps.getPotentialTargets("/", null);
+        String targetId = null;
+        if ( potentialTargets != null && potentialTargets.size() > 0 ) {
+            final QueueInfo info = queueManager.getQueueInfo(topic);
+            logger.debug("Found queue {} for {}", info.queueConfiguration, topic);
+            // if queue is configured to drop, we drop
+            if ( info.queueConfiguration.getType() ==  QueueConfiguration.Type.DROP) {
+                resolver.delete(jobResource);
+                resolver.commit();
+                return;
+            }
+            if ( info.queueConfiguration.getType() != QueueConfiguration.Type.IGNORE ) {
+                targetId = caps.detectTarget(topic, vm, info);
+                if ( targetId != null ) {
+                    properties.put(Job.PROPERTY_JOB_QUEUE_NAME, info.queueName);
+                    properties.put(Job.PROPERTY_JOB_TARGET_INSTANCE, targetId);
+                    properties.put(Job.PROPERTY_JOB_RETRIES, info.queueConfiguration.getMaxRetries());
+                    properties.put(Job.PROPERTY_JOB_PRIORITY, info.queueConfiguration.getPriority().name());
+                }
+            }
+        }
+
+        properties.put(Job.PROPERTY_JOB_CREATED_INSTANCE, "old:" + Environment.APPLICATION_ID);
+        properties.put(ResourceResolver.PROPERTY_RESOURCE_TYPE, ResourceHelper.RESOURCE_TYPE_JOB);
+
+        final String jobId = this.configuration.getUniqueId(topic);
+        properties.put(JobUtil.JOB_ID, jobId);
+        properties.remove(Job.PROPERTY_JOB_STARTED_TIME);
+
+        final String newPath = this.configuration.getUniquePath(targetId, topic, jobId, vm);
+        this.logger.debug("Moving 'old' job from {} to {}", jobResource.getPath(), newPath);
+
+        ResourceHelper.getOrCreateResource(resolver, newPath, properties);
+        resolver.delete(jobResource);
+        resolver.commit();
+    }
+}

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/MaintenanceTask.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/MaintenanceTask.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/MaintenanceTask.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/TopologyCapabilities.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/TopologyCapabilities.java?rev=1470462&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/TopologyCapabilities.java (added)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/TopologyCapabilities.java Mon Apr 22 11:42:53 2013
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.sling.discovery.InstanceDescription;
+import org.apache.sling.discovery.TopologyView;
+import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueInfo;
+import org.apache.sling.event.jobs.JobConsumer;
+import org.apache.sling.event.jobs.QueueConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The capabilities of a topology.
+ */
+public class TopologyCapabilities {
+
+    /** Logger. */
+    private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+    /** Map: key: topic, value: sling IDs */
+    private final Map<String, List<InstanceDescription>> instanceCapabilities;
+
+    /** Round robin map. */
+    private final Map<String, Integer> roundRobinMap = new HashMap<String, Integer>();
+
+    /** Is this the leader of the cluster? */
+    private final boolean isLeader;
+
+    /** Is this still active? */
+    private volatile boolean active = true;
+
+    /** Change count. */
+    private final long changeCount;
+
+    /** All instances. */
+    private final Map<String, String> allInstances;
+
+    private static final class InstanceDescriptionComparator implements Comparator<InstanceDescription> {
+
+        @Override
+        public int compare(final InstanceDescription o1, final InstanceDescription o2) {
+            if ( o1.getSlingId().equals(o2.getSlingId()) ) {
+                return 0;
+            }
+            if ( o1.isLeader() ) {
+                return -1;
+            } else if ( o2.isLeader() ) {
+                return 1;
+            }
+            return o1.getSlingId().compareTo(o2.getSlingId());
+        }
+    }
+    private static final InstanceDescriptionComparator COMPARATOR = new InstanceDescriptionComparator();
+
+    public static Map<String, String> getAllInstancesMap(final TopologyView view) {
+        final Map<String, String> allInstances = new TreeMap<String, String>();
+
+        for(final InstanceDescription desc : view.getInstances() ) {
+            final String topics = desc.getProperty(JobConsumer.PROPERTY_TOPICS);
+            if ( topics != null && topics.length() > 0 ) {
+                allInstances.put(desc.getSlingId(), topics);
+            } else {
+                allInstances.put(desc.getSlingId(), "");
+            }
+        }
+        return allInstances;
+    }
+
+    public TopologyCapabilities(final TopologyView view, final long changeCount) {
+        this.changeCount = changeCount;
+        this.isLeader = view.getLocalInstance().isLeader();
+        this.allInstances = getAllInstancesMap(view);
+        final Map<String, List<InstanceDescription>> newCaps = new HashMap<String, List<InstanceDescription>>();
+        for(final InstanceDescription desc : view.getInstances() ) {
+            final String topics = desc.getProperty(JobConsumer.PROPERTY_TOPICS);
+            if ( topics != null && topics.length() > 0 ) {
+                this.logger.debug("Capabilities of {} : {}", desc.getSlingId(), topics);
+                for(final String topic : topics.split(",") ) {
+                    List<InstanceDescription> list = newCaps.get(topic);
+                    if ( list == null ) {
+                        list = new ArrayList<InstanceDescription>();
+                        newCaps.put(topic, list);
+                    }
+                    list.add(desc);
+                    Collections.sort(list, COMPARATOR);
+                }
+            }
+        }
+        this.instanceCapabilities = newCaps;
+    }
+
+    public boolean isSame(final Map<String, String> newAllInstancesMap) {
+        return this.allInstances.equals(newAllInstancesMap);
+    }
+
+    public void deactivate() {
+        this.active = false;
+    }
+
+    public boolean isActive() {
+        return this.active;
+    }
+
+    public long getChangeCount() {
+        return this.changeCount;
+    }
+
+    public boolean isActive(final String instanceId) {
+        return this.allInstances.containsKey(instanceId);
+    }
+    /**
+     * Is the current instance the leader?
+     */
+    public boolean isLeader() {
+        return this.isLeader;
+    }
+
+    /**
+     * Return the potential targets (Sling IDs) sorted by ID
+     */
+    public List<InstanceDescription> getPotentialTargets(final String jobTopic, final Map<String, Object> jobProperties) {
+        // calculate potential targets
+        final List<InstanceDescription> potentialTargets = new ArrayList<InstanceDescription>();
+
+        // first: topic targets - directly handling the topic
+        final List<InstanceDescription> topicTargets = this.instanceCapabilities.get(jobTopic);
+        if ( topicTargets != null ) {
+            potentialTargets.addAll(topicTargets);
+        }
+        // second: category targets - handling the topic category
+        final int pos = jobTopic.lastIndexOf('/');
+        if ( pos > 0 ) {
+            final String category = jobTopic.substring(0, pos + 1).concat("*");
+            final List<InstanceDescription> categoryTargets = this.instanceCapabilities.get(category);
+            if ( categoryTargets != null ) {
+                potentialTargets.addAll(categoryTargets);
+            }
+        }
+        // third: bridged consumers
+        final List<InstanceDescription> bridgedTargets = (jobProperties != null && jobProperties.containsKey(JobImpl.PROPERTY_BRIDGED_EVENT) ? this.instanceCapabilities.get("/") : null);
+        if ( bridgedTargets != null ) {
+            potentialTargets.addAll(bridgedTargets);
+        }
+        Collections.sort(potentialTargets, COMPARATOR);
+
+        return potentialTargets;
+    }
+
+    /**
+     * Detect the target instance.
+     */
+    public String detectTarget(final String jobTopic, final Map<String, Object> jobProperties,
+            final QueueInfo queueInfo) {
+        final List<InstanceDescription> potentialTargets = this.getPotentialTargets(jobTopic, jobProperties);
+
+        if ( potentialTargets != null && potentialTargets.size() > 0 ) {
+            if ( queueInfo.queueConfiguration.getType() == QueueConfiguration.Type.ORDERED ) {
+                // for ordered queues we always pick the first as we have to pick the same target
+                // on all instances (TODO - we could try to do some round robin of the whole queue)
+                return potentialTargets.get(0).getSlingId();
+            }
+            // TODO - this is a simple round robin which is not based on the actual load
+            //        of the instances
+            Integer index = this.roundRobinMap.get(jobTopic);
+            if ( index == null ) {
+                index = 0;
+            }
+            if ( index >= potentialTargets.size() ) {
+                index = 0;
+            }
+            this.roundRobinMap.put(jobTopic, index + 1);
+            return potentialTargets.get(index).getSlingId();
+        }
+
+        return null;
+    }
+}

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/TopologyCapabilities.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/TopologyCapabilities.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/TopologyCapabilities.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/Utility.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/Utility.java?rev=1470462&r1=1470461&r2=1470462&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/Utility.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/Utility.java Mon Apr 22 11:42:53 2013
@@ -18,16 +18,14 @@
  */
 package org.apache.sling.event.impl.jobs;
 
-import java.io.UnsupportedEncodingException;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.util.BitSet;
 import java.util.Calendar;
 import java.util.Dictionary;
+import java.util.HashMap;
 import java.util.Hashtable;
+import java.util.Map;
 
-import org.apache.sling.event.impl.EnvironmentComponent;
-import org.apache.sling.event.impl.support.Environment;
+import org.apache.sling.event.EventUtil;
+import org.apache.sling.event.jobs.Job;
 import org.apache.sling.event.jobs.JobUtil;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventAdmin;
@@ -35,174 +33,196 @@ import org.osgi.service.event.EventConst
 
 public abstract class Utility {
 
-    /** Allowed characters for a node name */
-    private static final BitSet ALLOWED_CHARS;
+    public static final String PROPERTY_LOCK_CREATED = "lock.created";
+    public static final String PROPERTY_LOCK_CREATED_APP = "lock.created.app";
 
-    /** Replacement characters for unallowed characters in a node name */
-    private static final char REPLACEMENT_CHAR = '_';
-
-    // Prepare the ALLOWED_CHARS bitset with bits indicating the unicode
-    // character index of allowed characters. We deliberately only support
-    // a subset of the actually allowed set of characters for nodes ...
-    static {
-        final String allowed = "ABCDEFGHIJKLMNOPQRSTUVWXYZ abcdefghijklmnopqrstuvwxyz0123456789_,.-+#!?$%&()=";
-        final BitSet allowedSet = new BitSet();
-        for (int i = 0; i < allowed.length(); i++) {
-            allowedSet.set(allowed.charAt(i));
+    /**
+     * Check the job topic.
+     * @return <code>null</code> if the topic is correct, otherwise an error description is returned
+     */
+    public static String checkJobTopic(final Object jobTopic) {
+        final String message;
+        if ( jobTopic != null ) {
+            if ( jobTopic instanceof String ) {
+                boolean topicIsCorrect = false;
+                try {
+                    new Event((String)jobTopic, (Dictionary<String, Object>)null);
+                    topicIsCorrect = true;
+                } catch (final IllegalArgumentException iae) {
+                    // we just have to catch it
+                }
+                if ( !topicIsCorrect ) {
+                    message = "Discarding job : job has an illegal job topic";
+                } else {
+                    message = null;
+                }
+            } else {
+                message = "Discarding job : job topic is not of type string";
+            }
+        } else {
+            message = "Discarding job : job topic is missing";
         }
-        ALLOWED_CHARS = allowedSet;
+        return message;
     }
 
+    /** Event property containing the time for job start and job finished events. */
+    public static final String PROPERTY_TIME = "time";
+
     /**
-     * Filter the node name for not allowed characters and replace them.
-     * @param nodeName The suggested node name.
-     * @return The filtered node name.
+     * Helper method for sending the notification events.
      */
-    public static String filter(final String nodeName) {
-        final StringBuilder sb  = new StringBuilder(nodeName.length());
-        char lastAdded = 0;
-
-        for(int i=0; i < nodeName.length(); i++) {
-            final char c = nodeName.charAt(i);
-            char toAdd = c;
-
-            if (!ALLOWED_CHARS.get(c)) {
-                if (lastAdded == REPLACEMENT_CHAR) {
-                    // do not add several _ in a row
-                    continue;
-                }
-                toAdd = REPLACEMENT_CHAR;
-
-            } else if(i == 0 && Character.isDigit(c)) {
-                sb.append(REPLACEMENT_CHAR);
+    public static void sendNotification(final EventAdmin eventAdmin,
+            final String eventTopic,
+            final String jobTopic,
+            final String jobName,
+            final Map<String, Object> jobProperties,
+            final Long time) {
+        if ( eventAdmin != null ) {
+            // create job object
+            final Map<String, Object> jobProps;
+            if ( jobProperties == null ) {
+                jobProps = new HashMap<String, Object>();
+            } else {
+                jobProps = jobProperties;
             }
-
-            sb.append(toAdd);
-            lastAdded = toAdd;
+            final Job job = new JobImpl(jobTopic, jobName, "<unknown>", jobProps);
+            sendNotificationInternal(eventAdmin, eventTopic, job, time);
         }
+    }
 
-        if (sb.length()==0) {
-            sb.append(REPLACEMENT_CHAR);
+    /**
+     * Helper method for sending the notification events.
+     */
+    public static void sendNotification(final EventAdmin eventAdmin,
+            final String eventTopic,
+            final Job job,
+            final Long time) {
+        if ( eventAdmin != null ) {
+            // create new copy of job object
+            final Job jobCopy = new JobImpl(job.getTopic(), job.getName(), job.getId(), ((JobImpl)job).getProperties());
+            sendNotificationInternal(eventAdmin, eventTopic, jobCopy, time);
         }
-
-        return sb.toString();
     }
 
     /**
-     * used for the md5
+     * Helper method for sending the notification events.
      */
-    private static final char[] HEX_TABLE = "0123456789abcdef".toCharArray();
+    private static void sendNotificationInternal(final EventAdmin eventAdmin,
+            final String eventTopic,
+            final Job job,
+            final Long time) {
+        final Dictionary<String, Object> eventProps = new Hashtable<String, Object>();
+        // add basic job properties
+        eventProps.put(JobUtil.NOTIFICATION_PROPERTY_JOB_ID, job.getId());
+        eventProps.put(JobUtil.NOTIFICATION_PROPERTY_JOB_TOPIC, job.getTopic());
+        if ( job.getName() != null ) {
+            eventProps.put(JobUtil.NOTIFICATION_PROPERTY_JOB_NAME, job.getName());
+        }
+        // copy paylod
+        for(final String name : job.getPropertyNames()) {
+            eventProps.put(name, job.getProperty(name));
+        }
+        // add timestamp
+        eventProps.put(EventConstants.TIMESTAMP, System.currentTimeMillis());
+        // add internal time information
+        if ( time != null ) {
+            eventProps.put(PROPERTY_TIME, time);
+        }
+        // make distributable
+        eventProps.put(EventUtil.PROPERTY_DISTRIBUTE, "true");
+        // compatibility:
+        eventProps.put(JobUtil.PROPERTY_NOTIFICATION_JOB, toEvent(job));
+        eventAdmin.postEvent(new Event(eventTopic, eventProps));
+    }
 
     /**
-     * Calculate an MD5 hash of the string given using 'utf-8' encoding.
-     *
-     * @param data the data to encode
-     * @return a hex encoded string of the md5 digested input
-     */
-    public static String md5(String data) {
-        try {
-            return digest("MD5", data.getBytes("utf-8"));
-        } catch (NoSuchAlgorithmException e) {
-            throw new InternalError("MD5 digest not available???");
-        } catch (UnsupportedEncodingException e) {
-            throw new InternalError("UTF8 digest not available???");
-        }
-    }
-
-    /**
-     * Digest the plain string using the given algorithm.
-     *
-     * @param algorithm The alogrithm for the digest. This algorithm must be
-     *                  supported by the MessageDigest class.
-     * @param data      the data to digest with the given algorithm
-     * @return The digested plain text String represented as Hex digits.
-     * @throws java.security.NoSuchAlgorithmException if the desired algorithm is not supported by
-     *                                  the MessageDigest class.
-     */
-    private static String digest(String algorithm, byte[] data)
-    throws NoSuchAlgorithmException {
-        MessageDigest md = MessageDigest.getInstance(algorithm);
-        byte[] digest = md.digest(data);
-        StringBuilder res = new StringBuilder(digest.length * 2);
-        for (int i = 0; i < digest.length; i++) {
-            byte b = digest[i];
-            res.append(HEX_TABLE[(b >> 4) & 15]);
-            res.append(HEX_TABLE[b & 15]);
-        }
-        return res.toString();
-    }
-
-    /** Counter for jobs without an id.  We don't need to sync the access. */
-    private static long JOB_COUNTER = 0;
-
-    /**
-     * Create a unique node path (folder and name) for the job.
-     */
-    public static String getUniquePath(final String jobTopic, final String jobId) {
-        final String convTopic = jobTopic.replace('/', '.');
-        if ( jobId != null ) {
-            final StringBuilder sb = new StringBuilder("identified/");
-            sb.append(convTopic);
-            sb.append('/');
-            // we create an md from the job id - we use the first 6 bytes to
-            // create sub directories
-            final String md5 = md5(jobId);
-            sb.append(md5.charAt(0));
-            sb.append(md5.charAt(1));
-            sb.append(md5.charAt(2));
-            sb.append('/');
-            sb.append(md5.charAt(3));
-            sb.append(md5.charAt(4));
-            sb.append(md5.charAt(5));
-            sb.append('/');
-            sb.append(filter(jobId));
-            return sb.toString();
-        }
-        final Calendar now = Calendar.getInstance();
-        // create a time based path together with the Sling ID
-        final StringBuilder sb = getAnonPath(now);
-        sb.append('/');
-        sb.append(convTopic);
-        sb.append('_');
-        sb.append(JOB_COUNTER);
-        JOB_COUNTER++;
-        return sb.toString();
+     * Create an event from a job
+     * @param job The job
+     * @return New event object.
+     */
+    public static Event toEvent(final Job job) {
+        final Map<String, Object> eventProps = new HashMap<String, Object>();
+        eventProps.putAll(((JobImpl)job).getProperties());
+        if ( job.getName() != null ) {
+            eventProps.put(JobUtil.PROPERTY_JOB_NAME, job.getName());
+        }
+        eventProps.put(JobUtil.JOB_ID, job.getId());
+        return new Event(job.getTopic(), eventProps);
     }
 
-    public static StringBuilder getAnonPath(final Calendar now) {
-        final StringBuilder sb = new StringBuilder("anon/");
-        // create a time based path together with the Sling ID
-        sb.append(Environment.APPLICATION_ID);
-        sb.append('/');
-        sb.append(now.get(Calendar.YEAR));
-        sb.append('/');
-        sb.append(now.get(Calendar.MONTH) + 1);
-        sb.append('/');
-        sb.append(now.get(Calendar.DAY_OF_MONTH));
-        sb.append('/');
-        sb.append(now.get(Calendar.HOUR_OF_DAY));
-        sb.append('/');
-        sb.append(now.get(Calendar.MINUTE));
-        return sb;
+    /**
+     * Append properties to the string builder
+     */
+    private static void appendProperties(final StringBuilder sb,
+            final Map<String, Object> properties) {
+        if ( properties != null ) {
+            sb.append(", properties=");
+            boolean first = true;
+            for(final String propName : properties.keySet()) {
+                if ( propName.equals(JobUtil.JOB_ID)
+                    || propName.equals(JobUtil.PROPERTY_JOB_NAME)
+                    || propName.equals(JobUtil.PROPERTY_JOB_TOPIC) ) {
+                   continue;
+                }
+                if ( first ) {
+                    first = false;
+                } else {
+                    sb.append(",");
+                }
+                sb.append(propName);
+                sb.append('=');
+                final Object value = properties.get(propName);
+                // the toString() method of Calendar is very verbose
+                // therefore we do a toString for these objects based
+                // on a date
+                if ( value instanceof Calendar ) {
+                    sb.append(value.getClass().getName());
+                    sb.append('(');
+                    sb.append(((Calendar)value).getTime());
+                    sb.append(')');
+                } else {
+                    sb.append(value);
+                }
+            }
+        }
+
     }
+    /**
+     * Improved toString method for a job.
+     * This method prints out the job topic and all of the properties.
+     */
+    public static String toString(final String jobTopic,
+            final String name,
+            final Map<String, Object> properties) {
+        final StringBuilder sb = new StringBuilder("Sling Job ");
+        sb.append("[topic=");
+        sb.append(jobTopic);
+        if ( name != null ) {
+            sb.append(", name=");
+            sb.append(name);
+        }
+        appendProperties(sb, properties);
 
-    /** Event property containing the time for job start and job finished events. */
-    public static final String PROPERTY_TIME = "time";
+        sb.append("]");
+        return sb.toString();
+    }
 
     /**
-     * Helper method for sending the notification events.
+     * Improved toString method for a job.
+     * This method prints out the job topic and all of the properties.
      */
-    public static void sendNotification(final EnvironmentComponent environment,
-            final String topic,
-            final Event job,
-            final Long time) {
-        final EventAdmin localEA = environment.getEventAdmin();
-        final Dictionary<String, Object> props = new Hashtable<String, Object>();
-        props.put(JobUtil.PROPERTY_NOTIFICATION_JOB, job);
-        props.put(EventConstants.TIMESTAMP, System.currentTimeMillis());
-        if ( time != null ) {
-            props.put(PROPERTY_TIME, time);
+    public static String toString(final Job job) {
+        final StringBuilder sb = new StringBuilder("Sling Job ");
+        sb.append("[topic=");
+        sb.append(job.getTopic());
+        sb.append(", id=");
+        sb.append(job.getId());
+        if ( job.getName() != null ) {
+            sb.append(", name=");
+            sb.append(job.getName());
         }
-        localEA.postEvent(new Event(topic, props));
+        appendProperties(sb, ((JobImpl)job).getProperties());
+        sb.append("]");
+        return sb.toString();
     }
 }

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/ConfigurationConstants.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/ConfigurationConstants.java?rev=1470462&r1=1470461&r2=1470462&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/ConfigurationConstants.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/ConfigurationConstants.java Mon Apr 22 11:42:53 2013
@@ -27,7 +27,6 @@ public abstract class ConfigurationConst
 
     public static final String DEFAULT_TYPE = "UNORDERED";
     public static final String DEFAULT_PRIORITY = "NORM";
-    public static final boolean DEFAULT_RUN_LOCAL = false;
     public static final int DEFAULT_RETRIES = 10;
     public static final long DEFAULT_RETRY_DELAY = 2000;
     public static final int DEFAULT_MAX_PARALLEL = 15;
@@ -39,6 +38,4 @@ public abstract class ConfigurationConst
     public static final String PROP_RETRIES = "queue.retries";
     public static final String PROP_RETRY_DELAY = "queue.retrydelay";
     public static final String PROP_PRIORITY = "queue.priority";
-    public static final String PROP_RUN_LOCAL = "queue.runlocal";
-    public static final String PROP_APP_IDS = "queue.applicationids";
 }

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java?rev=1470462&r1=1470461&r2=1470462&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java Mon Apr 22 11:42:53 2013
@@ -30,18 +30,16 @@ import org.apache.felix.scr.annotations.
 import org.apache.felix.scr.annotations.PropertyUnbounded;
 import org.apache.felix.scr.annotations.Service;
 import org.apache.sling.commons.osgi.PropertiesUtil;
-import org.apache.sling.event.EventUtil;
-import org.apache.sling.event.impl.jobs.JobEvent;
-import org.apache.sling.event.impl.support.Environment;
+import org.apache.sling.event.impl.support.TopicMatcher;
+import org.apache.sling.event.impl.support.TopicMatcherHelper;
 import org.apache.sling.event.jobs.JobUtil;
 import org.apache.sling.event.jobs.QueueConfiguration;
 import org.osgi.framework.Constants;
-import org.osgi.service.event.Event;
 
 @Component(metatype=true,name="org.apache.sling.event.jobs.QueueConfiguration",
         label="%queue.name", description="%queue.description",
         configurationFactory=true,policy=ConfigurationPolicy.REQUIRE)
-@Service(value=InternalQueueConfiguration.class)
+@Service(value={InternalQueueConfiguration.class})
 @Properties({
     @Property(name=ConfigurationConstants.PROP_NAME),
     @Property(name=ConfigurationConstants.PROP_TYPE,
@@ -49,7 +47,8 @@ import org.osgi.service.event.Event;
             options={@PropertyOption(name="UNORDERED",value="Parallel"),
                      @PropertyOption(name="ORDERED",value="Ordered"),
                      @PropertyOption(name="TOPIC_ROUND_ROBIN",value="Topic Round Robin"),
-                     @PropertyOption(name="IGNORE",value="Ignore")}),
+                     @PropertyOption(name="PULL",value="Equal Distribution"),
+                     @PropertyOption(name="DROP",value="Drop")}),
     @Property(name=ConfigurationConstants.PROP_TOPICS,
             unbounded=PropertyUnbounded.ARRAY),
     @Property(name=ConfigurationConstants.PROP_MAX_PARALLEL,
@@ -62,11 +61,7 @@ import org.osgi.service.event.Event;
             value=ConfigurationConstants.DEFAULT_PRIORITY,
             options={@PropertyOption(name="NORM",value="Norm"),
                      @PropertyOption(name="MIN",value="Min"),
-                     @PropertyOption(name="MAX",value="Max")}),
-    @Property(name=ConfigurationConstants.PROP_RUN_LOCAL,
-            boolValue=ConfigurationConstants.DEFAULT_RUN_LOCAL),
-    @Property(name=ConfigurationConstants.PROP_APP_IDS,
-            unbounded=PropertyUnbounded.ARRAY)
+                     @PropertyOption(name="MAX",value="Max")})
 })
 public class InternalQueueConfiguration
     implements QueueConfiguration {
@@ -83,23 +78,17 @@ public class InternalQueueConfiguration
     /** Retry delay. */
     private long retryDelay;
 
-    /** Local queue? */
-    private boolean runLocal;
-
     /** Thread priority. */
     private JobUtil.JobPriority priority;
 
     /** The maximum number of parallel processes (for non ordered queues) */
     private int maxParallelProcesses;
 
-    /** Optional application ids where this queue is running on. */
-    private String[] applicationIds;
-
     /** The ordering. */
     private int serviceRanking;
 
     /** The matchers for topics. */
-    private Matcher[] matchers;
+    private TopicMatcher[] matchers;
 
     /** The configured topics. */
     private String[] topics;
@@ -130,43 +119,15 @@ public class InternalQueueConfiguration
         this.name = PropertiesUtil.toString(params.get(ConfigurationConstants.PROP_NAME), null);
         this.priority = JobUtil.JobPriority.valueOf(PropertiesUtil.toString(params.get(ConfigurationConstants.PROP_PRIORITY), ConfigurationConstants.DEFAULT_PRIORITY));
         this.type = Type.valueOf(PropertiesUtil.toString(params.get(ConfigurationConstants.PROP_TYPE), ConfigurationConstants.DEFAULT_TYPE));
-        this.runLocal = PropertiesUtil.toBoolean(params.get(ConfigurationConstants.PROP_RUN_LOCAL), ConfigurationConstants.DEFAULT_RUN_LOCAL);
         this.retries = PropertiesUtil.toInteger(params.get(ConfigurationConstants.PROP_RETRIES), ConfigurationConstants.DEFAULT_RETRIES);
         this.retryDelay = PropertiesUtil.toLong(params.get(ConfigurationConstants.PROP_RETRY_DELAY), ConfigurationConstants.DEFAULT_RETRY_DELAY);
         final int maxParallel = PropertiesUtil.toInteger(params.get(ConfigurationConstants.PROP_MAX_PARALLEL), ConfigurationConstants.DEFAULT_MAX_PARALLEL);
         this.maxParallelProcesses = (maxParallel == -1 ? ConfigurationConstants.NUMBER_OF_PROCESSORS : maxParallel);
-        final String appIds[] = PropertiesUtil.toStringArray(params.get(ConfigurationConstants.PROP_APP_IDS));
-        if ( appIds == null
-             || appIds.length == 0
-             || (appIds.length == 1 && (appIds[0] == null || appIds[0].length() == 0)) ) {
-            this.applicationIds = null;
-        } else {
-            this.applicationIds = appIds;
-        }
         final String[] topicsParam = PropertiesUtil.toStringArray(params.get(ConfigurationConstants.PROP_TOPICS));
-        if ( topicsParam == null
-             || topicsParam.length == 0
-             || (topicsParam.length == 1 && (topicsParam[0] == null || topicsParam[0].length() == 0))) {
-            matchers = null;
+        this.matchers = TopicMatcherHelper.buildMatchers(topicsParam);
+        if ( this.matchers == null ) {
             this.topics = null;
         } else {
-            final Matcher[] newMatchers = new Matcher[topicsParam.length];
-            for(int i=0; i < topicsParam.length; i++) {
-                String value = topicsParam[i];
-                if ( value != null ) {
-                    value = value.trim();
-                }
-                if ( value != null && value.length() > 0 ) {
-                    if ( value.endsWith(".") ) {
-                        newMatchers[i] = new PackageMatcher(value);
-                    } else if ( value.endsWith("*") ) {
-                        newMatchers[i] = new SubPackageMatcher(value);
-                    } else {
-                        newMatchers[i] = new ClassMatcher(value);
-                    }
-                }
-            }
-            matchers = newMatchers;
             this.topics = topicsParam;
         }
         this.serviceRanking = PropertiesUtil.toInteger(params.get(Constants.SERVICE_RANKING), 0);
@@ -174,63 +135,6 @@ public class InternalQueueConfiguration
         this.valid = this.checkIsValid();
     }
 
-    public InternalQueueConfiguration(final Event jobEvent) {
-        this.name = (String)jobEvent.getProperty(JobUtil.PROPERTY_JOB_QUEUE_NAME);
-        if ( jobEvent.getProperty(JobUtil.PROPERTY_JOB_QUEUE_ORDERED) != null ) {
-            this.type = Type.ORDERED;
-            this.maxParallelProcesses = 1;
-        } else {
-            this.type = Type.UNORDERED;
-            int maxPar = ConfigurationConstants.DEFAULT_MAX_PARALLEL;
-            final Object value = jobEvent.getProperty(JobUtil.PROPERTY_JOB_PARALLEL);
-            if ( value != null ) {
-                if ( value instanceof Boolean ) {
-                    final boolean result = ((Boolean)value).booleanValue();
-                    if ( !result ) {
-                        maxPar = 1;
-                    }
-                } else if ( value instanceof Number ) {
-                    final int result = ((Number)value).intValue();
-                    if ( result > 1 ) {
-                        maxPar = result;
-                    } else {
-                        maxPar = 1;
-                    }
-                } else {
-                    final String strValue = value.toString();
-                    if ( "no".equalsIgnoreCase(strValue) || "false".equalsIgnoreCase(strValue) ) {
-                        maxPar = 1;
-                    } else {
-                        // check if this is a number
-                        try {
-                            final int result = Integer.valueOf(strValue).intValue();
-                            if ( result > 1 ) {
-                                maxPar = result;
-                            } else {
-                                maxPar = 1;
-                            }
-                        } catch (NumberFormatException ne) {
-                            // we ignore this
-                        }
-                    }
-                }
-            }
-            if ( maxPar == -1 ) {
-                maxPar = ConfigurationConstants.NUMBER_OF_PROCESSORS;
-            }
-            this.maxParallelProcesses = maxPar;
-        }
-        this.priority = JobUtil.JobPriority.valueOf(ConfigurationConstants.DEFAULT_PRIORITY);
-        this.runLocal = false;
-        this.retries = ConfigurationConstants.DEFAULT_RETRIES;
-        this.retryDelay = ConfigurationConstants.DEFAULT_RETRY_DELAY;
-        this.serviceRanking = 0;
-        this.applicationIds = null;
-        this.matchers = null;
-        this.topics = new String[] {"<Custom:" + jobEvent.getProperty(JobUtil.PROPERTY_JOB_TOPIC) + ">"};
-        this.valid = true;
-    }
-
     /**
      * Check if this configuration is valid,
      * If it is invalid, it is ignored.
@@ -238,7 +142,7 @@ public class InternalQueueConfiguration
     private boolean checkIsValid() {
         boolean hasMatchers = false;
         if ( this.matchers != null ) {
-            for(final Matcher m : this.matchers ) {
+            for(final TopicMatcher m : this.matchers ) {
                 if ( m != null ) {
                     hasMatchers = true;
                 }
@@ -267,22 +171,21 @@ public class InternalQueueConfiguration
 
     /**
      * Check if the queue processes the event.
-     * @param event The event
+     * @param topic The topic of the event
+     * @return The queue name or <code>null</code>
      */
-    public boolean match(final JobEvent event) {
-        final String topic = (String)event.event.getProperty(JobUtil.PROPERTY_JOB_TOPIC);
+    public String match(final String topic) {
         if ( this.matchers != null ) {
-            for(final Matcher m : this.matchers ) {
+            for(final TopicMatcher m : this.matchers ) {
                 if ( m != null ) {
                     final String rep = m.match(topic);
                     if ( rep != null ) {
-                        event.queueName = this.name.replace("{0}", rep);
-                        return true;
+                        return this.name.replace("{0}", rep);
                     }
                 }
             }
         }
-        return false;
+        return null;
     }
 
     /**
@@ -293,37 +196,9 @@ public class InternalQueueConfiguration
     }
 
     /**
-     * Checks if the event should be skipped.
-     * This can happen if
-     * - the queue is of type ignore
-     * - the queue is bound to some application id
-     * - the event is a local event generated with a different application id
-     */
-    public boolean isSkipped(final JobEvent event) {
-        if ( this.type == Type.IGNORE ) {
-            return true;
-        }
-        if ( this.applicationIds != null ) {
-            boolean found = false;
-            for(final String id : this.applicationIds) {
-                if ( Environment.APPLICATION_ID.equals(id) ) {
-                    found = true;
-                }
-            }
-            if ( !found ) {
-                return true;
-            }
-        }
-        if ( this.runLocal
-             && !event.event.getProperty(EventUtil.PROPERTY_APPLICATION).equals(Environment.APPLICATION_ID) ) {
-            return true;
-        }
-        return false;
-    }
-
-    /**
      * @see org.apache.sling.event.jobs.QueueConfiguration#getRetryDelayInMs()
      */
+    @Override
     public long getRetryDelayInMs() {
         return this.retryDelay;
     }
@@ -331,6 +206,7 @@ public class InternalQueueConfiguration
     /**
      * @see org.apache.sling.event.jobs.QueueConfiguration#getMaxRetries()
      */
+    @Override
     public int getMaxRetries() {
         return this.retries;
     }
@@ -338,6 +214,7 @@ public class InternalQueueConfiguration
     /**
      * @see org.apache.sling.event.jobs.QueueConfiguration#getType()
      */
+    @Override
     public Type getType() {
         return this.type;
     }
@@ -345,6 +222,7 @@ public class InternalQueueConfiguration
     /**
      * @see org.apache.sling.event.jobs.QueueConfiguration#getPriority()
      */
+    @Override
     public JobUtil.JobPriority getPriority() {
         return this.priority;
     }
@@ -352,27 +230,21 @@ public class InternalQueueConfiguration
     /**
      * @see org.apache.sling.event.jobs.QueueConfiguration#getMaxParallel()
      */
+    @Override
     public int getMaxParallel() {
         return this.maxParallelProcesses;
     }
 
-    /**
-     * @see org.apache.sling.event.jobs.QueueConfiguration#isLocalQueue()
-     */
+    @Override
+    @Deprecated
     public boolean isLocalQueue() {
-        return this.runLocal;
-    }
-
-    /**
-     * @see org.apache.sling.event.jobs.QueueConfiguration#getApplicationIds()
-     */
-    public String[] getApplicationIds() {
-        return this.applicationIds;
+        return false;
     }
 
     /**
      * @see org.apache.sling.event.jobs.QueueConfiguration#getTopics()
      */
+    @Override
     public String[] getTopics() {
         return this.topics;
     }
@@ -380,6 +252,7 @@ public class InternalQueueConfiguration
     /**
      * @see org.apache.sling.event.jobs.QueueConfiguration#getRanking()
      */
+    @Override
     public int getRanking() {
         return this.serviceRanking;
     }
@@ -389,6 +262,12 @@ public class InternalQueueConfiguration
     }
 
     @Override
+    @Deprecated
+    public String[] getApplicationIds() {
+        return null;
+    }
+
+    @Override
     public String toString() {
         return "Queue-Configuration(" + this.hashCode() + ") : {" +
             "name=" + this.name +
@@ -397,80 +276,8 @@ public class InternalQueueConfiguration
             ", maxParallelProcesses=" + this.maxParallelProcesses +
             ", retries=" + this.retries +
             ", retryDelayInMs= " + this.retryDelay +
-            ", applicationIds= " + (this.applicationIds == null ? "[]" : Arrays.toString(this.applicationIds)) +
             ", serviceRanking=" + this.serviceRanking +
             ", pid=" + this.pid +
             ", isValid=" + this.isValid() + "}";
     }
-
-    /**
-     * Internal interface for topic matching
-     */
-    private static interface Matcher {
-        /** Check if the topic matches and return the variable part - null if not matching. */
-        String match(String topic);
-    }
-
-    /** Package matcher - the topic must be in the same package. */
-    private static final class PackageMatcher implements Matcher {
-
-        private final String packageName;
-
-        public PackageMatcher(final String name) {
-            // remove last char and maybe a trailing slash
-            int lastPos = name.length() - 1;
-            if ( lastPos > 0 && name.charAt(lastPos - 1) == '/' ) {
-                lastPos--;
-            }
-            this.packageName = name.substring(0, lastPos);
-        }
-
-        /**
-         * @see org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration.Matcher#match(java.lang.String)
-         */
-        public String match(final String topic) {
-            final int pos = topic.lastIndexOf('/');
-            return pos > -1 && topic.substring(0, pos).equals(packageName) ? topic.substring(pos + 1) : null;
-        }
-    }
-
-    /** Sub package matcher - the topic must be in the same package or a sub package. */
-    private static final class SubPackageMatcher implements Matcher {
-        private final String packageName;
-
-        public SubPackageMatcher(final String name) {
-            // remove last char and maybe a trailing slash
-            int lastPos = name.length() - 1;
-            if ( lastPos > 0 && name.charAt(lastPos - 1) == '/' ) {
-                this.packageName = name.substring(0, lastPos);
-            } else {
-                this.packageName = name.substring(0, lastPos) + '/';
-            }
-        }
-
-        /**
-         * @see org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration.Matcher#match(java.lang.String)
-         */
-        public String match(final String topic) {
-            final int pos = topic.lastIndexOf('/');
-            return pos > -1 && topic.substring(0, pos + 1).startsWith(this.packageName) ? topic.substring(this.packageName.length()) : null;
-        }
-    }
-
-    /** The topic must match exactly. */
-    private static final class ClassMatcher implements Matcher {
-        private final String className;
-
-        public ClassMatcher(final String name) {
-            this.className = name;
-        }
-
-        /**
-         * @see org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration.Matcher#match(java.lang.String)
-         */
-        public String match(String topic) {
-            return this.className.equals(topic) ? "" : null;
-        }
-    }
-
 }

Added: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/MainQueueConfiguration.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/MainQueueConfiguration.java?rev=1470462&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/MainQueueConfiguration.java (added)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/MainQueueConfiguration.java Mon Apr 22 11:42:53 2013
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.config;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Modified;
+import org.apache.felix.scr.annotations.Properties;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.PropertyOption;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.sling.commons.osgi.PropertiesUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This is the configuration for the main queue.
+ *
+ */
+@Component(label="%job.events.name",
+        description="%job.events.description",
+        name="org.apache.sling.event.impl.jobs.DefaultJobManager",
+        metatype=true)
+@Service(value=MainQueueConfiguration.class)
+@Properties({
+    @Property(name=ConfigurationConstants.PROP_PRIORITY,
+            value=ConfigurationConstants.DEFAULT_PRIORITY,
+            options={@PropertyOption(name="NORM",value="Norm"),
+            @PropertyOption(name="MIN",value="Min"),
+            @PropertyOption(name="MAX",value="Max")}),
+    @Property(name=ConfigurationConstants.PROP_RETRIES,
+            intValue=ConfigurationConstants.DEFAULT_RETRIES),
+    @Property(name=ConfigurationConstants.PROP_RETRY_DELAY,
+            longValue=ConfigurationConstants.DEFAULT_RETRY_DELAY),
+    @Property(name=ConfigurationConstants.PROP_MAX_PARALLEL,
+            intValue=ConfigurationConstants.DEFAULT_MAX_PARALLEL),
+})
+public class MainQueueConfiguration {
+
+    /** Default logger. */
+    private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+    private InternalQueueConfiguration mainConfiguration;
+
+    /**
+     * Activate this component.
+     * @param props Configuration properties
+     */
+    @Activate
+    protected void activate(final Map<String, Object> props) {
+        this.update(props);
+    }
+
+    /**
+     * Configure this component.
+     * @param props Configuration properties
+     */
+    @Modified
+    protected void update(final Map<String, Object> props) {
+        // create a new dictionary with the missing info and do some sanity puts
+        final Map<String, Object> queueProps = new HashMap<String, Object>(props);
+        queueProps.put(ConfigurationConstants.PROP_TOPICS, "*");
+        queueProps.put(ConfigurationConstants.PROP_NAME, "<main queue>");
+        queueProps.put(ConfigurationConstants.PROP_TYPE, InternalQueueConfiguration.Type.UNORDERED);
+
+        // check max parallel - this should never be lower than 2!
+        final int maxParallel = PropertiesUtil.toInteger(queueProps.get(ConfigurationConstants.PROP_MAX_PARALLEL),
+                ConfigurationConstants.DEFAULT_MAX_PARALLEL);
+        if ( maxParallel < 2 ) {
+            this.logger.debug("Ignoring invalid setting of {} for {}. Setting to minimum value: 2",
+                    maxParallel, ConfigurationConstants.PROP_MAX_PARALLEL);
+            queueProps.put(ConfigurationConstants.PROP_MAX_PARALLEL, 2);
+        }
+        this.mainConfiguration = InternalQueueConfiguration.fromConfiguration(queueProps);
+    }
+
+    public InternalQueueConfiguration getMainConfiguration() {
+        return this.mainConfiguration;
+    }
+}

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/MainQueueConfiguration.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/MainQueueConfiguration.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/MainQueueConfiguration.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/QueueConfigurationManager.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/QueueConfigurationManager.java?rev=1470462&r1=1470461&r2=1470462&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/QueueConfigurationManager.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/QueueConfigurationManager.java Mon Apr 22 11:42:53 2013
@@ -21,18 +21,17 @@ package org.apache.sling.event.impl.jobs
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.Service;
-import org.apache.sling.event.impl.jobs.JobEvent;
-import org.apache.sling.event.jobs.JobUtil;
+import org.apache.sling.api.resource.LoginException;
+import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.event.impl.support.ResourceHelper;
 import org.osgi.framework.BundleContext;
 import org.osgi.util.tracker.ServiceTracker;
 
 
 /**
- * An event handler for special job events.
- *
- * We schedule this event handler to run in the background and clean up
- * obsolete events.
+ * The queue manager manages queue configurations.
  */
 @Component
 @Service(value=QueueConfigurationManager.class)
@@ -47,13 +46,16 @@ public class QueueConfigurationManager {
     /** Tracker count to detect changes. */
     private volatile int lastTrackerCount = -1;
 
+    @Reference
+    private MainQueueConfiguration mainQueueConfiguration;
 
     /**
      * Activate this component.
      * Create the service tracker and start it.
      */
     @Activate
-    protected void activate(final BundleContext bundleContext) {
+    protected void activate(final BundleContext bundleContext)
+    throws LoginException, PersistenceException {
         this.configTracker = new ServiceTracker(bundleContext,
                 InternalQueueConfiguration.class.getName(), null);
         this.configTracker.open();
@@ -101,25 +103,42 @@ public class QueueConfigurationManager {
         return configurations;
     }
 
+    public InternalQueueConfiguration getMainQueueConfiguration() {
+        return this.mainQueueConfiguration.getMainConfiguration();
+    }
+
+    public static final class QueueInfo {
+        public InternalQueueConfiguration queueConfiguration;
+        public String queueName;
+        public String targetId;
+    }
+
     /**
      * Find the queue configuration for the job.
      * This method only returns a configuration if one matches.
      */
-    public InternalQueueConfiguration getQueueConfiguration(final JobEvent event) {
+    public QueueInfo getQueueInfo(final String topic) {
         final InternalQueueConfiguration[] configurations = this.getConfigurations();
-        final String queueName = (String)event.event.getProperty(JobUtil.PROPERTY_JOB_QUEUE_NAME);
         for(final InternalQueueConfiguration config : configurations) {
             if ( config.isValid() ) {
-                // check for queue name first
-                if ( queueName != null && queueName.equals(config.getName()) ) {
-                    event.queueName = queueName;
-                    return config;
-                }
-                if ( config.match(event) ) {
-                    return config;
+                final String qn = config.match(topic);
+                if ( qn != null ) {
+                    final QueueInfo result = new QueueInfo();
+                    result.queueConfiguration = config;
+                    result.queueName = ResourceHelper.filterName(qn);
+
+                    return result;
                 }
             }
         }
-        return null;
+        final QueueInfo result = new QueueInfo();
+        result.queueConfiguration = this.mainQueueConfiguration.getMainConfiguration();
+        result.queueName = result.queueConfiguration.getName();
+
+        return result;
+    }
+
+    public int getChangeCount() {
+        return this.configTracker.getTrackingCount();
     }
 }