You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2010/10/15 16:02:25 UTC

svn commit: r1022923 [2/2] - in /sling/trunk/bundles/extensions/event: ./ src/main/java/org/apache/sling/event/ src/main/java/org/apache/sling/event/impl/ src/main/java/org/apache/sling/event/impl/job/ src/main/java/org/apache/sling/event/impl/jobs/ sr...

Added: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java?rev=1022923&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java (added)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java Fri Oct 15 14:02:23 2010
@@ -0,0 +1,618 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.queues;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Dictionary;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.sling.event.EventPropertiesMap;
+import org.apache.sling.event.EventUtil;
+import org.apache.sling.event.impl.EnvironmentComponent;
+import org.apache.sling.event.impl.jobs.JobEvent;
+import org.apache.sling.event.impl.jobs.JobStatusNotifier;
+import org.apache.sling.event.impl.jobs.StatisticsImpl;
+import org.apache.sling.event.impl.jobs.Utility;
+import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
+import org.apache.sling.event.jobs.JobUtil;
+import org.apache.sling.event.jobs.Queue;
+import org.apache.sling.event.jobs.Statistics;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Abstract base class for the job queues. It tracks started and processing
+ * jobs and handles the acknowledge, finish and reschedule notifications.
+ */
+public abstract class AbstractJobQueue
+    extends StatisticsImpl
+    implements JobStatusNotifier, Queue {
+
+    /** Default time in milliseconds to wait for an ack. */
+    private static final long DEFAULT_WAIT_FOR_ACK_IN_MS = 60 * 1000; // by default we wait 60 secs
+
+    /** Default timeout for suspend. */
+    private static final long MAX_SUSPEND_TIME = 1000 * 60 * 60; // 60 mins
+
+    /** The logger. */
+    protected final Logger logger;
+
+    /** Configuration. */
+    protected final InternalQueueConfiguration configuration;
+
+    /** The environment component. */
+    private final EnvironmentComponent environment;
+
+    /** The queue name. */
+    protected volatile String queueName;
+
+    /** Are we still running? */
+    protected volatile boolean running;
+
+    /** Are we marked for cleanup */
+    private volatile boolean markedForCleanUp = false;
+
+    /** Is the queue currently waiting(sleeping) */
+    protected volatile boolean isWaiting = false;
+
+    /** The map of events we have started (sent) and which are not acknowledged yet. */
+    private final Map<String, JobEvent> startedJobsLists = new HashMap<String, JobEvent>();
+
+    /** The map of events we're processing. */
+    private final Map<String, JobEvent> processsingJobsLists = new HashMap<String, JobEvent>();
+
+    /** Suspended since. */
+    private final AtomicLong suspendedSince = new AtomicLong(-1);
+
+    /** Suspend lock. */
+    private final Object suspendLock = new Object();
+
+    /**
+     * Start this queue
+     * @param name The queue name
+     * @param config The queue configuration
+     * @param environment The environment component
+     */
+    public AbstractJobQueue(final String name,
+                    final InternalQueueConfiguration config,
+                    final EnvironmentComponent environment) {
+        this.queueName = name;
+        this.configuration = config;
+        this.logger = LoggerFactory.getLogger(this.getClass().getName() + '.' + name);
+        this.running = true;
+        this.environment = environment;
+    }
+
+    /**
+     * Return a textual summary of the internal state flags
+     * (waiting, marked for clean up, suspended since).
+     * @see org.apache.sling.event.jobs.Queue#getStateInfo()
+     */
+    public String getStateInfo() {
+        final StringBuilder sb = new StringBuilder("isWaiting=");
+        sb.append(this.isWaiting);
+        sb.append(", markedForCleanUp=").append(this.markedForCleanUp);
+        sb.append(", suspendedSince=").append(this.suspendedSince.longValue());
+        return sb.toString();
+    }
+
+    /**
+     * Start the job queue.
+     * A daemon thread is created which runs the queue loop until the
+     * queue is closed.
+     */
+    public void start() {
+        final Runnable queueLoop = new Runnable() {
+
+            public void run() {
+                // the queue loop only returns normally once the queue is not
+                // running anymore; if it dies with an exception it is restarted
+                while ( running ) {
+                    logger.info("Starting job queue {}", queueName);
+                    logger.debug("Configuration for job queue={}", configuration);
+
+                    try {
+                        runJobQueue();
+                    } catch (Throwable t) { //NOSONAR
+                        logger.error("Job queue " + queueName + " stopped with exception: " + t.getMessage() + ". Restarting.", t);
+                    }
+                }
+            }
+
+        };
+        final Thread queueThread = new Thread(queueLoop, "Apache Sling Job Queue " + queueName);
+        queueThread.setDaemon(true);
+        queueThread.start();
+    }
+
+    /**
+     * Return the queue configuration.
+     * @return The configuration object passed to the constructor.
+     */
+    public InternalQueueConfiguration getConfiguration() {
+        return configuration;
+    }
+
+    /**
+     * Close this queue.
+     * The queue is flagged as not running, a suspended or waiting queue
+     * thread is woken up, a dummy job is put into the queue to continue
+     * the queue processing and finally all started and processing jobs
+     * are forgotten.
+     */
+    public void close() {
+        this.running = false;
+        this.logger.debug("Shutting down job queue {}", queueName);
+        this.logger.debug("Waking up sleeping queue {}", queueName);
+        this.resume();
+        if ( this.isWaiting ) {
+            this.logger.debug("Waking up waiting queue {}", this.queueName);
+            this.notifyFinished(null);
+        }
+        // continue queue processing to stop the queue
+        this.put(new JobEvent(null, null) {
+            public boolean lock() { return false; }
+            public void unlock() {
+                // dummy impl
+            }
+            public void finished() {
+                // dummy impl
+            }
+            public boolean remove() { return true; }
+            public boolean reschedule() { return false; }
+            });
+
+        // both maps are plain HashMaps and every other access holds the
+        // monitor of the map, so clearing has to hold it as well
+        synchronized ( this.processsingJobsLists ) {
+            this.processsingJobsLists.clear();
+        }
+        synchronized ( this.startedJobsLists ) {
+            this.startedJobsLists.clear();
+        }
+        this.logger.info("Stopped job queue {}", this.queueName);
+    }
+
+    /**
+     * Periodically cleanup.
+     * Jobs which have been started but did not get an acknowledge within
+     * the ack timeout are rescheduled and the queue is notified that the
+     * job is not processed anymore.
+     */
+    public void cleanup() {
+        if ( this.running ) {
+            // check for jobs that were started but never got an acknowledge
+            final long tooOld = System.currentTimeMillis() - DEFAULT_WAIT_FOR_ACK_IN_MS;
+            // to keep the synchronized block as fast as possible we just store the
+            // jobs to be removed in a new list and process this list afterwards
+            final List<JobEvent> restartJobs = new ArrayList<JobEvent>();
+            synchronized ( this.startedJobsLists ) {
+                for(final JobEvent candidate : this.startedJobsLists.values()) {
+                    if ( candidate.started <= tooOld ) {
+                        restartJobs.add(candidate);
+                    }
+                }
+            }
+
+            // restart jobs is now a list of potential candidates, we now have to check
+            // each candidate separately again!
+            if ( !restartJobs.isEmpty() ) {
+                try {
+                    Thread.sleep(500);
+                } catch (InterruptedException e) {
+                    // we just ignore this
+                    this.ignoreException(e);
+                }
+            }
+            for(final JobEvent info : restartJobs) {
+                final boolean process;
+                synchronized ( this.startedJobsLists ) {
+                    // only handle the job if no acknowledge arrived in the meantime
+                    process = this.startedJobsLists.remove(info.uniqueId) != null;
+                }
+                if ( process ) {
+                    // The job was never acknowledged, so it is not in the map of
+                    // processing jobs and finishedJob() would discard it as unknown.
+                    // Therefore reschedule it directly and notify the queue.
+                    if ( info.reschedule() ) {
+                        this.logger.info("No acknowledge received for job {} stored at {}. Requeueing job.", EventUtil.toString(info.event), info.uniqueId);
+                        this.checkForNotify(info);
+                    } else {
+                        this.checkForNotify(null);
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Acknowledge that the processing of a job has begun.
+     * The job is moved from the map of started jobs to the map of
+     * processing jobs and a job started notification is sent.
+     * @param job The job event, the job is looked up by its {@link JobUtil#JOB_ID} property.
+     * @return <code>true</code> if the job was found in the map of started jobs,
+     *         <code>false</code> otherwise.
+     * @see org.apache.sling.event.impl.jobs.JobStatusNotifier#sendAcknowledge(org.osgi.service.event.Event)
+     */
+    public boolean sendAcknowledge(final Event job) {
+        final String location = (String)job.getProperty(JobUtil.JOB_ID);
+        final JobEvent ack;
+        synchronized ( this.startedJobsLists ) {
+            ack = this.startedJobsLists.remove(location);
+        }
+        // if the event is still in the map of started jobs, we confirm the ack
+        if ( ack != null ) {
+            if ( logger.isDebugEnabled() ) {
+                logger.debug("Received ack for job {}", EventUtil.toString(job));
+            }
+            // time between queueing the job and sending it to the processor
+            final long queueTime = ack.started - ack.queued;
+            this.addActive(queueTime);
+            Utility.sendNotification(this.environment, JobUtil.TOPIC_JOB_STARTED, job, queueTime);
+            synchronized ( this.processsingJobsLists ) {
+                this.processsingJobsLists.put(location, ack);
+            }
+        } else {
+            // the job is not (or no longer) in the map of started jobs,
+            // only the statistics are updated
+            this.decQueued();
+        }
+        return ack != null;
+    }
+
+    /**
+     * Decide whether a job is rescheduled, update the statistics and
+     * send the matching notification (failed, cancelled or finished).
+     * If the job is rescheduled, the event of the job is replaced with a
+     * copy containing the updated retry count and the maximum retries.
+     * @param jobEvent The job
+     * @param shouldReschedule Whether a reschedule is requested
+     * @return <code>true</code> if the job should be rescheduled, <code>false</code>
+     *         if it is finished or the number of retries is exceeded.
+     */
+    private boolean handleReschedule(final JobEvent jobEvent, final boolean shouldReschedule) {
+        boolean reschedule = shouldReschedule;
+        if ( shouldReschedule ) {
+            // check if we exceeded the number of retries
+            // (a value set on the job overrules the queue configuration)
+            int retries = this.configuration.getMaxRetries();
+            if ( jobEvent.event.getProperty(JobUtil.PROPERTY_JOB_RETRIES) != null ) {
+                retries = (Integer) jobEvent.event.getProperty(JobUtil.PROPERTY_JOB_RETRIES);
+            }
+            int retryCount = 0;
+            if ( jobEvent.event.getProperty(JobUtil.PROPERTY_JOB_RETRY_COUNT) != null ) {
+                retryCount = (Integer)jobEvent.event.getProperty(JobUtil.PROPERTY_JOB_RETRY_COUNT);
+            }
+            retryCount++;
+            // -1 disables the retry limit
+            if ( retries != -1 && retryCount > retries ) {
+                reschedule = false;
+            }
+            if ( reschedule ) {
+                // update event with retry count and retries
+                final Dictionary<String, Object> newProperties = new EventPropertiesMap(jobEvent.event);
+                newProperties.put(JobUtil.PROPERTY_JOB_RETRY_COUNT, retryCount);
+                newProperties.put(JobUtil.PROPERTY_JOB_RETRIES, retries);
+                jobEvent.event = new Event(jobEvent.event.getTopic(), newProperties);
+                if ( this.logger.isDebugEnabled() ) {
+                    this.logger.debug("Failed job {}", EventUtil.toString(jobEvent.event));
+                }
+                this.failedJob();
+                // the job is queued again, so the queue time starts anew
+                jobEvent.queued = System.currentTimeMillis();
+                Utility.sendNotification(this.environment, JobUtil.TOPIC_JOB_FAILED, jobEvent.event, null);
+            } else {
+                // retries exceeded: the job is given up
+                if ( this.logger.isDebugEnabled() ) {
+                    this.logger.debug("Cancelled job {}", EventUtil.toString(jobEvent.event));
+                }
+                this.cancelledJob();
+                Utility.sendNotification(this.environment, JobUtil.TOPIC_JOB_CANCELLED, jobEvent.event, null);
+            }
+        } else {
+            if ( this.logger.isDebugEnabled() ) {
+                this.logger.debug("Finished job {}", EventUtil.toString(jobEvent.event));
+            }
+            final long processingTime = System.currentTimeMillis() - jobEvent.started;
+            this.finishedJob(processingTime);
+            Utility.sendNotification(this.environment, JobUtil.TOPIC_JOB_FINISHED, jobEvent.event, processingTime);
+        }
+
+        return reschedule;
+    }
+
+    /**
+     * Notification that the processing of a job ended.
+     * The call is ignored if the queue is not running anymore or if the
+     * job is not in the map of processing jobs of this queue.
+     * @param job The job event, the job is looked up by its {@link JobUtil#JOB_ID} property.
+     * @param shouldReschedule <code>true</code> if the job failed and should be retried.
+     * @return <code>true</code> if the job has been rescheduled, <code>false</code> otherwise.
+     * @see org.apache.sling.event.impl.jobs.JobStatusNotifier#finishedJob(org.osgi.service.event.Event, boolean)
+     */
+    public boolean finishedJob(final Event job, final boolean shouldReschedule) {
+        if ( this.logger.isDebugEnabled() ) {
+            this.logger.debug("Received finish for job {}, shouldReschedule={}", EventUtil.toString(job), shouldReschedule);
+        }
+        if ( !this.running ) {
+            if ( this.logger.isDebugEnabled() ) {
+                this.logger.debug("Queue is not running anymore. Discarding finish for {}", EventUtil.toString(job));
+            }
+            return false;
+        }
+        final String location = (String)job.getProperty(JobUtil.JOB_ID);
+        // let's remove the event from our processing list
+        // this is just a sanity check, as usually the job should have been
+        // removed during sendAcknowledge.
+        synchronized ( this.startedJobsLists ) {
+            this.startedJobsLists.remove(location);
+        }
+
+        // get job event
+        final JobEvent info;
+        synchronized ( this.processsingJobsLists ) {
+            info = this.processsingJobsLists.remove(location);
+        }
+        if ( info == null ) {
+            if ( this.logger.isDebugEnabled() ) {
+                this.logger.debug("This job has never been started by this queue: {}", EventUtil.toString(job));
+            }
+            return false;
+        }
+
+        // handle the reschedule, a new job might be returned with updated reschedule info!
+        final boolean reschedule = this.handleReschedule(info, shouldReschedule);
+
+        // result of finishing respectively rescheduling the job
+        final boolean finishSuccessful;
+
+        if ( !reschedule ) {
+            info.finished();
+            finishSuccessful = true;
+        } else {
+            finishSuccessful = info.reschedule();
+        }
+
+        // the queue is always notified, the job is only passed on for
+        // reprocessing if the reschedule was successful
+        if ( !finishSuccessful || !reschedule ) {
+            checkForNotify(null);
+            return false;
+        }
+        checkForNotify(info);
+        return true;
+    }
+
+    private void checkForNotify(final JobEvent info) {
+        JobEvent reprocessInfo = null;
+        if ( info != null ) {
+            reprocessInfo = this.reschedule(info);
+        }
+        notifyFinished(reprocessInfo);
+    }
+
+    /**
+     * Check whether this queue is idle: it is empty and not waiting.
+     */
+    protected boolean canBeMarkedForCleanUp() {
+        if ( !this.isEmpty() ) {
+            return false;
+        }
+        return !this.isWaiting;
+    }
+    /**
+     * Mark this queue for cleanup.
+     * The mark is only set if the queue can currently be cleaned up.
+     */
+    public void markForCleanUp() {
+        if ( !this.canBeMarkedForCleanUp() ) {
+            return;
+        }
+        this.markedForCleanUp = true;
+    }
+
+    /**
+     * Check if this queue is marked for cleanup.
+     * @return <code>true</code> only if the mark is set and the queue can
+     *         still be cleaned up.
+     */
+    public boolean isMarkedForCleanUp() {
+        if ( !this.markedForCleanUp ) {
+            return false;
+        }
+        return this.canBeMarkedForCleanUp();
+    }
+
+    /**
+     * Get the name of the job queue.
+     * @return The current name, it changes if the queue is renamed.
+     */
+    public String getName() {
+        return queueName;
+    }
+
+
+    /**
+     * Add a new job to the queue.
+     * @param event The job to process
+     */
+    public void process(final JobEvent event) {
+        // The timestamp and the statistics are updated before the job gets
+        // visible to the queue thread. Otherwise the job might already be
+        // started (or acknowledged) with a stale queued time and a counter
+        // which has not been incremented yet.
+        event.queued = System.currentTimeMillis();
+        this.incQueued();
+        this.put(event);
+    }
+
+    /**
+     * Execute the qeue
+     */
+    private void runJobQueue() {
+        JobEvent info = null;
+        while ( this.running ) {
+            while ( this.suspendedSince.longValue() != -1 ) {
+                synchronized ( this.suspendLock ) {
+                    try {
+                        this.suspendLock.wait(MAX_SUSPEND_TIME);
+                    } catch (final InterruptedException ignore) {
+                        this.ignoreException(ignore);
+                    }
+                    if ( System.currentTimeMillis() > this.suspendedSince.longValue() + MAX_SUSPEND_TIME ) {
+                        this.suspendedSince.set(-1);
+                    }
+                }
+            }
+            if ( info == null ) {
+                // so let's wait/get the next job from the queue
+                info = this.take();
+            }
+
+            if ( info != null && this.running ) {
+                info = this.start(info);
+            }
+        }
+    }
+
+    /**
+     * Process a job.
+     * The job is locked, registered as started and the job event is
+     * posted via the event admin.
+     * @param info The job
+     * @return <code>true</code> if the job event has been posted, <code>false</code>
+     *         if the job could not be locked or sending failed. In the latter
+     *         cases the queued counter is decremented.
+     */
+    protected boolean executeJob(final JobEvent info) {
+        if ( logger.isDebugEnabled() ) {
+            logger.debug("Executing job {}.", EventUtil.toString(info.event));
+        }
+        if ( info.lock() ) {
+            if ( logger.isDebugEnabled() ) {
+                logger.debug("Starting job {}", EventUtil.toString(info.event));
+            }
+            boolean unlock = true;
+            try {
+                final Event jobEvent = this.getJobEvent(info);
+                final EventAdmin localEA = this.environment.getEventAdmin();
+                info.started = System.currentTimeMillis();
+                // let's add the event to our processing list
+                synchronized ( this.startedJobsLists ) {
+                    this.startedJobsLists.put(info.uniqueId, info);
+                }
+
+                // we need async delivery, otherwise we might create a deadlock
+                localEA.postEvent(jobEvent);
+                // do not unlock if sending was successful
+                unlock = false;
+
+                return true;
+
+            } catch (final Exception re) {
+                // if an exception occurs, we just log
+                this.logger.error("Exception during job processing.", re);
+            } finally {
+                if ( unlock ) {
+                    // the job has not been sent, so it must not stay in the map
+                    // of started jobs: cleanup() would otherwise treat it as a
+                    // job which is still waiting for an acknowledge
+                    synchronized ( this.startedJobsLists ) {
+                        this.startedJobsLists.remove(info.uniqueId);
+                    }
+                    info.unlock();
+                }
+            }
+        }
+
+        this.decQueued();
+        return false;
+    }
+
+    /**
+     * Create the real job event.
+     * The new event has the same properties as the job, but its topic is
+     * the value of the {@link JobUtil#PROPERTY_JOB_TOPIC} property. In addition
+     * the notifier context and the queue name are added, the distribute
+     * and application properties are removed and the priority is taken
+     * from the queue configuration if the job does not define one.
+     * @param info The job event.
+     * @return The real job event.
+     */
+    private Event getJobEvent(final JobEvent info) {
+        final String jobTopic = (String)info.event.getProperty(JobUtil.PROPERTY_JOB_TOPIC);
+        final Dictionary<String, Object> props = new EventPropertiesMap(info.event);
+
+        // context for the acknowledge / finished callbacks
+        final JobStatusNotifier.NotifierContext context = new JobStatusNotifier.NotifierContext(this);
+        props.put(JobStatusNotifier.CONTEXT_PROPERTY_NAME, context);
+
+        // distributable flag and app id are not passed on
+        props.remove(EventUtil.PROPERTY_DISTRIBUTE);
+        props.remove(EventUtil.PROPERTY_APPLICATION);
+
+        // fall back to the configured priority
+        final boolean hasPriority = props.get(JobUtil.PROPERTY_JOB_PRIORITY) != null;
+        if ( !hasPriority ) {
+            props.put(JobUtil.PROPERTY_JOB_PRIORITY, this.configuration.getPriority());
+        }
+
+        props.put(JobUtil.PROPERTY_JOB_QUEUE_NAME, info.queueName);
+
+        return new Event(jobTopic, props);
+    }
+
+    /**
+     * Helper method for exceptions which are ignored on purpose:
+     * the exception is only logged if debug logging is enabled.
+     * @param e The ignored exception
+     */
+    protected void ignoreException(Exception e) {
+        if ( !this.logger.isDebugEnabled() ) {
+            return;
+        }
+        this.logger.debug("Ignored exception " + e.getMessage(), e);
+    }
+
+    /**
+     * Rename this queue.
+     * @param name The new queue name
+     */
+    public void rename(final String name) {
+        final String oldName = this.queueName;
+        this.logger.info("Queue reconfiguration: old queue {} is renamed to {}.", oldName, name);
+        this.queueName = name;
+    }
+
+    /**
+     * Reschedule a job.
+     * This method is invoked after a job has been marked for rescheduling.
+     * @param info The job to reschedule
+     * @return The value which is passed on to {@link #notifyFinished(JobEvent)},
+     *         might be <code>null</code>.
+     */
+    protected abstract JobEvent reschedule(final JobEvent info);
+
+    /**
+     * Return the statistics of this queue.
+     * The queue object itself is the statistics object.
+     * @see org.apache.sling.event.jobs.Queue#getStatistics()
+     */
+    public Statistics getStatistics() {
+        return this;
+    }
+
+    /**
+     * Resume a suspended queue, this does nothing if the queue is not suspended.
+     * @see org.apache.sling.event.jobs.Queue#resume()
+     */
+    public void resume() {
+        synchronized ( this.suspendLock ) {
+            // The state has to be reset before the waiting queue thread is
+            // notified and while holding the lock. Otherwise the woken thread
+            // might still see the suspended state and go back to waiting.
+            this.suspendedSince.set(-1);
+            this.suspendLock.notifyAll();
+        }
+    }
+
+    /**
+     * Suspend the queue.
+     * The suspend time is only recorded if the queue is not suspended yet.
+     * @see org.apache.sling.event.jobs.Queue#suspend()
+     */
+    public void suspend() {
+        final long now = System.currentTimeMillis();
+        this.suspendedSince.compareAndSet(-1, now);
+    }
+
+    /**
+     * Check whether the queue is suspended, this is the case if a suspend time is set.
+     * @see org.apache.sling.event.jobs.Queue#isSuspended()
+     */
+    public boolean isSuspended() {
+        final long since = this.suspendedSince.get();
+        return since != -1;
+    }
+
+
+    /**
+     * Remove all jobs from the queue.
+     * The queue is suspended while the jobs are taken out of the queue,
+     * the jobs themselves are removed by a background thread.
+     * @see org.apache.sling.event.jobs.Queue#removeAll()
+     */
+    public synchronized void removeAll() {
+        // remember whether we have to resume at the end, then suspend
+        final boolean resumeAfterwards = !this.isSuspended();
+        this.suspend();
+        // take all jobs out of the queue and reset the statistics
+        final Collection<JobEvent> removedJobs = this.removeAllJobs();
+        this.clearQueued();
+        // the removal of the jobs is done in the background
+        final Runnable remover = new Runnable() {
+
+                /**
+                 * @see java.lang.Runnable#run()
+                 */
+                public void run() {
+                    final Iterator<JobEvent> iter = removedJobs.iterator();
+                    while ( iter.hasNext() ) {
+                        iter.next().remove();
+                    }
+                }
+            };
+        final Thread t = new Thread(remover, "Queue RemoveAll Thread for " + this.queueName);
+        t.setDaemon(true);
+        t.start();
+        // start queue again
+        if ( resumeAfterwards ) {
+            this.resume();
+        }
+    }
+
+    /**
+     * Clear the queue.
+     * This implementation only delegates to <code>clearQueued()</code>;
+     * NOTE(review): the jobs in the queue itself are not touched here - confirm
+     * that this is intended.
+     * @see org.apache.sling.event.jobs.Queue#clear()
+     */
+    public void clear() {
+        this.clearQueued();
+    }
+
+    /**
+     * Return a state value for the given key.
+     * @param key The key (currently ignored)
+     * @return Always <code>null</code> as this is not supported for now.
+     * @see org.apache.sling.event.jobs.Queue#getState(java.lang.String)
+     */
+    public Object getState(final String key) {
+        // not supported for now
+        return null;
+    }
+
+    /**
+     * Put another job into the queue.
+     * This is used for new jobs and for the dummy job which is added
+     * when the queue is closed.
+     * @param event The job
+     */
+    protected abstract void put(final JobEvent event);
+
+    /**
+     * Get another job from the queue.
+     * This is called by the queue thread to wait for / get the next job.
+     * @return The next job, the caller handles a <code>null</code> return value.
+     */
+    protected abstract JobEvent take();
+
+    /**
+     * Is the queue empty?
+     * This is used to decide whether the queue can be marked for clean up.
+     */
+    protected abstract boolean isEmpty();
+
+    /**
+     * Remove all events from the queue and return them.
+     * The returned jobs are removed by {@link #removeAll()} in a background thread.
+     * @return The jobs which have been in the queue.
+     */
+    protected abstract Collection<JobEvent> removeAllJobs();
+
+    /**
+     * Start the processing of a job, this is called by the queue thread.
+     * @param event The job to start
+     * @return The job which should be processed next or <code>null</code>
+     *         if the next job should be taken from the queue.
+     */
+    protected abstract JobEvent start(final JobEvent event);
+
+    /**
+     * Notify the queue implementation that a job is not processed anymore.
+     * This is also called with <code>null</code> to wake up a waiting queue on close.
+     * @param rescheduleInfo The result of {@link #reschedule(JobEvent)} or
+     *        <code>null</code> if no job has been rescheduled.
+     */
+    protected abstract void notifyFinished(final JobEvent rescheduleInfo);
+}
+

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Added: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractParallelJobQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractParallelJobQueue.java?rev=1022923&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractParallelJobQueue.java (added)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractParallelJobQueue.java Fri Oct 15 14:02:23 2010
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.queues;
+
+import java.util.Date;
+
+import org.apache.sling.commons.scheduler.Scheduler;
+import org.apache.sling.event.impl.EnvironmentComponent;
+import org.apache.sling.event.impl.jobs.JobEvent;
+import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
+import org.apache.sling.event.jobs.JobUtil;
+
+/**
+ * Abstract base class for a parallel processing job queue.
+ */
+public abstract class AbstractParallelJobQueue extends AbstractJobQueue {
+
+    protected volatile int jobCount;
+
+    /** The scheduler for rescheduling. */
+    private final Scheduler scheduler;
+
+    /**
+     * Create a new parallel queue.
+     * @param name The queue name
+     * @param config The queue configuration
+     * @param env The environment component
+     * @param scheduler The scheduler used to delay the rescheduling of jobs
+     */
+    public AbstractParallelJobQueue(final String name,
+                           final InternalQueueConfiguration config,
+                           final EnvironmentComponent env,
+                           final Scheduler scheduler) {
+        super(name, config, env);
+        this.scheduler = scheduler;
+    }
+
+    /**
+     * Return the state info of the base class extended by the current job count.
+     */
+    @Override
+    public String getStateInfo() {
+        final String baseInfo = super.getStateInfo();
+        return baseInfo + ", jobCount=" + this.jobCount;
+    }
+
+    @Override
+    protected JobEvent start(final JobEvent processInfo) {
+        // acquire a slot
+        this.acquireSlot();
+
+        if ( !this.executeJob(processInfo) ) {
+            this.freeSlot();
+        }
+        return null;
+    }
+
+    /**
+     * Acquire a processing slot.
+     * This method is called if the queue is not ordered.
+     * If the maximum number of parallel jobs is reached, the calling thread
+     * waits until {@link #freeSlot()} resets the waiting flag.
+     */
+    private void acquireSlot() {
+        synchronized ( this ) {
+            if ( jobCount >= this.configuration.getMaxParallel() ) {
+                this.isWaiting = true;
+                this.logger.debug("Job queue {} is processing {} jobs - waiting for a free slot.", this.queueName, jobCount);
+                // loop on the flag: wait() might return without a freed slot
+                // (e.g. on interruption)
+                while ( this.isWaiting ) {
+                    try {
+                        this.wait();
+                    } catch (final InterruptedException e) {
+                        this.ignoreException(e);
+                    }
+                }
+                this.logger.debug("Job queue {} is continuing.", this.queueName);
+            }
+            jobCount++;
+        }
+    }
+
+    /**
+     * Free a slot when a job processing is finished.
+     * If the queue thread is waiting for a slot, it is woken up.
+     */
+    private void freeSlot() {
+        synchronized ( this ) {
+            jobCount--;
+            if ( this.isWaiting ) {
+                this.logger.debug("Notifying job queue {} to continue processing.", this.queueName);
+                // reset the flag first, the waiting thread loops on it
+                this.isWaiting = false;
+                this.notify();
+            }
+        }
+    }
+
+    /**
+     * In addition to the checks of the base class, no job must be in processing.
+     */
+    @Override
+    protected boolean canBeMarkedForCleanUp() {
+        return super.canBeMarkedForCleanUp() && this.jobCount == 0;
+    }
+
+    /**
+     * A job is not processed anymore: free its processing slot.
+     * @param rescheduleInfo Not used by this implementation.
+     */
+    @Override
+    protected void notifyFinished(final JobEvent rescheduleInfo) {
+        this.freeSlot();
+    }
+
+    /**
+     * Reschedule a job by putting it back into the queue.
+     * If a retry delay is set (on the job or in the queue configuration),
+     * the job is put back by the scheduler after the delay, otherwise it
+     * is put back directly.
+     * @param info The job to reschedule
+     * @return Always <code>null</code>
+     */
+    @Override
+    protected JobEvent reschedule(final JobEvent info) {
+        // a delay set on the job overrules the configured delay
+        long delay = this.configuration.getRetryDelayInMs();
+        final Object jobDelay = info.event.getProperty(JobUtil.PROPERTY_JOB_RETRY_DELAY);
+        if ( jobDelay != null ) {
+            delay = (Long)jobDelay;
+        }
+        if ( delay <= 0 ) {
+            // put directly into queue
+            put(info);
+            return null;
+        }
+
+        final Runnable requeue = new Runnable() {
+            public void run() {
+                put(info);
+            }
+        };
+        final Date fireDate = new Date(System.currentTimeMillis() + delay);
+        final String jobName = "Waiting:" + queueName + ":" + info.hashCode();
+        try {
+            scheduler.fireJobAt(jobName, requeue, null, fireDate);
+        } catch (Exception e) {
+            // we ignore the exception and just put back the job in the queue
+            ignoreException(e);
+            requeue.run();
+        }
+        return null;
+    }
+}
+

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractParallelJobQueue.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractParallelJobQueue.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Added: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java?rev=1022923&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java (added)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java Fri Oct 15 14:02:23 2010
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.queues;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.sling.event.impl.EnvironmentComponent;
+import org.apache.sling.event.impl.jobs.JobEvent;
+import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
+import org.apache.sling.event.jobs.JobUtil;
+
+/**
+ * An ordered job queue is processing the queue FIFO in a serialized
+ * way. If a job fails it is rescheduled and the reschedule is processed
+ * next - this basically means that failing jobs block the queue
+ * until they are finished!
+ */
+public final class OrderedJobQueue extends AbstractJobQueue {
+
+    /** The job event for rescheduling. */
+    private JobEvent jobEvent;
+
+    /** Marker indicating that this queue is currently sleeping. */
+    private volatile long isSleepingUntil = -1;
+
+    /** The sleeping thread. */
+    private volatile Thread sleepingThread;
+
+    /** The queue. */
+    private final BlockingQueue<JobEvent> queue = new LinkedBlockingQueue<JobEvent>();
+
+    public OrderedJobQueue(final String name,
+                           final InternalQueueConfiguration config,
+                           final EnvironmentComponent env) {
+        super(name, config, env);
+    }
+
+    @Override
+    public String getStateInfo() {
+        return super.getStateInfo() + ", isSleepingUntil=" + this.isSleepingUntil;
+    }
+
+    @Override
+    protected JobEvent start(final JobEvent processInfo) {
+        JobEvent rescheduleInfo = null;
+
+        // if we are ordered we simply wait for the finish
+        if ( this.executeJob(processInfo) ) {
+            rescheduleInfo = this.waitForFinish();
+        }
+        return rescheduleInfo;
+    }
+
+    private void setNotSleeping() {
+        this.isSleepingUntil = -1;
+        this.sleepingThread = null;
+    }
+
+    private void setSleeping(final Thread sleepingThread, final long delay) {
+        this.sleepingThread = sleepingThread;
+        this.isSleepingUntil = System.currentTimeMillis() + delay;
+    }
+
+    @Override
+    public void resume() {
+        if ( this.isSleepingUntil == -1 ) {
+            final Thread thread = this.sleepingThread;
+            if ( thread != null ) {
+                thread.interrupt();
+            }
+        }
+        super.resume();
+    }
+
+    /**
+     * Wait for the job to be finished.
+     * This is called if the queue is ordered.
+     */
+    private JobEvent waitForFinish() {
+        synchronized ( this ) {
+            this.isWaiting = true;
+            this.logger.debug("Job queue {} is waiting for finish.", this.queueName);
+            while ( this.isWaiting ) {
+                try {
+                    this.wait();
+                } catch (InterruptedException e) {
+                    this.ignoreException(e);
+                }
+            }
+            this.logger.debug("Job queue {} is continuing.", this.queueName);
+            final JobEvent object = this.jobEvent;
+            this.jobEvent = null;
+            return object;
+        }
+    }
+
+    @Override
+    protected void put(final JobEvent event) {
+        try {
+            this.queue.put(event);
+        } catch (final InterruptedException e) {
+            // this should never happen
+            this.ignoreException(e);
+        }
+    }
+
+    @Override
+    protected JobEvent take() {
+        try {
+            return this.queue.take();
+        } catch (final InterruptedException e) {
+            // this should never happen
+            this.ignoreException(e);
+        }
+        return null;
+    }
+
+    @Override
+    protected boolean isEmpty() {
+        return this.queue.isEmpty();
+    }
+
+    @Override
+    protected void notifyFinished(final JobEvent rescheduleInfo) {
+        this.jobEvent = rescheduleInfo;
+        this.logger.debug("Notifying job queue {} to continue processing.", this.queueName);
+        this.isWaiting = false;
+        synchronized ( this ) {
+            this.notify();
+        }
+    }
+
+    @Override
+    protected JobEvent reschedule(final JobEvent info) {
+        // we just sleep for the delay time - if none, we continue and retry
+        // this job again
+        long delay = this.configuration.getRetryDelayInMs();
+        if ( info.event.getProperty(JobUtil.PROPERTY_JOB_RETRY_DELAY) != null ) {
+            delay = (Long)info.event.getProperty(JobUtil.PROPERTY_JOB_RETRY_DELAY);
+        }
+        if ( delay > 0 ) {
+            this.setSleeping(Thread.currentThread(), delay);
+            try {
+                this.logger.debug("Job queue {} is sleeping for {}ms.", this.queueName, delay);
+                Thread.sleep(delay);
+            } catch (InterruptedException e) {
+                this.ignoreException(e);
+            } finally {
+                this.setNotSleeping();
+            }
+        }
+        return info;
+    }
+
+    /**
+     * @see org.apache.sling.event.jobs.Queue#clear()
+     */
+    public void clear() {
+        this.queue.clear();
+        super.clear();
+    }
+
+    @Override
+    public synchronized void removeAll() {
+        this.jobEvent = null;
+        super.removeAll();
+    }
+
+    @Override
+    protected Collection<JobEvent> removeAllJobs() {
+        final List<JobEvent> events = new ArrayList<JobEvent>(this.queue);
+        this.queue.clear();
+        return events;
+    }
+
+    @Override
+    public Object getState(final String key) {
+        if ( "isSleepingUntil".equals(key) ) {
+            return this.isSleepingUntil;
+        }
+        return super.getState(key);
+    }
+}
+

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Added: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java?rev=1022923&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java (added)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java Fri Oct 15 14:02:23 2010
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.queues;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.sling.commons.scheduler.Scheduler;
+import org.apache.sling.event.impl.EnvironmentComponent;
+import org.apache.sling.event.impl.jobs.JobEvent;
+import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
+
+/**
+ * The default parallel job queue processing the entries FIFO.
+ * Failing jobs are rescheduled and put at the end of the queue.
+ */
+public final class ParallelJobQueue extends AbstractParallelJobQueue {
+
+    /** The queue. */
+    private final BlockingQueue<JobEvent> queue = new LinkedBlockingQueue<JobEvent>();
+
+    /**
+     * Create a new parallel queue; all arguments are passed on to the super class.
+     * @param name      The queue name.
+     * @param config    The queue configuration.
+     * @param env       The environment component.
+     * @param scheduler The scheduler.
+     */
+    public ParallelJobQueue(final String name,
+                           final InternalQueueConfiguration config,
+                           final EnvironmentComponent env,
+                           final Scheduler scheduler) {
+        super(name, config, env, scheduler);
+    }
+
+    /**
+     * Add the event at the end of the queue.
+     * An interruption is passed to <code>ignoreException</code> and not rethrown.
+     */
+    @Override
+    protected void put(final JobEvent event) {
+        try {
+            this.queue.put(event);
+        } catch (final InterruptedException e) {
+            // this should never happen
+            this.ignoreException(e);
+        }
+    }
+
+    /**
+     * Take the next event from the queue, blocking until one is available.
+     * @return The next event or <code>null</code> if the thread got interrupted.
+     */
+    @Override
+    protected JobEvent take() {
+        try {
+            return this.queue.take();
+        } catch (final InterruptedException e) {
+            // this should never happen
+            this.ignoreException(e);
+        }
+        return null;
+    }
+
+    /**
+     * @return <code>true</code> if no event is queued.
+     */
+    @Override
+    protected boolean isEmpty() {
+        return this.queue.isEmpty();
+    }
+
+    /**
+     * @see org.apache.sling.event.jobs.Queue#clear()
+     */
+    @Override
+    public void clear() {
+        this.queue.clear();
+        super.clear();
+    }
+
+    /**
+     * Remove all queued events and return them.
+     * @return The removed events.
+     */
+    @Override
+    protected Collection<JobEvent> removeAllJobs() {
+        final List<JobEvent> events = new ArrayList<JobEvent>();
+        // drainTo only removes what it hands over - an event put concurrently
+        // stays in the queue instead of being cleared without being returned
+        this.queue.drainTo(events);
+        return events;
+    }
+}
+

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Added: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java?rev=1022923&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java (added)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java Fri Oct 15 14:02:23 2010
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.queues;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.sling.commons.scheduler.Scheduler;
+import org.apache.sling.event.impl.EnvironmentComponent;
+import org.apache.sling.event.impl.jobs.JobEvent;
+import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
+import org.apache.sling.event.jobs.JobUtil;
+
+/**
+ * This queue acts similar to the parallel job queue. Except that
+ * new jobs are selected based on a round robin topic selection scheme.
+ * Failing jobs are rescheduled and put at the end of the queue.
+ */
+public final class TopicRoundRobinJobQueue extends AbstractParallelJobQueue {
+
+    /** The topic set. */
+    private final List<String> topics = new ArrayList<String>();
+
+    /** The topic map. */
+    private final Map<String, List<JobEvent>> topicMap = new HashMap<String, List<JobEvent>>();
+
+    /** Topic index. */
+    private int topicIndex;
+
+    /** Event count. */
+    private int eventCount;
+
+    private boolean isWaitingForNext = false;
+
+    public TopicRoundRobinJobQueue(final String name,
+                           final InternalQueueConfiguration config,
+                           final EnvironmentComponent env,
+                           final Scheduler scheduler) {
+        super(name, config, env, scheduler);
+    }
+
+    @Override
+    public String getStateInfo() {
+        return super.getStateInfo() + ", eventCount=" + this.eventCount + ", isWaitingForNext=" + this.isWaitingForNext;
+    }
+
+    @Override
+    protected boolean canBeMarkedForCleanUp() {
+        boolean result = super.canBeMarkedForCleanUp();
+        if ( result ) {
+            result = !this.isWaitingForNext;
+        }
+        return result;
+    }
+
+    @Override
+    protected void put(final JobEvent event) {
+        // is this a close?
+        if ( event.event == null ) {
+            return;
+        }
+        final String topic = (String)event.event.getProperty(JobUtil.PROPERTY_JOB_TOPIC);
+        synchronized ( this.topicMap ) {
+            List<JobEvent> events = this.topicMap.get(topic);
+            if ( events == null ) {
+                events = new LinkedList<JobEvent>();
+                this.topicMap.put(topic, events);
+                this.topics.add(topic);
+            }
+            events.add(event);
+            this.eventCount++;
+            if ( this.isWaitingForNext ) {
+                this.isWaitingForNext = false;
+                // wake up take()
+                this.topicMap.notify();
+            }
+        }
+    }
+
+    @Override
+    protected JobEvent take() {
+        JobEvent e = null;
+        synchronized ( this.topicMap ) {
+            if ( this.eventCount == 0 ) {
+                // wait for a new event
+                this.isWaitingForNext = true;
+                while ( this.isWaitingForNext ) {
+                    try {
+                        this.topicMap.wait();
+                    } catch (final InterruptedException ie) {
+                        this.ignoreException(ie);
+                    }
+                }
+            }
+            if ( this.eventCount > 0 ) {
+                while ( e == null ) {
+                    final String topic = this.topics.get(this.topicIndex);
+                    final List<JobEvent> events = this.topicMap.get(topic);
+                    if ( events.size() > 0 ) {
+                        e = events.remove(0);
+                    }
+                    this.topicIndex++;
+                    if ( this.topicIndex == this.topics.size() ) {
+                        this.topicIndex = 0;
+                    }
+                }
+                this.eventCount--;
+            }
+        }
+        return e;
+    }
+
+    @Override
+    protected boolean isEmpty() {
+        synchronized ( this.topicMap ) {
+            return this.eventCount == 0;
+        }
+    }
+
+    /**
+     * @see org.apache.sling.event.jobs.Queue#clear()
+     */
+    public void clear() {
+        synchronized ( this.topicMap ) {
+            this.eventCount = 0;
+            this.topics.clear();
+            this.topicMap.clear();
+        }
+        super.clear();
+    }
+
+    @Override
+    protected Collection<JobEvent> removeAllJobs() {
+        final List<JobEvent> events = new ArrayList<JobEvent>();
+        synchronized ( this.topicMap ) {
+            for(final List<JobEvent> l : this.topicMap.values() ) {
+                events.addAll(l);
+            }
+            this.eventCount = 0;
+            this.topics.clear();
+            this.topicMap.clear();
+        }
+        return events;
+    }
+}
+

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Added: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/Environment.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/Environment.java?rev=1022923&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/Environment.java (added)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/Environment.java Fri Oct 15 14:02:23 2010
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.support;
+
+import org.apache.sling.commons.threads.ThreadPool;
+
+/**
+ * This class provides "global settings"
+ * to all services, like the application id and the thread pool.
+ * Both settings are public, non-final static fields; this class neither
+ * initializes nor validates them, so they are <code>null</code> until
+ * some other code assigns them.
+ * @since 3.0
+ */
+public class Environment {
+
+    /**
+     * Global application id.
+     * NOTE(review): unlike {@link #THREAD_POOL} this field is not volatile -
+     * confirm that it is assigned before other threads read it.
+     */
+    public static String APPLICATION_ID;
+
+    /** Global thread pool; volatile, so a reassignment is visible to all threads. */
+    public static volatile ThreadPool THREAD_POOL;
+}

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/Environment.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/Environment.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/Environment.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties?rev=1022923&r1=1022922&r2=1022923&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties (original)
+++ sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties Fri Oct 15 14:02:23 2010
@@ -31,6 +31,66 @@ dist.events.description = Distributes lo
  repository are picked up and distributed locally through the OSGi Event Admin \
  Service.   
 
+scheduler.period.name = Cleanup Interval
+scheduler.period.description = Interval in seconds in which events older than \
+ a specific age (see Event Cleanup Age) are purged from the repository. \
+ The default value is 30 minutes (1800 seconds).
+
+cleanup.period.name = Event Cleanup Age
+cleanup.period.description = The maximum age in minutes of persisted events to \
+ be purged from the repository during the cleanup run. The default is 15 \
+ minutes. Note that this setting defines the minimum time an event remains \
+ in the repository. 
+
+
+#
+# Queue Configuration and Job Event Handler
+queue.name = Apache Sling Job Queue Configuration
+queue.description = The configuration of a job processing queue.
+
+queue.name.name = Name
+queue.name.description = The name of the queue. If matching is used \
+ the token \{0\} can be used to substitute the real value.
+
+queue.type.name = Type
+queue.type.description = The queue type.
+
+queue.topics.name = Topics
+queue.topics.description = This value is required and lists the topics processed by \
+ this queue. The value is a list of strings. If a string ends with a dot, \
+ all topics in exactly this package match. If the string ends with a star, \
+ all topics in this package and all subpackages match. If the string neither \
+ ends with a dot nor with a star, this is assumed to define an exact topic.
+
+queue.priority.name = Priority
+queue.priority.description = The priority for the threads from this queue. Default is norm.
+
+queue.retries.name = Maximum Retries
+queue.retries.description = The maximum number of times a failed job slated \
+ for retries is actually retried. If a job has been retried this number of \
+ times and still fails, it is not rescheduled and assumed to have failed. The \
+ default value is 10.
+
+queue.retrydelay.name = Retry Delay
+queue.retrydelay.description = The number of milliseconds to sleep between two \
+ consecutive retries of a job which failed and was set to be retried. The \
+ default value is 2 seconds. This value is only relevant if there is a single \
+ failed job in the queue. If there are multiple failed jobs, each job is \
+ retried in turn without an intervening delay.
+
+queue.maxparallel.name = Maximum Parallel Jobs
+queue.maxparallel.description = The maximum number of parallel jobs started for this queue. \
+ A value of -1 is substituted with the number of available processors.
+
+queue.runlocal.name = Run Local
+queue.runlocal.description = Jobs for this queue are only processed on the cluster node \
+ where the job has been started.
+ 
+queue.applicationids.name = Application Ids
+queue.applicationids.description = An optional list of application ids. If configured, \
+ jobs for this queue are only processed on those cluster nodes.
+
+ 
 #
 # Job Event Handler
 job.events.name = Apache Sling Job Event Handler 
@@ -40,23 +100,27 @@ job.events.description = Manages job sch
  amongst the cluster nodes through repository events. The jobs are started \
  locally on a single cluster node through the OSGi Event Admin.
 
+jobscheduler.period.name = Cleanup Interval
+jobscheduler.period.description = Interval in seconds in which unused \
+ queues are stopped. The default value is 5 minutes (300 seconds).
+
+
+#
+# Persistence Handler
+job.persistence.name = Apache Sling Job Persistence Manager
+job.persistence.description = This service persists and loads jobs from the repository.
+
+persscheduler.period.name = Event Cleanup Interval
+persscheduler.period.description = Interval in seconds in which jobs older than \
+ a specific age (see Event Cleanup Age) are purged from the repository. \
+ The default value is 5 minutes (300 seconds).
+
 sleep.time.name = Retry Interval
 sleep.time.description = The number of milliseconds to sleep between two \
  consecutive retries of a job which failed and was set to be retried. The \
  default value is 30 seconds. This value is only relevant if there is a single \
  failed job in the queue. If there are multiple failed jobs, each job is \
  retried in turn without an intervening delay.
- 
-max.job.retries.name = Maximum Retries
-max.job.retries.description = The maximum number of times a failed job slated \
- for retries is actually retried. If a job has been retried this number of \
- times and still fails, it is not rescheduled and assumed to have failed. The \
- default value is 10.
-
-jobscheduler.period.name = Event Cleanup Internal
-jobscheduler.period.description = Interval in seconds in which jobs older than \
- a specific age (see Event Cleanup Age) are purged from the repository. \
- The default value is 5 minutes (300 seconds).
 
 jobcleanup.period.name = Event Cleanup Age
 jobcleanup.period.description = The maximum age in minutes of persisted job to \
@@ -64,16 +128,6 @@ jobcleanup.period.description = The maxi
  minutes. Note that this setting defines the minimum time an event remains \
  in the repository. 
 
-wait.for.ack.name = Acknowledge Waiting Time
-wait.for.ack.description = If a service is processing a job, it acknowledges this \
- by sending a message to the Job Event Handler. If the Job Event Handler does not \
- receive such a message in the configured time, it reschedules the job. The configured \
- time is in seconds (default is 90 secs).
-
-max.parallel.jobs.name = Maximum Parallel Jobs
-max.parallel.jobs.description = The maximum number of parallel jobs started for the main \
- queue.
-
 max.load.jobs.name = Max Load Jobs
 max.load.jobs.description = The maximum amount of jobs being loaded from the repository on startup. \
  Default is 1000 jobs.
@@ -90,44 +144,21 @@ load.checkdelay.name = Background Check 
 load.checkdelay.description = The background loader sleeps this time of seconds before \
  checking the repository for jobs. Default value is 240 seconds.
 
-max.job.queues.name = Max Job Queues
-max.job.queues.description = The maximum number of job queues (default is 10). \
- If this number is exceeded all jobs for a new job queue are put into the main queue.
-
 #
 # Event Pool
 event.pool.name = Apache Sling Event Thread Pool 
-event.pool.description = This is the thread pool used by the Apache Sling eventing support.
+event.pool.description = This is the thread pool used by the Apache Sling eventing support. The \
+ threads from this pool are merely used for the job handling. By limiting this pool, it is \
+ possible to limit the maximum number of parallel processed jobs - regardless of the queue \
+ configuration.
 
 minPoolSize.name = Min Pool Size
 minPoolSize.description = The minimum pool size. The minimum pool size should be \
- higher than 20. Approx 10 threads are in use by the system, so a pool size of 20 \
- allows to process 10 events in parallel.
+ higher than 10.
 
 maxPoolSize.name = Max Pool Size
 maxPoolSize.description = The maximum pool size. The maximum pool size should be higher than \
  or equal to the minimum pool size.
 
-queueSize.name = Queue Size
-queueSize.description = The maximum size of the thread queue if the pool is exhausted.
-
 priority.name = Priority
 priority.description = The priority for the threads from this pool. Default is norm.
-
-#
-# Shared labels
-scheduler.period.name = Event Cleanup Internal
-scheduler.period.description = Interval in seconds in which events older than \
- a specific age (see Event Cleanup Age) are purged from the repository. \
- The default value is 30 minutes (1800 seconds).
-
-cleanup.period.name = Event Cleanup Age
-cleanup.period.description = The maximum age in minutes of persisted events to \
- be purged from the repository during the cleanup run. The default is 15 \
- minutes. Note that this setting defines the minimum time an event remains \
- in the repository. 
- 
-repository.path.name = Persistent Event Location
-repository.path.description = Absolute Path of the Repository location where \
- events are persisted to be picked up by the event distribution mechanism. \
- The default value is "/sling/events".

Modified: sling/trunk/bundles/extensions/event/src/main/resources/SLING-INF/nodetypes/event.cnd
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/resources/SLING-INF/nodetypes/event.cnd?rev=1022923&r1=1022922&r2=1022923&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/resources/SLING-INF/nodetypes/event.cnd (original)
+++ sling/trunk/bundles/extensions/event/src/main/resources/SLING-INF/nodetypes/event.cnd Fri Oct 15 14:02:23 2010
@@ -1,42 +1,42 @@
-//
-//  Licensed to the Apache Software Foundation (ASF) under one
-//  or more contributor license agreements.  See the NOTICE file
-//  distributed with this work for additional information
-//  regarding copyright ownership.  The ASF licenses this file
-//  to you under the Apache License, Version 2.0 (the
-//  "License"); you may not use this file except in compliance
-//  with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-//  Unless required by applicable law or agreed to in writing,
-//  software distributed under the License is distributed on an
-//  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-//  KIND, either express or implied.  See the License for the
-//  specific language governing permissions and limitations
-//  under the License.
-//
-
-<slingevent='http://sling.apache.org/jcr/event/1.0'>
-<nt='http://www.jcp.org/jcr/nt/1.0'>
-<mix='http://www.jcp.org/jcr/mix/1.0'>
-
-[slingevent:Event] > nt:unstructured, nt:hierarchyNode
-  - slingevent:topic (string)
-  - slingevent:application (string)
-  - slingevent:created (date)
-  - slingevent:properties (binary)
-  
-[slingevent:Job] > slingevent:Event, mix:lockable
-  - slingevent:processor (string)
-  - slingevent:id (string)
-  - slingevent:finished (date)
- 
-[slingevent:TimedEvent] > slingevent:Event, mix:lockable
-  - slingevent:processor (string)
-  - slingevent:id (string)
-  - slingevent:expression (string)
-  - slingevent:date (date)
-  - slingevent:period (long)
-
-  
+//
+//  Licensed to the Apache Software Foundation (ASF) under one
+//  or more contributor license agreements.  See the NOTICE file
+//  distributed with this work for additional information
+//  regarding copyright ownership.  The ASF licenses this file
+//  to you under the Apache License, Version 2.0 (the
+//  "License"); you may not use this file except in compliance
+//  with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing,
+//  software distributed under the License is distributed on an
+//  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+//  KIND, either express or implied.  See the License for the
+//  specific language governing permissions and limitations
+//  under the License.
+//
+
+<slingevent='http://sling.apache.org/jcr/event/1.0'>
+<nt='http://www.jcp.org/jcr/nt/1.0'>
+<mix='http://www.jcp.org/jcr/mix/1.0'>
+
+[slingevent:Event] > nt:unstructured, nt:hierarchyNode
+  - slingevent:topic (string)
+  - slingevent:application (string)
+  - slingevent:created (date)
+  - slingevent:properties (binary)
+  
+[slingevent:Job] > slingevent:Event, mix:lockable
+  - slingevent:processor (string)
+  - slingevent:id (string)
+  - slingevent:finished (date)
+ 
+[slingevent:TimedEvent] > slingevent:Event, mix:lockable
+  - slingevent:processor (string)
+  - slingevent:id (string)
+  - slingevent:expression (string)
+  - slingevent:date (date)
+  - slingevent:period (long)
+
+  

Modified: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/EventUtilTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/EventUtilTest.java?rev=1022923&r1=1022922&r2=1022923&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/EventUtilTest.java (original)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/EventUtilTest.java Fri Oct 15 14:02:23 2010
@@ -31,7 +31,7 @@ import javax.jcr.PropertyType;
 import javax.jcr.Value;
 import javax.jcr.ValueFactory;
 
-import org.apache.sling.event.impl.EventHelper;
+import org.apache.sling.event.impl.jobs.jcr.JCRHelper;
 import org.jmock.Expectations;
 import org.jmock.Mockery;
 import org.jmock.integration.junit4.JMock;
@@ -100,17 +100,17 @@ public class EventUtilTest {
             will(returnValue(getValueOfType(PropertyType.DATE, "dateValue")));
         }});
         // boolean
-        assertEquals(PropertyType.BOOLEAN, EventHelper.getNodePropertyValue(factory, true).getType());
-        assertEquals(PropertyType.BOOLEAN, EventHelper.getNodePropertyValue(factory, false).getType());
-        assertEquals(PropertyType.BOOLEAN, EventHelper.getNodePropertyValue(factory, Boolean.TRUE).getType());
-        assertEquals(PropertyType.BOOLEAN, EventHelper.getNodePropertyValue(factory, Boolean.FALSE).getType());
+        assertEquals(PropertyType.BOOLEAN, JCRHelper.getNodePropertyValue(factory, true).getType());
+        assertEquals(PropertyType.BOOLEAN, JCRHelper.getNodePropertyValue(factory, false).getType());
+        assertEquals(PropertyType.BOOLEAN, JCRHelper.getNodePropertyValue(factory, Boolean.TRUE).getType());
+        assertEquals(PropertyType.BOOLEAN, JCRHelper.getNodePropertyValue(factory, Boolean.FALSE).getType());
         // long
-        assertEquals(PropertyType.LONG, EventHelper.getNodePropertyValue(factory, (long)5).getType());
+        assertEquals(PropertyType.LONG, JCRHelper.getNodePropertyValue(factory, (long)5).getType());
         // int = not possible
-        assertEquals(null, EventHelper.getNodePropertyValue(factory, 5));
+        assertEquals(null, JCRHelper.getNodePropertyValue(factory, 5));
         // string
-        assertEquals(PropertyType.STRING, EventHelper.getNodePropertyValue(factory, "something").getType());
+        assertEquals(PropertyType.STRING, JCRHelper.getNodePropertyValue(factory, "something").getType());
         // calendar
-        assertEquals(PropertyType.DATE, EventHelper.getNodePropertyValue(factory, Calendar.getInstance()).getType());
+        assertEquals(PropertyType.DATE, JCRHelper.getNodePropertyValue(factory, Calendar.getInstance()).getType());
     }
 }

Modified: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/AbstractRepositoryEventHandlerTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/AbstractRepositoryEventHandlerTest.java?rev=1022923&r1=1022922&r2=1022923&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/AbstractRepositoryEventHandlerTest.java (original)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/AbstractRepositoryEventHandlerTest.java Fri Oct 15 14:02:23 2010
@@ -18,123 +18,27 @@
  */
 package org.apache.sling.event.impl;
 
-import static org.junit.Assert.assertTrue;
-
-import java.net.URL;
-import java.util.Collections;
 import java.util.Dictionary;
-import java.util.Hashtable;
-import java.util.Set;
 
-import javax.jcr.Node;
-import javax.jcr.NodeIterator;
-import javax.jcr.RepositoryException;
-import javax.jcr.Session;
-
-import org.apache.sling.commons.classloader.DynamicClassLoaderManager;
-import org.apache.sling.commons.threads.ModifiableThreadPoolConfig;
-import org.apache.sling.commons.threads.ThreadPoolConfig;
-import org.apache.sling.jcr.api.SlingRepository;
-import org.apache.sling.settings.SlingSettingsService;
 import org.jmock.Expectations;
-import org.jmock.Mockery;
 import org.jmock.integration.junit4.JMock;
 import org.junit.runner.RunWith;
 import org.osgi.framework.BundleContext;
 import org.osgi.service.component.ComponentContext;
-import org.osgi.service.event.Event;
 import org.osgi.service.event.EventAdmin;
 
 @RunWith(JMock.class)
-public abstract class AbstractRepositoryEventHandlerTest {
+public abstract class AbstractRepositoryEventHandlerTest extends AbstractTest {
 
     protected volatile AbstractRepositoryEventHandler handler;
 
-    protected static final String REPO_PATH = "/test/events";
-    protected static final String SLING_ID = "4711";
-
-    protected static Session session;
-
-    protected abstract Mockery getMockery();
-
     protected abstract AbstractRepositoryEventHandler createHandler();
 
-    protected Dictionary<String, Object> getComponentConfig() {
-        final Dictionary<String, Object> config = new Hashtable<String, Object>();
-        config.put(AbstractRepositoryEventHandler.CONFIG_PROPERTY_REPO_PATH, REPO_PATH);
-
-        return config;
-    }
-
-    @org.junit.BeforeClass public static void setupRepository() throws Exception {
-        RepositoryTestUtil.startRepository();
-        final SlingRepository repository = RepositoryTestUtil.getSlingRepository();
-        session = repository.loginAdministrative(repository.getDefaultWorkspace());
-        assertTrue(RepositoryTestUtil.registerNodeType(session, DistributingEventHandler.class.getResourceAsStream("/SLING-INF/nodetypes/event.cnd")));
-        assertTrue(RepositoryTestUtil.registerNodeType(session, DistributingEventHandler.class.getResourceAsStream("/SLING-INF/nodetypes/folder.cnd")));
-        if ( session.itemExists(REPO_PATH) ) {
-            session.getItem(REPO_PATH).remove();
-            session.save();
-        }
-    }
-
-    @org.junit.AfterClass public static void shutdownRepository() throws Exception {
-        if ( session != null ) {
-            session.logout();
-            session = null;
-        }
-        RepositoryTestUtil.stopRepository();
-    }
-
-    @org.junit.Before public void setup() throws Exception {
-        // activate
-        this.activate(null);
-    }
-
-    int activateCount = 1;
-
-    protected void activate(final EventAdmin ea) {
+    protected void activate(final EventAdmin ea) throws Throwable {
+        super.activate(ea);
         this.handler = this.createHandler();
-        this.handler.repository = RepositoryTestUtil.getSlingRepository();
-        this.handler.classLoaderManager = new DynamicClassLoaderManager() {
-
-            public ClassLoader getDynamicClassLoader() {
-                return this.getClass().getClassLoader();
-            }
-        };
-        // the event admin
-        if ( ea != null ) {
-            this.handler.eventAdmin = ea;
-        } else {
-            final EventAdmin eventAdmin = this.getMockery().mock(EventAdmin.class, "eventAdmin" + activateCount);
-            this.handler.eventAdmin = eventAdmin;
-            this.getMockery().checking(new Expectations() {{
-                allowing(eventAdmin).postEvent(with(any(Event.class)));
-                allowing(eventAdmin).sendEvent(with(any(Event.class)));
-            }});
-        }
-
-        // sling settings service
-        this.handler.settingsService = new SlingSettingsService() {
-            public String getSlingId() {
-                return SLING_ID;
-            }
-
-            public URL getSlingHome() {
-                return null;
-            }
-
-            public String getSlingHomePath() {
-                return null;
-            }
-
-            public Set<String> getRunModes() {
-                return Collections.<String> emptySet();
-            }
-        };
 
-        // we need a thread pool
-        this.handler.threadPool = new ThreadPoolImpl();
+        handler.environment = this.environment;
 
         // lets set up the bundle context
         final BundleContext bundleContext = this.getMockery().mock(BundleContext.class, "beforeBundleContext" + activateCount);
@@ -161,7 +65,7 @@ public abstract class AbstractRepository
         }
     }
 
-    protected void deactivate() {
+    protected void deactivate() throws Throwable {
         // lets set up the bundle context with the sling id
         final BundleContext bundleContext = this.getMockery().mock(BundleContext.class, "afterBundleContext" + activateCount);
 
@@ -172,44 +76,6 @@ public abstract class AbstractRepository
         }});
         this.handler.deactivate(componentContext);
         this.handler = null;
-        activateCount++;
-    }
-
-    @org.junit.After public void shutdown() throws Exception {
-        final String path = this.handler.repositoryPath;
-        this.deactivate();
-        try {
-            // delete all child nodes to get a clean repository again
-            final Node rootNode = (Node) session.getItem(path);
-            final NodeIterator iter = rootNode.getNodes();
-            while ( iter.hasNext() ) {
-                final Node child = iter.nextNode();
-                child.remove();
-            }
-            session.save();
-        } catch ( RepositoryException re) {
-            // we ignore this for the test
-        }
-    }
-
-    @org.junit.Test public void testPathCreation() throws RepositoryException {
-        assertTrue(session.itemExists(REPO_PATH));
-    }
-
-    final class ThreadPoolImpl implements ThreadPool {
-
-        public void execute(Runnable runnable) {
-            final Thread t = new Thread(runnable);
-            t.start();
-        }
-
-        public String getName() {
-            return "default";
-        }
-
-        public ThreadPoolConfig getConfiguration() {
-            return new ModifiableThreadPoolConfig();
-        }
-
+        super.deactivate();
     }
 }

Modified: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/DistributingEventHandlerTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/DistributingEventHandlerTest.java?rev=1022923&r1=1022922&r2=1022923&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/DistributingEventHandlerTest.java (original)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/DistributingEventHandlerTest.java Fri Oct 15 14:02:23 2010
@@ -33,6 +33,8 @@ import javax.jcr.observation.EventListen
 
 import org.apache.jackrabbit.util.ISO9075;
 import org.apache.sling.event.EventUtil;
+import org.apache.sling.event.impl.jobs.jcr.JCRHelper;
+import org.apache.sling.event.impl.support.Environment;
 import org.jmock.Mockery;
 import org.jmock.integration.junit4.JMock;
 import org.jmock.integration.junit4.JUnit4Mockery;
@@ -59,7 +61,7 @@ public class DistributingEventHandlerTes
     }
 
     @org.junit.Test public void testSetup() throws RepositoryException {
-        assertEquals(this.handler.applicationId, SLING_ID);
+        assertEquals(Environment.APPLICATION_ID, SLING_ID);
         assertEquals(this.handler.repositoryPath, REPO_PATH);
         assertNotNull(this.handler.writerSession);
         final EventListenerIterator iter = this.handler.writerSession.getWorkspace().getObservationManager().getRegisteredEventListeners();
@@ -82,9 +84,9 @@ public class DistributingEventHandlerTes
         final NodeIterator iter = rootNode.getNodes();
         iter.hasNext();
         final Node eventNode = iter.nextNode();
-        assertEquals(topic, eventNode.getProperty(EventHelper.NODE_PROPERTY_TOPIC).getString());
-        assertEquals(handler.applicationId, eventNode.getProperty(EventHelper.NODE_PROPERTY_APPLICATION).getString());
-        assertTrue(Calendar.getInstance().compareTo(eventNode.getProperty(EventHelper.NODE_PROPERTY_CREATED).getDate()) >= 0);
+        assertEquals(topic, eventNode.getProperty(JCRHelper.NODE_PROPERTY_TOPIC).getString());
+        assertEquals(Environment.APPLICATION_ID, eventNode.getProperty(JCRHelper.NODE_PROPERTY_APPLICATION).getString());
+        assertTrue(Calendar.getInstance().compareTo(eventNode.getProperty(JCRHelper.NODE_PROPERTY_CREATED).getDate()) >= 0);
         // as a starting point we just check if the properties property exists
         assertTrue(eventNode.hasProperty(ISO9075.encode("a property")));
     }
@@ -100,9 +102,9 @@ public class DistributingEventHandlerTes
         final NodeIterator iter = rootNode.getNodes();
         iter.hasNext();
         final Node eventNode = iter.nextNode();
-        assertEquals(topic, eventNode.getProperty(EventHelper.NODE_PROPERTY_TOPIC).getString());
-        assertEquals(handler.applicationId, eventNode.getProperty(EventHelper.NODE_PROPERTY_APPLICATION).getString());
-        assertTrue(Calendar.getInstance().compareTo(eventNode.getProperty(EventHelper.NODE_PROPERTY_CREATED).getDate()) >= 0);
+        assertEquals(topic, eventNode.getProperty(JCRHelper.NODE_PROPERTY_TOPIC).getString());
+        assertEquals(Environment.APPLICATION_ID, eventNode.getProperty(JCRHelper.NODE_PROPERTY_APPLICATION).getString());
+        assertTrue(Calendar.getInstance().compareTo(eventNode.getProperty(JCRHelper.NODE_PROPERTY_CREATED).getDate()) >= 0);
         // as a starting point we just check if the properties property exists
         assertTrue(eventNode.hasProperty(ISO9075.encode("a property")));
     }

Modified: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/SimpleEventAdmin.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/SimpleEventAdmin.java?rev=1022923&r1=1022922&r2=1022923&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/SimpleEventAdmin.java (original)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/SimpleEventAdmin.java Fri Oct 15 14:02:23 2010
@@ -52,8 +52,14 @@ public class SimpleEventAdmin implements
     public void sendEvent(Event event) {
         if ( topics != null ) {
             for(int i=0; i<topics.length; i++) {
-                if ( topics[i].equals(event.getTopic()) ) {
-                    handler[i].handleEvent(event);
+                if ( topics[i].endsWith("*") ) {
+                    if ( event.getTopic().startsWith(topics[i].substring(0, topics[i].length() - 1)) ) {
+                        handler[i].handleEvent(event);
+                    }
+                } else {
+                    if ( topics[i].equals(event.getTopic()) ) {
+                        handler[i].handleEvent(event);
+                    }
                 }
             }
         }

Added: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfigurationTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfigurationTest.java?rev=1022923&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfigurationTest.java (added)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfigurationTest.java Fri Oct 15 14:02:23 2010
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.config;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Dictionary;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.Map;
+
+import org.apache.sling.event.impl.jobs.JobEvent;
+import org.apache.sling.event.jobs.JobUtil;
+import org.osgi.service.event.Event;
+
+/**
+ * Tests how {@link InternalQueueConfiguration} evaluates max-parallel and topic settings.
+ */
+public class InternalQueueConfigurationTest {
+
+    /**
+     * Creates a job event for {@code topic}; the topic is also stored under
+     * {@link JobUtil#PROPERTY_JOB_TOPIC}. All methods implemented here are inert stubs.
+     */
+    private JobEvent getJobEvent(final String topic) {
+        final Dictionary<String, Object> eventProps = new Hashtable<String, Object>();
+        eventProps.put(JobUtil.PROPERTY_JOB_TOPIC, topic);
+        final Event osgiEvent = new Event(topic, eventProps);
+        return new JobEvent(osgiEvent, topic) {
+            public boolean lock() {
+                return false;
+            }
+            public void unlock() {
+                // nothing to do
+            }
+            public void finished() {
+                // nothing to do
+            }
+            public boolean reschedule() {
+                return false;
+            }
+            public boolean remove() {
+                return false;
+            }
+        };
+    }
+
+    /**
+     * Builds a queue configuration with the given name and topic patterns.
+     */
+    private InternalQueueConfiguration configFor(final String name, final String... topics) {
+        final Map<String, Object> props = new HashMap<String, Object>();
+        props.put(ConfigurationConstants.PROP_NAME, name);
+        props.put(ConfigurationConstants.PROP_TOPICS, topics);
+        return InternalQueueConfiguration.fromConfiguration(props);
+    }
+
+    /** A max-parallel value of -1 is expected to resolve to the number of available processors. */
+    @org.junit.Test public void testMaxParallel() {
+        final Map<String, Object> props = new HashMap<String, Object>();
+        props.put(ConfigurationConstants.PROP_MAX_PARALLEL, -1);
+
+        final InternalQueueConfiguration config = InternalQueueConfiguration.fromConfiguration(props);
+        assertEquals(Runtime.getRuntime().availableProcessors(), config.getMaxParallel());
+    }
+
+    /** Pattern "a." is expected to match only topics exactly one level below "a". */
+    @org.junit.Test public void testTopicMatchersDot() {
+        final InternalQueueConfiguration config = configFor("test", "a.");
+        assertTrue(config.isValid());
+        assertTrue(config.match(getJobEvent("a/b")));
+        assertTrue(config.match(getJobEvent("a/c")));
+        assertFalse(config.match(getJobEvent("a")));
+        assertFalse(config.match(getJobEvent("a/b/c")));
+        assertFalse(config.match(getJobEvent("t")));
+        assertFalse(config.match(getJobEvent("t/x")));
+    }
+
+    /** Pattern "a*" is expected to match every topic below "a", but not "a" itself. */
+    @org.junit.Test public void testTopicMatchersStar() {
+        final InternalQueueConfiguration config = configFor("test", "a*");
+        assertTrue(config.isValid());
+        assertTrue(config.match(getJobEvent("a/b")));
+        assertTrue(config.match(getJobEvent("a/c")));
+        assertFalse(config.match(getJobEvent("a")));
+        assertTrue(config.match(getJobEvent("a/b/c")));
+        assertFalse(config.match(getJobEvent("t")));
+        assertFalse(config.match(getJobEvent("t/x")));
+    }
+
+    /** A plain topic "a" is expected to match that exact topic only. */
+    @org.junit.Test public void testTopicMatchers() {
+        final InternalQueueConfiguration config = configFor("test", "a");
+        assertTrue(config.isValid());
+        assertFalse(config.match(getJobEvent("a/b")));
+        assertFalse(config.match(getJobEvent("a/c")));
+        assertTrue(config.match(getJobEvent("a")));
+        assertFalse(config.match(getJobEvent("a/b/c")));
+        assertFalse(config.match(getJobEvent("t")));
+        assertFalse(config.match(getJobEvent("t/x")));
+    }
+
+    /** With pattern "a.", {0} in the queue name is expected to become the matched sub topic. */
+    @org.junit.Test public void testTopicMatcherAndReplacement() {
+        final InternalQueueConfiguration config = configFor("test-queue-{0}", "a.");
+        assertTrue(config.isValid());
+        final JobEvent first = getJobEvent("a/b");
+        assertTrue(config.match(first));
+        assertEquals("test-queue-b", first.queueName);
+        final JobEvent second = getJobEvent("a/d");
+        assertTrue(config.match(second));
+        assertEquals("test-queue-d", second.queueName);
+    }
+
+    /** Pattern "a/." is expected to behave like "a.": direct sub topics only. */
+    @org.junit.Test public void testTopicMatchersDotAndSlash() {
+        final InternalQueueConfiguration config = configFor("test", "a/.");
+        assertTrue(config.isValid());
+        assertTrue(config.match(getJobEvent("a/b")));
+        assertTrue(config.match(getJobEvent("a/c")));
+        assertFalse(config.match(getJobEvent("a")));
+        assertFalse(config.match(getJobEvent("a/b/c")));
+        assertFalse(config.match(getJobEvent("t")));
+        assertFalse(config.match(getJobEvent("t/x")));
+    }
+
+    /** Pattern "a/*" is expected to behave like "a*": all topics below "a". */
+    @org.junit.Test public void testTopicMatchersStarAndSlash() {
+        final InternalQueueConfiguration config = configFor("test", "a/*");
+        assertTrue(config.isValid());
+        assertTrue(config.match(getJobEvent("a/b")));
+        assertTrue(config.match(getJobEvent("a/c")));
+        assertFalse(config.match(getJobEvent("a")));
+        assertTrue(config.match(getJobEvent("a/b/c")));
+        assertFalse(config.match(getJobEvent("t")));
+        assertFalse(config.match(getJobEvent("t/x")));
+    }
+
+    /** With pattern "a/.", {0} in the queue name is expected to become the matched sub topic. */
+    @org.junit.Test public void testTopicMatcherAndReplacementAndSlash() {
+        final InternalQueueConfiguration config = configFor("test-queue-{0}", "a/.");
+        assertTrue(config.isValid());
+        final JobEvent first = getJobEvent("a/b");
+        assertTrue(config.match(first));
+        assertEquals("test-queue-b", first.queueName);
+        final JobEvent second = getJobEvent("a/d");
+        assertTrue(config.match(second));
+        assertEquals("test-queue-d", second.queueName);
+    }
+
+    /** A configuration without any topics is expected to be reported as invalid. */
+    @org.junit.Test public void testNoTopicMatchers() {
+        final Map<String, Object> props = new HashMap<String, Object>();
+        props.put(ConfigurationConstants.PROP_NAME, "test");
+
+        final InternalQueueConfiguration config = InternalQueueConfiguration.fromConfiguration(props);
+        assertFalse(config.isValid());
+    }
+}

Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfigurationTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfigurationTest.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url