You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2010/10/11 08:54:14 UTC

svn commit: r1021247 [4/6] - in /sling/branches/eventing-3.0: ./ .settings/ src/ src/main/ src/main/java/ src/main/java/org/ src/main/java/org/apache/ src/main/java/org/apache/sling/ src/main/java/org/apache/sling/event/ src/main/java/org/apache/sling/...

Added: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/JobStatusProviderImpl.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/JobStatusProviderImpl.java?rev=1021247&view=auto
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/JobStatusProviderImpl.java (added)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/JobStatusProviderImpl.java Mon Oct 11 06:54:12 2010
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.jcr;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.sling.event.JobStatusProvider;
+import org.apache.sling.event.JobsIterator;
+import org.apache.sling.event.jobs.JobManager;
+import org.apache.sling.event.jobs.JobManager.QueryType;
+import org.apache.sling.event.jobs.JobUtil;
+import org.apache.sling.event.jobs.Queue;
+import org.osgi.service.event.Event;
+
+
+/**
+ * Legacy service to query and remove jobs.
+ * Every operation is forwarded to the {@link JobManager}.
+ *
+ * @deprecated
+ */
+@Deprecated
+@Component
+@Service(value=JobStatusProvider.class)
+public class JobStatusProviderImpl
+    implements JobStatusProvider {
+
+    /** The job manager all calls are delegated to. */
+    @Reference
+    private JobManager jobManager;
+
+    /**
+     * Look up a job by its topic and job name.
+     * @param topic The job topic.
+     * @param jobName The value of the {@link JobUtil#PROPERTY_JOB_NAME} property.
+     * @return The job or <code>null</code> if one of the arguments is <code>null</code>
+     *         or the job manager does not return a job.
+     */
+    private Event lookupJob(final String topic, final String jobName) {
+        if ( jobName == null || topic == null ) {
+            return null;
+        }
+        return this.jobManager.findJob(topic,
+                Collections.singletonMap(JobUtil.PROPERTY_JOB_NAME, (Object)jobName));
+    }
+
+    /**
+     * Run a query against the job manager and wrap the result in the legacy iterator type.
+     */
+    private JobsIterator query(final QueryType type, final String topic, final Map<String, Object>[] filterProps) {
+        return new JobsIteratorImpl(this.jobManager.queryJobs(type, topic, filterProps));
+    }
+
+    /**
+     * Copy all jobs of an iterator into a new list.
+     */
+    private Collection<Event> asList(final JobsIterator ji)  {
+        final List<Event> result = new ArrayList<Event>();
+        while ( ji.hasNext() ) {
+            final Event job = ji.next();
+            result.add(job);
+        }
+        return result;
+    }
+
+    /**
+     * Returns <code>true</code> as well if no matching job is found.
+     * @see org.apache.sling.event.JobStatusProvider#removeJob(java.lang.String, java.lang.String)
+     */
+    public boolean removeJob(final String topic, final String jobId) {
+        final Event job = this.lookupJob(topic, jobId);
+        if ( job == null ) {
+            return true;
+        }
+        return this.removeJob((String)job.getProperty(JobUtil.JOB_ID));
+    }
+
+    /**
+     * @see org.apache.sling.event.JobStatusProvider#removeJob(java.lang.String)
+     */
+    public boolean removeJob(String jobId) {
+        return this.jobManager.removeJob(jobId);
+    }
+
+    /**
+     * Does nothing if no matching job is found.
+     * @see org.apache.sling.event.JobStatusProvider#forceRemoveJob(java.lang.String, java.lang.String)
+     */
+    public void forceRemoveJob(final String topic, final String jobId) {
+        final Event job = this.lookupJob(topic, jobId);
+        if ( job == null ) {
+            return;
+        }
+        this.forceRemoveJob((String)job.getProperty(JobUtil.JOB_ID));
+    }
+
+    /**
+     * @see org.apache.sling.event.JobStatusProvider#forceRemoveJob(java.lang.String)
+     */
+    public void forceRemoveJob(final String jobId) {
+        this.jobManager.forceRemoveJob(jobId);
+    }
+
+    /**
+     * Resumes the queue with the given name, unknown queue names are ignored.
+     * @see org.apache.sling.event.JobStatusProvider#wakeUpJobQueue(java.lang.String)
+     */
+    public void wakeUpJobQueue(final String jobQueueName) {
+        final Queue queue = this.jobManager.getQueue(jobQueueName);
+        if ( queue == null ) {
+            return;
+        }
+        queue.resume();
+    }
+
+    /**
+     * @see org.apache.sling.event.JobStatusProvider#queryAllJobs(String, Map...)
+     */
+    public JobsIterator queryAllJobs(final String topic, final Map<String, Object>... filterProps) {
+        return this.query(QueryType.ALL, topic, filterProps);
+    }
+
+    /**
+     * @see org.apache.sling.event.JobStatusProvider#queryCurrentJobs(String, Map...)
+     */
+    public JobsIterator queryCurrentJobs(final String topic, final Map<String, Object>... filterProps) {
+        return this.query(QueryType.ACTIVE, topic, filterProps);
+    }
+
+    /**
+     * @see org.apache.sling.event.JobStatusProvider#queryScheduledJobs(String, Map...)
+     */
+    public JobsIterator queryScheduledJobs(final String topic, final Map<String, Object>... filterProps) {
+        return this.query(QueryType.QUEUED, topic, filterProps);
+    }
+
+    /**
+     * @see org.apache.sling.event.JobStatusProvider#getCurrentJobs(java.lang.String)
+     */
+    @Deprecated
+    public Collection<Event> getCurrentJobs(String topic) {
+        return this.getCurrentJobs(topic, (Map<String, Object>[])null);
+    }
+
+    /**
+     * @see org.apache.sling.event.JobStatusProvider#getCurrentJobs(java.lang.String, java.util.Map...)
+     */
+    @Deprecated
+    public Collection<Event> getCurrentJobs(String topic, Map<String, Object>... filterProps) {
+        final JobsIterator active = this.queryCurrentJobs(topic, filterProps);
+        return this.asList(active);
+    }
+
+    /**
+     * @see org.apache.sling.event.JobStatusProvider#getScheduledJobs(java.lang.String)
+     */
+    @Deprecated
+    public Collection<Event> getScheduledJobs(String topic) {
+        return this.getScheduledJobs(topic, (Map<String, Object>[])null);
+    }
+
+    /**
+     * @see org.apache.sling.event.JobStatusProvider#getScheduledJobs(java.lang.String, java.util.Map...)
+     */
+    @Deprecated
+    public Collection<Event> getScheduledJobs(String topic, Map<String, Object>... filterProps) {
+        final JobsIterator queued = this.queryScheduledJobs(topic, filterProps);
+        return this.asList(queued);
+    }
+
+    /**
+     * @see org.apache.sling.event.JobStatusProvider#getAllJobs(java.lang.String, java.util.Map...)
+     */
+    @Deprecated
+    public Collection<Event> getAllJobs(String topic, Map<String, Object>... filterProps) {
+        final JobsIterator all = this.queryAllJobs(topic, filterProps);
+        return this.asList(all);
+    }
+
+    /**
+     * Same as {@link #removeJob(String, String)}, the result is dropped.
+     * @see org.apache.sling.event.JobStatusProvider#cancelJob(java.lang.String, java.lang.String)
+     */
+    @Deprecated
+    public void cancelJob(String topic, String jobId) {
+        this.removeJob(topic, jobId);
+    }
+
+    /**
+     * Same as {@link #removeJob(String)}, the result is dropped.
+     * @see org.apache.sling.event.JobStatusProvider#cancelJob(java.lang.String)
+     */
+    @Deprecated
+    public void cancelJob(String jobId) {
+        this.removeJob(jobId);
+    }
+}

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/JobStatusProviderImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/JobStatusProviderImpl.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/JobStatusProviderImpl.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/JobsIteratorImpl.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/JobsIteratorImpl.java?rev=1021247&view=auto
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/JobsIteratorImpl.java (added)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/JobsIteratorImpl.java Mon Oct 11 06:54:12 2010
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.jcr;
+
+import java.util.Iterator;
+
+import org.apache.sling.event.JobsIterator;
+import org.osgi.service.event.Event;
+
+/**
+ * Adapter exposing a {@link org.apache.sling.event.jobs.JobsIterator}
+ * through the legacy {@link JobsIterator} interface.
+ * All calls except {@link #close()} are passed on to the wrapped iterator.
+ * @deprecated
+ */
+@Deprecated
+public class JobsIteratorImpl implements JobsIterator {
+
+    /** The wrapped iterator of the new jobs API. */
+    private final org.apache.sling.event.jobs.JobsIterator wrapped;
+
+    /**
+     * Create a new adapter.
+     * @param iterator The iterator to wrap.
+     */
+    public JobsIteratorImpl(final org.apache.sling.event.jobs.JobsIterator iterator) {
+        this.wrapped = iterator;
+    }
+
+    /**
+     * @see java.util.Iterator#hasNext()
+     */
+    public boolean hasNext() {
+        return this.wrapped.hasNext();
+    }
+
+    /**
+     * @see java.util.Iterator#next()
+     */
+    public Event next() {
+        return this.wrapped.next();
+    }
+
+    /**
+     * @see java.util.Iterator#remove()
+     */
+    public void remove() {
+        this.wrapped.remove();
+    }
+
+    /**
+     * @see java.lang.Iterable#iterator()
+     */
+    public Iterator<Event> iterator() {
+        return this.wrapped.iterator();
+    }
+
+    /**
+     * @see org.apache.sling.event.jobs.JobsIterator#skip(long)
+     */
+    public void skip(long skipNum) {
+        this.wrapped.skip(skipNum);
+    }
+
+    /**
+     * @see org.apache.sling.event.jobs.JobsIterator#getSize()
+     */
+    public long getSize() {
+        return this.wrapped.getSize();
+    }
+
+    /**
+     * @see org.apache.sling.event.jobs.JobsIterator#getPosition()
+     */
+    public long getPosition() {
+        return this.wrapped.getPosition();
+    }
+
+    /**
+     * This implementation is empty, the wrapped iterator is not touched.
+     * @see org.apache.sling.event.JobsIterator#close()
+     * @deprecated
+     */
+    @Deprecated
+    public void close() {
+        // nothing to do
+    }
+}

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/JobsIteratorImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/JobsIteratorImpl.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/JobsIteratorImpl.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java?rev=1021247&view=auto
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java (added)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java Mon Oct 11 06:54:12 2010
@@ -0,0 +1,1074 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.jcr;
+
+import java.util.Calendar;
+import java.util.Dictionary;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.StringTokenizer;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import javax.jcr.ItemExistsException;
+import javax.jcr.Node;
+import javax.jcr.NodeIterator;
+import javax.jcr.RepositoryException;
+import javax.jcr.Session;
+import javax.jcr.ValueFactory;
+import javax.jcr.observation.EventIterator;
+import javax.jcr.observation.EventListener;
+import javax.jcr.query.Query;
+import javax.jcr.query.QueryManager;
+import javax.jcr.query.qom.Constraint;
+import javax.jcr.query.qom.Ordering;
+import javax.jcr.query.qom.QueryObjectModelFactory;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Modified;
+import org.apache.felix.scr.annotations.Properties;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.felix.scr.annotations.Services;
+import org.apache.sling.commons.osgi.OsgiUtil;
+import org.apache.sling.event.EventUtil;
+import org.apache.sling.event.impl.EnvironmentComponent;
+import org.apache.sling.event.impl.jobs.DefaultJobManager;
+import org.apache.sling.event.impl.jobs.JobEvent;
+import org.apache.sling.event.impl.jobs.Utility;
+import org.apache.sling.event.impl.support.Environment;
+import org.apache.sling.event.jobs.JobManager;
+import org.apache.sling.event.jobs.JobUtil;
+import org.osgi.service.component.ComponentContext;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Persistence handler for the jobs
+ *
+ */
+@Component(label="%job.persistence.name",
+        description="%job.persistence.description",
+        metatype=true,immediate=true)
+@Services({
+    @Service(value=PersistenceHandler.class),
+    @Service(value=EventHandler.class),
+    @Service(value=Runnable.class)
+})
+@Properties({
+    @Property(name="event.topics",propertyPrivate=true,
+            value={"org/osgi/framework/BundleEvent/UPDATED",
+                   "org/osgi/framework/BundleEvent/STARTED",
+                   JobUtil.TOPIC_JOB}),
+     @Property(name="scheduler.period", longValue=300,label="%persscheduler.period.name",description="%persscheduler.period.description"),
+     @Property(name="scheduler.concurrent", boolValue=false, propertyPrivate=true)
+})
+public class PersistenceHandler implements EventListener, Runnable, EventHandler {
+
+    /** Default repository path, used if the configuration does not provide one. */
+    private static final String DEFAULT_REPOSITORY_PATH = "/var/eventing/jobs";
+
+    /** Name of the configuration property for the path where all jobs are stored. */
+    @Property(value=DEFAULT_REPOSITORY_PATH, propertyPrivate=true)
+    private static final String CONFIG_PROPERTY_REPOSITORY_PATH = "repository.path";
+
+    /** Default clean up period is 5 minutes. */
+    private static final int DEFAULT_CLEANUP_PERIOD = 5;
+
+    /** Name of the configuration property for the clean up period (in minutes). */
+    @Property(intValue=DEFAULT_CLEANUP_PERIOD,label="%jobcleanup.period.name",description="%jobcleanup.period.description")
+    private static final String CONFIG_PROPERTY_CLEANUP_PERIOD = "cleanup.period";
+
+    /** Default maximum number of jobs to load. */
+    private static final long DEFAULT_MAXIMUM_LOAD_JOBS = 1000;
+
+    /** Number of jobs to load from the repository on startup in one go. */
+    @Property(longValue=DEFAULT_MAXIMUM_LOAD_JOBS)
+    private static final String CONFIG_PROPERTY_MAX_LOAD_JOBS = "max.load.jobs";
+
+    /** Default load threshold. */
+    private static final long DEFAULT_LOAD_THRESHOLD = 400;
+
+    /**
+     * Threshold - the background loader only checks the repository for more jobs
+     * once the number of jobs reported by the job manager is not above this value.
+     * (The constant name is misspelled, it is kept as other code refers to it.)
+     */
+    @Property(longValue=DEFAULT_LOAD_THRESHOLD)
+    private static final String CONFIG_PROPERTY_LOAD_THREASHOLD = "load.threshold";
+
+    /** Default background load delay. */
+    private static final long DEFAULT_BACKGROUND_LOAD_DELAY = 30;
+
+    /** The background loader waits this number of seconds after startup before loading jobs from the repository. */
+    @Property(longValue=DEFAULT_BACKGROUND_LOAD_DELAY)
+    private static final String CONFIG_PROPERTY_BACKGROUND_LOAD_DELAY = "load.delay";
+
+    /** Default background check delay. */
+    private static final long DEFAULT_BACKGROUND_CHECK_DELAY = 240;
+
+    /** The background loader waits this number of seconds between two loads from the repository. */
+    @Property(longValue=DEFAULT_BACKGROUND_CHECK_DELAY)
+    private static final String CONFIG_PROPERTY_BACKGROUND_CHECK_DELAY = "load.checkdelay";
+
+    /** Finished jobs older than this number of minutes are removed (5 by default), a value of zero or less disables the clean up. */
+    private int cleanupPeriod;
+
+    /** Default logger. */
+    private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+    /** The repository path, read from the configuration on activation. */
+    protected String repositoryPath;
+
+    /** Is the background task still running? Set on activation, cleared on deactivation. */
+    private volatile boolean running;
+
+    /** Unloaded jobs - presumably the repository paths of jobs which could not be loaded yet (TODO confirm against tryToLoadJob). */
+    private Set<String>unloadedJobs = new HashSet<String>();
+
+    /** A local queue for writing received events into the repository. */
+    private final BlockingQueue<Event> writeQueue = new LinkedBlockingQueue<Event>();
+
+    /** Lock for the background session. */
+    private final Object backgroundLock = new Object();
+
+    /** Background session, created on activation; the observation listener is registered with it. */
+    private Session backgroundSession;
+
+    /** Provides the admin sessions. */
+    @Reference
+    private EnvironmentComponent environment;
+
+    /** The job manager, notified about active and removed jobs. */
+    @Reference
+    private JobManager jobManager;
+
+    /**
+     * Activate this component.
+     * Reads the configuration, starts the writer and the background loader
+     * thread (both daemon threads) and registers this object as an observation
+     * listener below the repository path.
+     * If the session can not be created or the listener can not be registered,
+     * everything started so far is shut down again through
+     * {@link #deactivate(ComponentContext)} before the exception is passed on.
+     * Without this, the threads would keep running and the session would stay
+     * open, as a failed activation is not followed by a deactivation.
+     * @param context The component context.
+     * @throws RepositoryException If the observation can not be set up.
+     */
+    @Activate
+    protected void activate(final ComponentContext context) throws RepositoryException {
+        @SuppressWarnings("unchecked")
+        final Dictionary<String, Object> props = context.getProperties();
+        this.cleanupPeriod = OsgiUtil.toInteger(props.get(CONFIG_PROPERTY_CLEANUP_PERIOD), DEFAULT_CLEANUP_PERIOD);
+        this.repositoryPath = OsgiUtil.toString(props.get(CONFIG_PROPERTY_REPOSITORY_PATH), DEFAULT_REPOSITORY_PATH);
+        this.running = true;
+
+        // start writer background thread
+        final Thread writerThread = new Thread(new Runnable() {
+            public void run() {
+                persistJobs();
+            }
+        }, "Apache Sling Job Writer");
+        writerThread.setDaemon(true);
+        writerThread.start();
+
+        // start background thread which loads jobs from the repository
+        final long loadThreshold = OsgiUtil.toLong(props.get(CONFIG_PROPERTY_LOAD_THREASHOLD), DEFAULT_LOAD_THRESHOLD);
+        final long backgroundLoadDelay = OsgiUtil.toLong(props.get(CONFIG_PROPERTY_BACKGROUND_LOAD_DELAY), DEFAULT_BACKGROUND_LOAD_DELAY);
+        final long backgroundCheckDelay = OsgiUtil.toLong(props.get(CONFIG_PROPERTY_BACKGROUND_CHECK_DELAY), DEFAULT_BACKGROUND_CHECK_DELAY);
+        final long maxLoadJobs = OsgiUtil.toLong(props.get(CONFIG_PROPERTY_MAX_LOAD_JOBS), DEFAULT_MAXIMUM_LOAD_JOBS);
+        final Thread loaderThread = new Thread(new Runnable() {
+            public void run() {
+                loadJobsInTheBackground(backgroundLoadDelay, backgroundCheckDelay, loadThreshold, maxLoadJobs);
+            }
+        }, "Apache Sling Job Background Loader");
+        loaderThread.setDaemon(true);
+        loaderThread.start();
+
+        // open background session for observation
+        // create the background session and register a listener
+        boolean activated = false;
+        try {
+            this.backgroundSession = this.environment.createAdminSession();
+            this.backgroundSession.getWorkspace().getObservationManager()
+                 .addEventListener(this,
+                              javax.jcr.observation.Event.PROPERTY_REMOVED
+                              |javax.jcr.observation.Event.PROPERTY_ADDED
+                              |javax.jcr.observation.Event.NODE_REMOVED
+                              |javax.jcr.observation.Event.NODE_ADDED,
+                              this.repositoryPath,
+                              true,
+                              null,
+                              null,
+                              true);
+            activated = true;
+        } finally {
+            if ( !activated ) {
+                // activation failed: stop the threads and log out the session (if any),
+                // nobody else will do this for us
+                this.deactivate(context);
+            }
+        }
+    }
+
+    /**
+     * Deactivate this component.
+     * Clears the running flag, puts a marker event into the write queue and
+     * closes the background session after removing the observation listener.
+     * @param context The component context.
+     */
+    @Deactivate
+    protected void deactivate(final ComponentContext context) {
+        this.running = false;
+
+        // stop write queue - presumably the marker event wakes up the writer thread
+        // waiting on the queue (the writer is not visible here, verify against persistJobs)
+        final Event stopMarker = new Event("some", (Dictionary<String, Object>)null);
+        try {
+            this.writeQueue.put(stopMarker);
+        } catch (InterruptedException ie) {
+            this.ignoreException(ie);
+        }
+
+        if ( this.backgroundSession != null ) {
+            synchronized ( this.backgroundLock ) {
+                this.logger.debug("Shutting down background session.");
+                final Session session = this.backgroundSession;
+                try {
+                    session.getWorkspace().getObservationManager().removeEventListener(this);
+                } catch (RepositoryException re) {
+                    // not fatal, the session is logged out anyway
+                    this.logger.warn("Unable to remove event listener.", re);
+                }
+                session.logout();
+                this.backgroundSession = null;
+            }
+        }
+        logger.debug("Apache Sling Job Persistence Handler stopped on instance {}", Environment.APPLICATION_ID);
+    }
+
+    /**
+     * Configuration update callback of this component.
+     * Intentionally empty: the configuration is read on activation only.
+     * @param context The component context (not used).
+     */
+    @Modified
+    protected void update(final ComponentContext context) {
+        // we don't need to do anything as the config values are only used for initial loading!
+    }
+
+    /**
+     * Handle the observation events below the repository path:
+     * <ul>
+     * <li>node added: try to load the job from the new node</li>
+     * <li><code>jcr:lockOwner</code> removed (unlock): try to load the job from the unlocked node</li>
+     * <li><code>jcr:lockOwner</code> added (lock): notify the job manager about the active job</li>
+     * <li>node removed: forget the path as unloaded job and notify the job manager about the removal</li>
+     * </ul>
+     * The job manager is always notified with the path of the job node relative
+     * to the repository path. For the lock notification the name of the
+     * <code>jcr:lockOwner</code> property is therefore cut off the event path.
+     * A repository exception is logged and processing continues with the next event.
+     *
+     * @see javax.jcr.observation.EventListener#onEvent(javax.jcr.observation.EventIterator)
+     */
+    public void onEvent(EventIterator iter) {
+        // we create our own session here - this is done lazily
+        Session s = null;
+        try {
+            while ( iter.hasNext() ) {
+                final javax.jcr.observation.Event event = iter.nextEvent();
+
+                try {
+                    final String path = event.getPath();
+                    String loadNodePath = null;
+
+                    if ( event.getType() == javax.jcr.observation.Event.NODE_ADDED) {
+                        loadNodePath = path;
+                    } else if ( event.getType() == javax.jcr.observation.Event.PROPERTY_REMOVED) {
+                        final int pos = path.lastIndexOf('/');
+                        final String propertyName = path.substring(pos+1);
+
+                        // we are only interested in unlocks
+                        if ( "jcr:lockOwner".equals(propertyName) ) {
+                            loadNodePath = path.substring(0, pos);
+                        }
+                    } else if ( event.getType() == javax.jcr.observation.Event.PROPERTY_ADDED ) {
+                        final int pos = path.lastIndexOf('/');
+                        final String propertyName = path.substring(pos+1);
+
+                        // we are only interested in locks of nodes below the repository path;
+                        // a lock of the node at the repository path itself is not a job
+                        if ( "jcr:lockOwner".equals(propertyName) && pos > this.repositoryPath.length() ) {
+                            // the path is the path of the property, the job is the parent node
+                            ((DefaultJobManager)this.jobManager).notifyActiveJob(path.substring(this.repositoryPath.length() + 1, pos));
+                        }
+
+                    } else if ( event.getType() == javax.jcr.observation.Event.NODE_REMOVED) {
+                        synchronized (unloadedJobs) {
+                            this.unloadedJobs.remove(path);
+                        }
+                        ((DefaultJobManager)this.jobManager).notifyRemoveJob(path.substring(this.repositoryPath.length() + 1));
+                    }
+                    if ( loadNodePath != null ) {
+                        if ( s == null ) {
+                            s = this.environment.createAdminSession();
+                        }
+                        // we do a sanity check if the node exists first
+                        if ( s.itemExists(loadNodePath) ) {
+                            final Node eventNode = (Node) s.getItem(loadNodePath);
+                            if ( eventNode.isNodeType(JCRHelper.JOB_NODE_TYPE) ) {
+                                if ( event.getType() == javax.jcr.observation.Event.NODE_ADDED ) {
+                                    logger.debug("New job has been added. Trying to load from {}", loadNodePath);
+                                } else {
+                                    logger.debug("Job execution failed by someone else. Trying to load from {}", loadNodePath);
+                                }
+                                tryToLoadJob(eventNode, this.unloadedJobs);
+                            }
+                        }
+                    }
+                } catch (RepositoryException re) {
+                    this.logger.error("Exception during jcr event processing.", re);
+                }
+
+            }
+        } finally {
+            // the session only exists if at least one node had to be checked
+            if ( s != null ) {
+                s.logout();
+            }
+        }
+    }
+
+    /**
+     * Create the query for the clean up: all job nodes below the repository
+     * path with a finished date older than the clean up period.
+     * @param s The session used to create the query.
+     * @return The query, it is not executed here.
+     * @throws RepositoryException If the query can not be created.
+     */
+    private Query getCleanUpQuery(final Session s)
+    throws RepositoryException {
+        final String selector = "nodetype";
+
+        // everything finished before this point in time is obsolete
+        final Calendar obsoleteBefore = Calendar.getInstance();
+        obsoleteBefore.add(Calendar.MINUTE, -this.cleanupPeriod);
+
+        final QueryObjectModelFactory factory = s.getWorkspace().getQueryManager().getQOMFactory();
+
+        final javax.jcr.query.qom.Selector jobNodes = factory.selector(JCRHelper.JOB_NODE_TYPE, selector);
+        final Constraint belowRepositoryPath = factory.descendantNode(selector, this.repositoryPath);
+        final Constraint finishedLongAgo = factory.comparison(
+                factory.propertyValue(selector, JCRHelper.NODE_PROPERTY_FINISHED),
+                QueryObjectModelFactory.JCR_OPERATOR_LESS_THAN,
+                factory.literal(s.getValueFactory().createValue(obsoleteBefore)));
+
+        // no ordering and no columns
+        return factory.createQuery(jobNodes,
+                factory.and(belowRepositoryPath, finishedLongAgo),
+                null,
+                null);
+    }
+
+    /**
+     * Remove all finished jobs older than the clean up period from the repository.
+     * Nothing is done if the component is not running or if the clean up
+     * period is not positive. A repository exception is logged as a warning.
+     */
+    public void cleanup() {
+        if ( !this.running || this.cleanupPeriod <= 0 ) {
+            return;
+        }
+        this.logger.debug("Cleaning up repository, removing all finished jobs older than {} minutes.", this.cleanupPeriod);
+
+        // a separate session is used to avoid concurrency issues
+        Session session = null;
+        try {
+            session = this.environment.createAdminSession();
+            final Query query = this.getCleanUpQuery(session);
+            if ( logger.isDebugEnabled() ) {
+                logger.debug("Executing query {}", query.getStatement());
+            }
+            int removed = 0;
+            for (final NodeIterator obsolete = query.execute().getNodes(); obsolete.hasNext(); removed++) {
+                obsolete.nextNode().remove();
+            }
+            // all removals are persisted in one go
+            session.save();
+            logger.debug("Removed {} entries from the repository.", removed);
+
+        } catch (RepositoryException re) {
+            // in the case of an error, we just log this as a warning
+            this.logger.warn("Exception during repository cleanup.", re);
+        } finally {
+            if ( session != null ) {
+                session.logout();
+            }
+        }
+    }
+
+    /**
+     * Load the jobs from the repository in batches.
+     * After an initial delay one batch is loaded through <code>loadJobs</code>.
+     * As long as that method does not return -1 and the component is running,
+     * the loader sleeps (at least once) until the number of jobs reported by
+     * the job manager is not above the threshold and then loads the next batch.
+     * @param backgroundLoadDelay The delay before the first load in seconds.
+     * @param backgroundCheckDelay The sleep time between two checks in seconds.
+     * @param loadThreshold The next batch is only loaded if the number of jobs is not above this value.
+     * @param maxLoadJobs Passed on to <code>loadJobs</code> as the maximum number of jobs.
+     */
+    private void loadJobsInTheBackground(final long backgroundLoadDelay,
+            final long backgroundCheckDelay,
+            final long loadThreshold,
+            final long maxLoadJobs) {
+        final long startTime = System.currentTimeMillis();
+        final long initialDelayMillis = 1000 * backgroundLoadDelay; // default is 30 seconds
+        final long checkDelayMillis = 1000 * backgroundCheckDelay; // default is 240 seconds
+
+        // give the system some time to start
+        try {
+            Thread.sleep(initialDelayMillis);
+        } catch (InterruptedException ie) {
+            this.ignoreException(ie);
+        }
+        // stopped in the meantime?
+        if ( !this.running ) {
+            return;
+        }
+
+        logger.debug("Starting background loading.");
+        long lastLoaded = -1;
+        do {
+            lastLoaded = this.loadJobs(lastLoaded, startTime, maxLoadJobs);
+            if ( this.running && lastLoaded > -1 ) {
+                // wait until the number of jobs is not above the threshold anymore
+                do {
+                    try {
+                        Thread.sleep(checkDelayMillis);
+                    } catch (InterruptedException ie) {
+                        this.ignoreException(ie);
+                    }
+                } while ( this.running && this.jobManager.getStatistics().getNumberOfJobs() > loadThreshold );
+            }
+        } while (this.running && lastLoaded > -1);
+        logger.debug("Finished background loading.");
+    }
+
+    /**
+     * Load a batch of active jobs from the repository.
+     * <p>
+     * Queries all job nodes below the repository path that have no "finished"
+     * property and were created before <code>startTime</code> (and, for follow-up
+     * batches, after <code>since</code>), ordered ascending by creation date, and
+     * hands each of them to {@link #tryToLoadJob(Node, Set)}.
+     *
+     * @param since       Creation time stamp (in ms) returned by the previous batch
+     *                    or <code>-1</code> for the first batch.
+     * @param startTime   Only jobs created before this time stamp (in ms) are queried.
+     * @param maxLoadJobs Upper limit of jobs for a batch; for follow-up batches the number
+     *                    of jobs reported by the job manager statistics is subtracted.
+     * @return The creation time stamp of the last job that was handed to the loader,
+     *         <code>-1</code> if the whole query result has been consumed, or the
+     *         unchanged <code>since</code> value if the computed batch size is not
+     *         positive or the query fails before a job node is read.
+     */
+    private long loadJobs(final long since, final long startTime, final long maxLoadJobs) {
+        long eventCreated = since;
+        // first batch: use the full limit; follow-up batches: limit minus the number of
+        // jobs currently reported by the statistics
+        final long maxLoad = (since == -1 ? maxLoadJobs : maxLoadJobs - this.jobManager.getStatistics().getNumberOfJobs());
+        // sanity check
+        if ( maxLoad > 0 ) {
+            logger.debug("Loading from repository since {} and max {}", since, maxLoad);
+            Session session = null;
+            try {
+                // a fresh admin session is used for each batch and logged out in the finally block
+                session = this.environment.createAdminSession();
+                final QueryManager qManager = session.getWorkspace().getQueryManager();
+                final ValueFactory vf = session.getValueFactory();
+                final String selectorName = "nodetype";
+                final Calendar startDate = Calendar.getInstance();
+                startDate.setTimeInMillis(startTime);
+
+                final QueryObjectModelFactory qomf = qManager.getQOMFactory();
+
+                // constraint: below the repository path AND not finished ...
+                Constraint constraint = qomf.and(
+                        qomf.descendantNode(selectorName, this.repositoryPath),
+                        qomf.not(qomf.propertyExistence(selectorName, JCRHelper.NODE_PROPERTY_FINISHED)));
+                // ... AND created before the start time ...
+                constraint = qomf.and(constraint,
+                        qomf.comparison(qomf.propertyValue(selectorName, JCRHelper.NODE_PROPERTY_CREATED),
+                                QueryObjectModelFactory.JCR_OPERATOR_LESS_THAN,
+                                qomf.literal(vf.createValue(startDate))));
+                // ... AND (follow-up batches only) created after the last time stamp already handled
+                if ( since != -1 ) {
+                    final Calendar beforeDate = Calendar.getInstance();
+                    beforeDate.setTimeInMillis(since);
+                    constraint = qomf.and(constraint,
+                            qomf.comparison(qomf.propertyValue(selectorName, JCRHelper.NODE_PROPERTY_CREATED),
+                                    QueryObjectModelFactory.JCR_OPERATOR_GREATER_THAN,
+                                    qomf.literal(vf.createValue(beforeDate))));
+                }
+                // oldest jobs first
+                final Query q = qomf.createQuery(
+                        qomf.selector(JCRHelper.JOB_NODE_TYPE, selectorName),
+                        constraint,
+                        new Ordering[] {qomf.ascending(qomf.propertyValue(selectorName, JCRHelper.NODE_PROPERTY_CREATED))},
+                        null
+                );
+                final NodeIterator result = q.execute().getNodes();
+                long count = 0;
+                // phase one: load jobs until the batch limit is reached; only jobs for which
+                // tryToLoadJob returns true are counted
+                while ( result.hasNext() && count < maxLoad ) {
+                    final Node eventNode = result.nextNode();
+                    final String propPath = eventNode.getPath() + '/' + JCRHelper.NODE_PROPERTY_CREATED;
+                    // nodes without a created property at this point are skipped
+                    if ( session.itemExists(propPath) ) {
+                        // remembered even if the job is not counted - this becomes the return value
+                        eventCreated = eventNode.getProperty(JCRHelper.NODE_PROPERTY_CREATED).getLong();
+                        if ( tryToLoadJob(eventNode, this.unloadedJobs) ) {
+                            count++;
+                        }
+                    }
+                }
+                // now we have to add all jobs with the same created time!
+                // (the next batch queries with "created > since" and would otherwise miss them)
+                boolean done = false;
+                while ( result.hasNext() && !done ) {
+                    final Node eventNode = result.nextNode();
+                    final String propPath = eventNode.getPath() + '/' + JCRHelper.NODE_PROPERTY_CREATED;
+                    if ( session.itemExists(propPath) ) {
+                        final long created = eventNode.getProperty(JCRHelper.NODE_PROPERTY_CREATED).getLong();
+                        if ( created == eventCreated ) {
+                            if ( tryToLoadJob(eventNode, this.unloadedJobs) ) {
+                                count++;
+                            }
+                        } else {
+                            // first node with a different time stamp: it belongs to the next batch
+                            done = true;
+                        }
+                    }
+                }
+                // have we processed all jobs?
+                // -1 signals the caller that no further batch is required
+                if ( !done && !result.hasNext() ) {
+                    eventCreated = -1;
+                }
+                logger.debug("Loaded {} jobs and new since {}", count, eventCreated);
+            } catch (RepositoryException re) {
+                // the value of eventCreated reached so far is returned
+                this.logger.error("Exception during initial loading of stored jobs.", re);
+            } finally {
+                if ( session != null ) {
+                    session.logout();
+                }
+            }
+        }
+        return eventCreated;
+    }
+
+    /**
+     * Try to load a job from an event node in the repository.
+     * <p>
+     * For an unfinished node the event is read, passed on for processing (unless it
+     * is a local job of a different application node) and announced to the job manager
+     * as an added job. For a finished node the job manager is notified about the removal.
+     *
+     * @param eventNode       The node to read the event from
+     * @param unloadedJobSet  The set of unloaded jobs - if loading fails because of a
+     *                        missing class, the node path is added here
+     * @return <code>true</code> if the node is not finished, the job is meant for this
+     *         instance and an event could be read from the node (possibly only by the
+     *         forced read after a missing class); <code>false</code> otherwise.
+     */
+    private boolean tryToLoadJob(final Node eventNode, final Set<String> unloadedJobSet) {
+        try {
+            final String nodePath = eventNode.getPath();
+            // first check: job should not be finished
+            if ( !eventNode.hasProperty(JCRHelper.NODE_PROPERTY_FINISHED)) {
+                boolean shouldProcess = true;
+                // second check: is this a job that should only run on the instance that it was created on?
+                if ( eventNode.hasProperty(JobUtil.PROPERTY_JOB_RUN_LOCAL) &&
+                     !eventNode.getProperty(JCRHelper.NODE_PROPERTY_APPLICATION).getString().equals(Environment.APPLICATION_ID)) {
+                    shouldProcess = false;
+                    if ( logger.isDebugEnabled() ) {
+                         logger.debug("Discarding job at {} : local job for a different application node.", nodePath);
+                    }
+                }
+                Event event = null;
+                try {
+                    // regular read: fails with ClassNotFoundException if a class is missing
+                    event = this.readEvent(eventNode, false);
+                    if ( shouldProcess ) {
+                         this.process(event);
+                    }
+
+                } catch (ClassNotFoundException cnfe) {
+                    // store path for lazy loading
+                    synchronized ( unloadedJobSet ) {
+                        unloadedJobSet.add(nodePath);
+                    }
+                    this.ignoreException(cnfe);
+                } catch (RepositoryException re) {
+                    this.logger.error("Unable to load stored job from " + nodePath, re);
+                }
+
+                // the regular read failed: read again with the force flag so that the job
+                // can at least be announced to the job manager below (it is not processed here)
+                if ( event == null ) {
+                    try {
+                        event = this.readEvent(eventNode, true);
+                    } catch (ClassNotFoundException cnfe) {
+                        // this can't occur
+                    } catch (RepositoryException re) {
+                        this.logger.error("Unable to load stored job from " + nodePath, re);
+                    }
+                }
+                // the notification is sent regardless of shouldProcess
+                if ( event != null ) {
+                    ((DefaultJobManager)this.jobManager).notifyAddJob(new JCRJobEvent(event, this));
+                }
+                return shouldProcess && event != null;
+
+            }
+            // if the node is finished, this is usually an unlock event
+            // the id passed on is the node path relative to the repository path
+            ((DefaultJobManager)this.jobManager).notifyRemoveJob(nodePath.substring(this.repositoryPath.length() + 1));
+        } catch (RepositoryException re) {
+            this.logger.error("Unable to load stored job from " + eventNode, re);
+        }
+        return false;
+    }
+
+    /**
+     * Background thread for writing jobs to the repository
+     */
+    private void persistJobs() {
+        logger.debug("Apache Sling Job Persistence Handler started on instance {}", Environment.APPLICATION_ID);
+        Session writerSession = null;
+        Node rootNode = null;
+
+        try {
+            writerSession = this.environment.createAdminSession();
+            rootNode = this.createPath(writerSession.getRootNode(),
+                    this.repositoryPath.substring(1),
+                    JCRHelper.NODETYPE_ORDERED_FOLDER);
+            writerSession.save();
+
+            try {
+                this.processWriteQueue(rootNode);
+             } catch (Throwable t) { //NOSONAR
+                 logger.error("Writer thread stopped with exception: " + t.getMessage(), t);
+                 running = false;
+             }
+        } catch (RepositoryException e) {
+            // there is nothing we can do except log!
+            logger.error("Error during session starting.", e);
+            running = false;
+        } finally {
+            if ( writerSession != null ) {
+                writerSession.logout();
+            }
+        }
+    }
+
+    /**
+     * The writer loop: blocks on the write queue and writes one job per
+     * iteration into the repository for as long as the handler is running.
+     * Jobs with a job name are only written if no node exists for them yet.
+     *
+     * @param rootNode The root node for all jobs
+     */
+    private void processWriteQueue(final Node rootNode) {
+        while ( this.running ) {
+            // wait for the next job
+            Event job = null;
+            try {
+                job = this.writeQueue.take();
+            } catch (InterruptedException ie) {
+                // ignored - the loop condition decides whether we continue
+                this.ignoreException(ie);
+            }
+            if ( job == null || !this.running ) {
+                continue;
+            }
+            if ( logger.isDebugEnabled() ) {
+                logger.debug("Persisting job {}", EventUtil.toString(job));
+            }
+            final String jobId = (String)job.getProperty(JobUtil.PROPERTY_JOB_NAME);
+            final String jobTopic = (String)job.getProperty(JobUtil.PROPERTY_JOB_TOPIC);
+            final String nodePath = Utility.getUniquePath(jobTopic, jobId);
+
+            try {
+                if ( jobId == null ) {
+                    // no job id: simply write the job, nobody else can write the same node
+                    this.writeEvent(rootNode, job, nodePath);
+                } else {
+                    // look for a node already written for the same job id
+                    final Node existingNode = rootNode.hasNode(nodePath) ? rootNode.getNode(nodePath) : null;
+                    if ( existingNode == null ) {
+                        try {
+                            this.writeEvent(rootNode, job, nodePath);
+                        } catch (ItemExistsException alreadyWritten) {
+                            // the node has been written by someone else in the meantime,
+                            // so there is nothing left to do
+                        }
+                    }
+                }
+            } catch (RepositoryException re ) {
+                // writing failed, all we can do is log it
+                this.logger.error("Exception during writing new job '" + EventUtil.toString(job) + "' to repository at " + nodePath, re);
+            }
+        }
+    }
+
+    /**
+     * Get the path in the repository below which all jobs are stored.
+     * @return The repository path.
+     */
+    public String getRepositoryPath() {
+        return repositoryPath;
+    }
+
+    /**
+     * Write an event to the repository.
+     * The node is created (including missing parent folders), filled with the
+     * event properties, creation date, application id, topic and - if available -
+     * the job id, and then saved.
+     * If anything fails, the pending changes of the session are discarded so that
+     * a half written job does not stay in the session and break later writes.
+     *
+     * @param rootNode The root node for all jobs
+     * @param e The event
+     * @param path The path of the job node, relative to the root node.
+     * @throws RepositoryException If the job could not be written.
+     */
+    private void writeEvent(final Node rootNode, final Event e, final String path)
+    throws RepositoryException {
+        final Session session = rootNode.getSession();
+        try {
+            // create new node with name of topic
+            final Node eventNode = this.createPath(rootNode,
+                    path,
+                    JCRHelper.JOB_NODE_TYPE);
+            JCRHelper.writeEventProperties(eventNode, e);
+
+            eventNode.setProperty(JCRHelper.NODE_PROPERTY_CREATED, Calendar.getInstance());
+            eventNode.setProperty(JCRHelper.NODE_PROPERTY_APPLICATION, Environment.APPLICATION_ID);
+
+            // job topic
+            eventNode.setProperty(JCRHelper.NODE_PROPERTY_TOPIC, (String)e.getProperty(JobUtil.PROPERTY_JOB_TOPIC));
+            // job id
+            final String jobId = (String)e.getProperty(JobUtil.PROPERTY_JOB_NAME);
+            if ( jobId != null ) {
+                eventNode.setProperty(JCRHelper.NODE_PROPERTY_JOBID, jobId);
+            }
+            session.save();
+        } catch (RepositoryException re) {
+            // a failed save keeps the transient changes pending in the session; as the
+            // session is reused for the next job, they have to be dropped here
+            try {
+                session.refresh(false);
+            } catch (RepositoryException refreshFailure) {
+                this.ignoreException(refreshFailure);
+            }
+            throw re;
+        }
+    }
+
+    /**
+     * Read an event from the repository, even if classes are missing.
+     * A regular read is tried first; if this fails because of a missing class,
+     * the read is repeated with the force load flag.
+     *
+     * @param eventNode The node to read the event from
+     * @return The event
+     * @throws RepositoryException If reading fails.
+     */
+    public Event forceReadEvent(Node eventNode)
+    throws RepositoryException {
+        try {
+            return this.readEvent(eventNode, false);
+        } catch (ClassNotFoundException firstAttempt) {
+            this.ignoreException(firstAttempt);
+            // second attempt, this time with the force load flag
+            try {
+                return this.readEvent(eventNode, true);
+            } catch (ClassNotFoundException secondAttempt) {
+                // not expected with forced loading, but we catch and rethrow it anyway
+                this.ignoreException(secondAttempt);
+                throw new RepositoryException(secondAttempt);
+            }
+        }
+    }
+
+    /**
+     * Read an event from the repository.
+     * Besides the stored event properties, the event gets the job id (node path
+     * relative to the repository path), the application id and the creation date.
+     *
+     * @param eventNode The node to read the event from
+     * @param forceLoad Passed on to the property reader
+     * @return The event
+     * @throws RepositoryException If reading fails or the stored topic is invalid.
+     * @throws ClassNotFoundException If reading the event properties fails with this exception.
+     */
+    private Event readEvent(Node eventNode, final boolean forceLoad)
+    throws RepositoryException, ClassNotFoundException {
+        final String topic = eventNode.getProperty(JCRHelper.NODE_PROPERTY_TOPIC).getString();
+        final ClassLoader cl = this.environment.getDynamicClassLoader();
+        final Dictionary<String, Object> eventProps = JCRHelper.readEventProperties(eventNode, cl, forceLoad);
+
+        final String jobIdentifier = eventNode.getPath().substring(this.repositoryPath.length() + 1);
+        eventProps.put(JobUtil.JOB_ID, jobIdentifier);
+        // jcr only supports long, the retry information is expected as integers
+        final String[] retryKeys = new String[] {JobUtil.PROPERTY_JOB_RETRIES, JobUtil.PROPERTY_JOB_RETRY_COUNT};
+        for(final String key : retryKeys) {
+            final Object stored = eventProps.get(key);
+            if ( stored != null ) {
+                eventProps.put(key, Integer.valueOf(stored.toString()));
+            }
+        }
+        // application id
+        final String application = eventNode.getProperty(JCRHelper.NODE_PROPERTY_APPLICATION).getString();
+        eventProps.put(EventUtil.PROPERTY_APPLICATION, application);
+        // creation date
+        final Calendar created = eventNode.getProperty(JCRHelper.NODE_PROPERTY_CREATED).getDate();
+        eventProps.put(JCRHelper.NODE_PROPERTY_CREATED, created);
+        try {
+            return new Event(topic, eventProps);
+        } catch (IllegalArgumentException iae) {
+            // thrown for an incorrect topic - should not happen for stored jobs
+            throw new RepositoryException("Unable to read event: " + iae.getMessage(), iae);
+        }
+    }
+
+    /**
+     * Helper method for exceptions which are ignored on purpose:
+     * the exception is only logged if debug logging is enabled.
+     * @param e The ignored exception
+     */
+    private void ignoreException(final Exception e) {
+        if ( !this.logger.isDebugEnabled() ) {
+            return;
+        }
+        this.logger.debug("Ignored exception " + e.getMessage(), e);
+    }
+
+    /**
+     * Check the job topic of an event.
+     * The topic has to be available and has to be usable as the topic of an event.
+     * A warning is logged if the check fails.
+     *
+     * @param job The job event
+     * @return <code>true</code> if the job topic is available and valid.
+     */
+    private boolean checkJobTopic(final Event job) {
+        final String jobTopic = (String)job.getProperty(JobUtil.PROPERTY_JOB_TOPIC);
+        if ( jobTopic == null ) {
+            logger.warn("Discarding job {} : job topic is missing", EventUtil.toString(job));
+            return false;
+        }
+        try {
+            // creating an event is the test: the constructor rejects illegal topics
+            new Event(jobTopic, (Dictionary<String, Object>)null);
+            return true;
+        } catch (IllegalArgumentException iae) {
+            logger.warn("Discarding job {} : job has an illegal job topic {}", EventUtil.toString(job), jobTopic);
+            return false;
+        }
+    }
+
+    /**
+     * Store an event in the repository by putting it in the write queue.
+     * The event is only queued if it has a valid job topic; the queue is
+     * worked on by the background writer thread.
+     * If the calling thread is interrupted while queueing, the job is not
+     * stored: this is logged and the interrupt status of the thread is restored.
+     *
+     * @param event The job event
+     */
+    private void store(final Event event) {
+        if ( logger.isDebugEnabled() ) {
+            logger.debug("Handling local job {}", EventUtil.toString(event));
+        }
+        // check job topic
+        if ( this.checkJobTopic(event) ) {
+            try {
+                this.writeQueue.put(event);
+            } catch (InterruptedException e) {
+                // this should never happen - but if it does, the job is lost, so make
+                // this visible and do not hide the interruption from the caller
+                logger.warn("Interrupted while queueing job {} for persistence - the job is not stored.", EventUtil.toString(event));
+                this.ignoreException(e);
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /**
+     * Try to reload unloaded jobs - this method is invoked if bundles have been added etc.
+     * If there are unloaded jobs, a task is handed to the thread pool which tries
+     * to load each of them again; jobs which still can't be loaded are kept.
+     */
+    private void tryToReloadUnloadedJobs() {
+        final boolean hasUnloadedJobs;
+        synchronized ( this.unloadedJobs ) {
+            hasUnloadedJobs = !this.unloadedJobs.isEmpty();
+        }
+        if ( !hasUnloadedJobs ) {
+            return;
+        }
+        Environment.THREAD_POOL.execute(new Runnable() {
+
+            public void run() {
+                synchronized (unloadedJobs) {
+                    // start with a copy: everything not handled below stays unloaded
+                    final Set<String> remainingJobs = new HashSet<String>(unloadedJobs);
+                    Session session = null;
+                    try {
+                        session = environment.createAdminSession();
+                        for(final String jobPath : unloadedJobs ) {
+                            remainingJobs.remove(jobPath);
+                            try {
+                                if ( session.itemExists(jobPath) ) {
+                                    // a job which still can't be loaded is added to the set again
+                                    tryToLoadJob((Node) session.getItem(jobPath), remainingJobs);
+                                }
+                            } catch (RepositoryException re) {
+                                // ignored, the path is kept for the next attempt
+                                remainingJobs.add(jobPath);
+                                ignoreException(re);
+                            }
+                        }
+                    } catch (RepositoryException re) {
+                        // no session available, everything is tried again next time
+                        ignoreException(re);
+                    } finally {
+                        if ( session != null ) {
+                            session.logout();
+                        }
+                        unloadedJobs.clear();
+                        unloadedJobs.addAll(remainingJobs);
+                    }
+                }
+            }
+
+        });
+    }
+
+    /**
+     * Process the event and pass it on to the queue manager.
+     * Check topic and local flag first!
+     */
+    private void process(final Event event) {
+        if ( !checkJobTopic(event) ) {
+            return;
+        }
+        if ( logger.isDebugEnabled() ) {
+            logger.debug("Received new job {}", EventUtil.toString(event));
+        }
+        // check for local only jobs and remove them from the queue if they're meant
+        // for another application node
+        final String appId = (String)event.getProperty(EventUtil.PROPERTY_APPLICATION);
+        if ( event.getProperty(JobUtil.PROPERTY_JOB_RUN_LOCAL) != null
+            && appId != null && !Environment.APPLICATION_ID.equals(appId) ) {
+            if ( logger.isDebugEnabled() ) {
+                 logger.debug("Discarding job {} : local job for a different application node.", EventUtil.toString(event));
+            }
+        } else {
+            final JobEvent info = new JCRJobEvent(event, this);
+            ((DefaultJobManager)this.jobManager).process(info);
+        }
+    }
+
+    /**
+     * Try to lock the node of the job in the repository.
+     * Locking fails if
+     * - the node does not exist (anymore)
+     * - the node is marked as finished
+     * - the node is already locked
+     * - the repository refuses the lock or any other repository problem occurs
+     *
+     * @param info The job event
+     * @return <code>true</code> if the node could be locked
+     */
+    public boolean lock(final JobEvent info) {
+        final String path = this.getNodePath(info.uniqueId);
+        synchronized ( this.backgroundLock ) {
+            try {
+                // the node has to exist and must not be finished
+                if ( !this.backgroundSession.itemExists(path) ) {
+                    return false;
+                }
+                if ( this.backgroundSession.itemExists(path + '/' + JCRHelper.NODE_PROPERTY_FINISHED) ) {
+                    return false;
+                }
+                final Node jobNode = (Node) this.backgroundSession.getItem(path);
+                if ( jobNode.isLocked() ) {
+                    return false;
+                }
+                try {
+                    this.backgroundSession.getWorkspace().getLockManager().lock(path, false, true, Long.MAX_VALUE, "JobEventHandler:" + Environment.APPLICATION_ID);
+                    return true;
+                } catch (RepositoryException lockFailure) {
+                    // the lock has been refused, so someone else has the node: no requeue required
+                    return false;
+                }
+            } catch (RepositoryException re) {
+                this.ignoreException(re);
+                return false;
+            }
+        }
+    }
+
+    /**
+     * Unlock the node for the job.
+     * A failing unlock is ignored.
+     *
+     * @param info The job event
+     */
+    public void unlock(final JobEvent info) {
+        final String jobNodePath = this.getNodePath(info.uniqueId);
+        synchronized ( this.backgroundLock ) {
+            try {
+                this.backgroundSession.getWorkspace().getLockManager().unlock(jobNodePath);
+            } catch (RepositoryException unlockFailure) {
+                // nothing we could do about it
+                this.ignoreException(unlockFailure);
+            }
+        }
+    }
+
+    /**
+     * Finish the job.
+     * The node of a job without a job name is removed; the node of a named job
+     * is kept, marked as finished and unlocked. Repository problems are ignored.
+     *
+     * @param info The job event
+     */
+    public void finished(final JobEvent info) {
+        final String jobName = (String)info.event.getProperty(JobUtil.PROPERTY_JOB_NAME);
+        final boolean keepNode = jobName != null;
+        final String path = this.getNodePath(info.uniqueId);
+        synchronized ( this.backgroundLock ) {
+            try {
+                if ( !this.backgroundSession.itemExists(path) ) {
+                    return;
+                }
+                final Node jobNode = (Node)this.backgroundSession.getItem(path);
+                if ( keepNode ) {
+                    // named job: mark as finished and note who processed it
+                    jobNode.setProperty(JCRHelper.NODE_PROPERTY_FINISHED, Calendar.getInstance());
+                    jobNode.setProperty(JCRHelper.NODE_PROPERTY_PROCESSOR, Environment.APPLICATION_ID);
+                } else {
+                    // unnamed job: the node is not needed anymore
+                    jobNode.remove();
+                }
+                this.backgroundSession.save();
+                // only a kept node is left to unlock
+                if ( keepNode ) {
+                    this.backgroundSession.getWorkspace().getLockManager().unlock(path);
+                }
+            } catch (RepositoryException re) {
+                // nothing we could do about it
+                this.ignoreException(re);
+            }
+        }
+    }
+
+    private String getNodePath(final String jobId) {
+        return this.repositoryPath + '/' + jobId;
+    }
+
+    /**
+     * Remove the job - if not currently in processing.
+     * Before the node is removed, a cancel notification is sent for the job.
+     *
+     * @param jobId The job id (node path relative to the repository path)
+     * @return <code>false</code> if the job node is locked, <code>true</code> in all other cases.
+     */
+    public boolean remove(final String jobId) {
+        if ( this.backgroundSession == null || jobId == null ) {
+            return true;
+        }
+        final String path = this.getNodePath(jobId);
+        synchronized ( this.backgroundLock ) {
+            try {
+                if ( this.backgroundSession.itemExists(path) ) {
+                    final Node jobNode = (Node) this.backgroundSession.getItem(path);
+                    // a locked node is a job in processing
+                    if ( jobNode.isLocked() ) {
+                        this.logger.debug("Attempted to cancel a running job at {}", path);
+                        return false;
+                    }
+                    // the job is loaded only to be able to send the notification
+                    try {
+                        final Event cancelledJob = this.forceReadEvent(jobNode);
+                        Utility.sendNotification(this.environment, JobUtil.TOPIC_JOB_CANCELLED, cancelledJob);
+                    } catch (RepositoryException ignore) {
+                        this.ignoreException(ignore);
+                    }
+                    jobNode.remove();
+                    this.backgroundSession.save();
+                }
+            } catch (RepositoryException e) {
+                // NOTE(review): true is returned below although the removal failed - confirm this is intended
+                this.logger.error("Error during cancelling job at " + path, e);
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Reschedule the job.
+     * The retry information of the job and the id of this instance are written
+     * to the job node, afterwards the node is unlocked.
+     *
+     * @param info The job event
+     * @return <code>true</code> if the node exists and could be updated and unlocked.
+     */
+    public boolean reschedule(final JobEvent info) {
+        final String path = this.getNodePath(info.uniqueId);
+        synchronized ( this.backgroundLock ) {
+            try {
+                if ( !this.backgroundSession.itemExists(path) ) {
+                    return false;
+                }
+                final Node jobNode = (Node)this.backgroundSession.getItem(path);
+                jobNode.setProperty(JobUtil.PROPERTY_JOB_RETRIES, (Integer)info.event.getProperty(JobUtil.PROPERTY_JOB_RETRIES));
+                jobNode.setProperty(JobUtil.PROPERTY_JOB_RETRY_COUNT, (Integer)info.event.getProperty(JobUtil.PROPERTY_JOB_RETRY_COUNT));
+                jobNode.setProperty(JCRHelper.NODE_PROPERTY_PROCESSOR, Environment.APPLICATION_ID);
+                this.backgroundSession.save();
+
+                // release the lock taken for processing
+                this.backgroundSession.getWorkspace().getLockManager().unlock(path);
+                return true;
+            } catch (RepositoryException re) {
+                // nothing we could do about it
+                this.ignoreException(re);
+                return false;
+            }
+        }
+    }
+
+    /**
+     * Creates or gets the {@link javax.jcr.Node Node} at the given path.
+     * Missing intermediate path elements are created as folder nodes and saved
+     * immediately; the final node is created with the given node type but is
+     * not saved by this method.
+     *
+     * @param parentNode starting node
+     * @param relativePath to create
+     * @param nodeType to use for creation of the final node
+     * @return the Node at path
+     * @throws RepositoryException in case of exception accessing the Repository
+     */
+    private Node createPath(Node   parentNode,
+                            String relativePath,
+                            String nodeType)
+    throws RepositoryException {
+        if ( parentNode.hasNode(relativePath) ) {
+            return parentNode.getNode(relativePath);
+        }
+        Node current = parentNode;
+        String leafName = relativePath;
+        final int lastSlash = relativePath.lastIndexOf('/');
+        if ( lastSlash != -1 ) {
+            // walk down the intermediate folders and create the missing ones
+            final StringTokenizer folderNames = new StringTokenizer(relativePath.substring(0, lastSlash), "/");
+            while ( folderNames.hasMoreTokens() ) {
+                final String folderName = folderNames.nextToken();
+                if ( !current.hasNode(folderName) ) {
+                    try {
+                        current.addNode(folderName, JCRHelper.NODETYPE_FOLDER);
+                        current.getSession().save();
+                    } catch (RepositoryException re) {
+                        // ignored: the folder might have been created by a different task
+                        current.refresh(false);
+                    }
+                }
+                current = current.getNode(folderName);
+            }
+            leafName = relativePath.substring(lastSlash + 1);
+        }
+        if ( !current.hasNode(leafName) ) {
+            current.addNode(leafName, nodeType);
+        }
+        return current.getNode(leafName);
+    }
+
+    /**
+     * Periodic invocation by the scheduler: performs the cleanup.
+     * @see java.lang.Runnable#run()
+     */
+    public void run() {
+        cleanup();
+    }
+
+    /**
+     * Handle an event: local job events are stored, any other local event
+     * triggers a reload of the jobs which could not be loaded so far.
+     * Remote events are ignored.
+     * @see org.osgi.service.event.EventHandler#handleEvent(org.osgi.service.event.Event)
+     */
+    public void handleEvent(final Event event) {
+        if ( logger.isDebugEnabled() ) {
+            logger.debug("Receiving event {}", EventUtil.toString(event));
+        }
+        // remote events are of no interest
+        if ( !EventUtil.isLocal(event) ) {
+            return;
+        }
+        final String topic = event.getTopic();
+        if ( topic.equals(JobUtil.TOPIC_JOB) ) {
+            // a new job
+            this.store(event);
+        } else {
+            // bundle event started or updated
+            this.tryToReloadUnloadedJobs();
+        }
+    }
+}

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java?rev=1021247&view=auto
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java (added)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java Mon Oct 11 06:54:12 2010
@@ -0,0 +1,579 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.queues;
+
+import java.util.ArrayList;
+import java.util.Dictionary;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.sling.event.EventPropertiesMap;
+import org.apache.sling.event.EventUtil;
+import org.apache.sling.event.impl.EnvironmentComponent;
+import org.apache.sling.event.impl.jobs.JobEvent;
+import org.apache.sling.event.impl.jobs.JobStatusNotifier;
+import org.apache.sling.event.impl.jobs.StatisticsImpl;
+import org.apache.sling.event.impl.jobs.Utility;
+import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
+import org.apache.sling.event.jobs.JobUtil;
+import org.apache.sling.event.jobs.Queue;
+import org.apache.sling.event.jobs.Statistics;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The job blocking queue extends the blocking queue by some
+ * functionality for the job event handling.
+ */
+public abstract class AbstractJobQueue
+    extends StatisticsImpl
+    implements JobStatusNotifier, Queue {
+
+    /** Default time in milliseconds to wait for an ack of a started job. */
+    private static final long DEFAULT_WAIT_FOR_ACK_IN_MS = 60 * 1000; // by default we wait 60 secs
+
+    /** Maximum time in milliseconds the queue stays suspended before it resumes on its own. */
+    private static final long MAX_SUSPEND_TIME = 1000 * 60 * 60; // 60 mins
+
+    /** The logger. */
+    protected final Logger logger;
+
+    /** Configuration. */
+    protected final InternalQueueConfiguration configuration;
+
+    /** The environment component. */
+    private final EnvironmentComponent environment;
+
+    /** The queue name. */
+    protected volatile String queueName;
+
+    /** Are we still running? */
+    protected volatile boolean running;
+
+    /** Are we marked for cleanup */
+    private volatile boolean markedForCleanUp = false;
+
+    /** Is the queue currently waiting(sleeping) */
+    protected volatile boolean isWaiting = false;
+
+    /**
+     * The jobs that have been sent but not yet acknowledged, keyed by their unique id.
+     * All access is synchronized on the map itself.
+     */
+    private final Map<String, JobEvent> startedJobsLists = new HashMap<String, JobEvent>();
+
+    /**
+     * The jobs that have been acknowledged and are being processed, keyed by their unique id.
+     * All access is synchronized on the map itself.
+     */
+    private final Map<String, JobEvent> processsingJobsLists = new HashMap<String, JobEvent>();
+
+    /** Time stamp since when the queue is suspended, -1 if it is not suspended. */
+    private final AtomicLong suspendedSince = new AtomicLong(-1);
+
+    /** Monitor the queue thread waits on while the queue is suspended. */
+    private final Object suspendLock = new Object();
+
+    /**
+     * Create a new queue. The queue thread is not started here, see {@link #start()}.
+     * @param name The queue name
+     * @param config The queue configuration
+     * @param environment The environment component
+     */
+    public AbstractJobQueue(final String name,
+                    final InternalQueueConfiguration config,
+                    final EnvironmentComponent environment) {
+        this.queueName = name;
+        this.configuration = config;
+        this.logger = LoggerFactory.getLogger(this.getClass().getName() + '.' + name);
+        this.running = true;
+        this.environment = environment;
+    }
+
+    /**
+     * Return a short textual description of the internal state of this queue.
+     */
+    public String getStatusInfo() {
+        return "isWaiting=" + this.isWaiting + ", markedForCleanUp=" + this.markedForCleanUp + ", suspendedSince=" + this.suspendedSince.longValue();
+    }
+
+    /**
+     * Start the job queue.
+     * A daemon thread is created which runs the queue loop until the queue is closed;
+     * if the loop ends with an exception it is logged and the loop is restarted.
+     */
+    public void start() {
+        final Thread queueThread = new Thread(new Runnable() {
+
+            public void run() {
+                while ( running ) {
+                    logger.info("Starting job queue {}", queueName);
+                    logger.debug("Configuration for job queue={}", configuration);
+
+                    try {
+                        runJobQueue();
+                    } catch (Throwable t) { //NOSONAR
+                        logger.error("Job queue " + queueName + " stopped with exception: " + t.getMessage() + ". Restarting.", t);
+                    }
+                }
+            }
+
+        }, "Apache Sling Job Queue " + queueName);
+        queueThread.setDaemon(true);
+        queueThread.start();
+    }
+
+    /**
+     * Return the queue configuration
+     */
+    public InternalQueueConfiguration getConfiguration() {
+        return this.configuration;
+    }
+
+    /**
+     * Close this queue.
+     * The queue is marked as not running, a suspended or waiting queue thread is woken up
+     * and a dummy job is put into the queue to unblock a pending take.
+     */
+    public void close() {
+        this.running = false;
+        this.logger.debug("Shutting down job queue {}", queueName);
+        this.logger.debug("Waking up sleeping queue {}", queueName);
+        this.resume();
+        if ( this.isWaiting ) {
+            this.logger.debug("Waking up waiting queue {}", this.queueName);
+            this.notifyFinished(null);
+        }
+        // continue queue processing to stop the queue
+        this.put(new JobEvent(null, null) {
+            public boolean lock() { return false; }
+            public void unlock() {
+                // dummy impl
+            }
+            public void finished() {
+                // dummy impl
+            }
+            public boolean remove() { return true; }
+            public boolean reschedule() { return false; }
+            });
+
+        // the maps are plain HashMaps guarded by their own monitor everywhere else,
+        // so clearing them has to use the same monitor
+        synchronized ( this.processsingJobsLists ) {
+            this.processsingJobsLists.clear();
+        }
+        synchronized ( this.startedJobsLists ) {
+            this.startedJobsLists.clear();
+        }
+        this.logger.info("Stopped job queue {}", this.queueName);
+    }
+
+    /**
+     * Periodically cleanup.
+     * Jobs which have been started but did not get an acknowledge within
+     * {@link #DEFAULT_WAIT_FOR_ACK_IN_MS} are requeued.
+     */
+    public void cleanup() {
+        if ( !this.running ) {
+            return;
+        }
+        // check for jobs that were started but never got an acknowledge
+        final long tooOld = System.currentTimeMillis() - DEFAULT_WAIT_FOR_ACK_IN_MS;
+        // to keep the synchronized block as fast as possible we just collect the
+        // candidates in a new list and process this list afterwards
+        final List<JobEvent> restartJobs = new ArrayList<JobEvent>();
+        synchronized ( this.startedJobsLists ) {
+            for ( final JobEvent candidate : this.startedJobsLists.values() ) {
+                if ( candidate.started <= tooOld ) {
+                    restartJobs.add(candidate);
+                }
+            }
+        }
+        if ( restartJobs.isEmpty() ) {
+            return;
+        }
+
+        // restart jobs is now a list of potential candidates, we now have to check
+        // each candidate separately again - an ack might arrive in the meantime
+        try {
+            Thread.sleep(500);
+        } catch (InterruptedException e) {
+            // we just ignore this
+            this.ignoreException(e);
+        }
+        for ( final JobEvent info : restartJobs ) {
+            final boolean process;
+            synchronized ( this.startedJobsLists ) {
+                process = this.startedJobsLists.remove(info.uniqueId) != null;
+            }
+            if ( !process ) {
+                // acknowledged (or finished) in the meantime
+                continue;
+            }
+            // The job never reached the processing list, therefore finishedJob()
+            // would not find it and would neither requeue it nor notify the queue.
+            // Requeue it directly instead.
+            if ( info.reschedule() ) {
+                this.logger.info("No acknowledge received for job {} stored at {}. Requeueing job.", EventUtil.toString(info.event), info.uniqueId);
+                this.checkForNotify(info);
+            } else {
+                // the job leaves the queue without ever becoming active
+                this.decQueued();
+                this.checkForNotify(null);
+            }
+        }
+    }
+
+    /**
+     * Move the job from the started list to the processing list.
+     * @return <code>true</code> if the job was waiting for an acknowledge in this queue.
+     * @see org.apache.sling.event.impl.jobs.JobStatusNotifier#sendAcknowledge(org.osgi.service.event.Event)
+     */
+    public boolean sendAcknowledge(final Event job) {
+        final String location = (String)job.getProperty(JobUtil.JOB_ID);
+        final JobEvent ack;
+        synchronized ( this.startedJobsLists ) {
+            ack = this.startedJobsLists.remove(location);
+        }
+        // if the event is still in the started list, we confirm the ack
+        if ( ack != null ) {
+            if ( logger.isDebugEnabled() ) {
+                logger.debug("Received ack for job {}", EventUtil.toString(job));
+            }
+            this.addActive(ack.started - ack.queued);
+            Utility.sendNotification(this.environment, JobUtil.TOPIC_JOB_STARTED, job);
+            synchronized ( this.processsingJobsLists ) {
+                this.processsingJobsLists.put(location, ack);
+            }
+        } else {
+            this.decQueued();
+        }
+        return ack != null;
+    }
+
+    /**
+     * Read an integer property from the event.
+     * Any {@link Number} is accepted, not only an {@link Integer}.
+     * @return The property value or the default value if the property is not set.
+     */
+    private int getIntProperty(final Event event, final String name, final int defaultValue) {
+        final Object value = event.getProperty(name);
+        if ( value == null ) {
+            return defaultValue;
+        }
+        return ((Number)value).intValue();
+    }
+
+    /**
+     * Update the statistics and send the notification for a finished job.
+     * If the job should be rescheduled, the retry count is checked against the
+     * maximum number of retries (-1 means no limit) and the job event is updated
+     * with the new retry information.
+     * @param jobEvent The job
+     * @param shouldReschedule Whether the caller asked for a reschedule
+     * @return <code>true</code> if the job really has to be rescheduled.
+     */
+    private boolean handleReschedule(final JobEvent jobEvent, final boolean shouldReschedule) {
+        if ( !shouldReschedule ) {
+            if ( this.logger.isDebugEnabled() ) {
+                this.logger.debug("Finished job {}", EventUtil.toString(jobEvent.event));
+            }
+            this.finishedJob(System.currentTimeMillis() - jobEvent.started);
+            Utility.sendNotification(this.environment, JobUtil.TOPIC_JOB_FINISHED, jobEvent.event);
+            return false;
+        }
+
+        // check if we exceeded the number of retries, a value set on the job wins
+        // over the queue configuration
+        final int retries = this.getIntProperty(jobEvent.event, JobUtil.PROPERTY_JOB_RETRIES,
+                this.configuration.getMaxRetries());
+        final int retryCount = this.getIntProperty(jobEvent.event, JobUtil.PROPERTY_JOB_RETRY_COUNT, 0) + 1;
+        final boolean reschedule = retries == -1 || retryCount <= retries;
+
+        if ( reschedule ) {
+            // update event with retry count and retries
+            final Dictionary<String, Object> newProperties = new EventPropertiesMap(jobEvent.event);
+            newProperties.put(JobUtil.PROPERTY_JOB_RETRY_COUNT, retryCount);
+            newProperties.put(JobUtil.PROPERTY_JOB_RETRIES, retries);
+            jobEvent.event = new Event(jobEvent.event.getTopic(), newProperties);
+            if ( this.logger.isDebugEnabled() ) {
+                this.logger.debug("Failed job {}", EventUtil.toString(jobEvent.event));
+            }
+            this.failedJob();
+            jobEvent.queued = System.currentTimeMillis();
+            Utility.sendNotification(this.environment, JobUtil.TOPIC_JOB_FAILED, jobEvent.event);
+        } else {
+            if ( this.logger.isDebugEnabled() ) {
+                this.logger.debug("Cancelled job {}", EventUtil.toString(jobEvent.event));
+            }
+            this.cancelledJob();
+            Utility.sendNotification(this.environment, JobUtil.TOPIC_JOB_CANCELLED, jobEvent.event);
+        }
+        return reschedule;
+    }
+
+    /**
+     * Handle the end of a job that was acknowledged by this queue.
+     * @return <code>true</code> if the job has been rescheduled, <code>false</code> otherwise
+     *         (finished, cancelled, unknown job, failed reschedule or queue not running).
+     * @see org.apache.sling.event.impl.jobs.JobStatusNotifier#finishedJob(org.osgi.service.event.Event, boolean)
+     */
+    public boolean finishedJob(final Event job, final boolean shouldReschedule) {
+        if ( this.logger.isDebugEnabled() ) {
+            this.logger.debug("Received finish for job {}, shouldReschedule={}", EventUtil.toString(job), shouldReschedule);
+        }
+        if ( !this.running ) {
+            if ( this.logger.isDebugEnabled() ) {
+                this.logger.debug("Queue is not running anymore. Discarding finish for {}", EventUtil.toString(job));
+            }
+            return false;
+        }
+        final String location = (String)job.getProperty(JobUtil.JOB_ID);
+        // let's remove the event from our started list
+        // this is just a sanity check, as usually the job should have been
+        // removed during sendAcknowledge.
+        synchronized ( this.startedJobsLists ) {
+            this.startedJobsLists.remove(location);
+        }
+
+        // get job event
+        final JobEvent info;
+        synchronized ( this.processsingJobsLists ) {
+            info = this.processsingJobsLists.remove(location);
+        }
+        if ( info == null ) {
+            if ( this.logger.isDebugEnabled() ) {
+                this.logger.debug("This job has never been started by this queue: {}", EventUtil.toString(job));
+            }
+            return false;
+        }
+
+        // handle the reschedule, the event of the job might be replaced with updated retry info!
+        final boolean reschedule = this.handleReschedule(info, shouldReschedule);
+
+        if ( reschedule ) {
+            if ( info.reschedule() ) {
+                this.checkForNotify(info);
+                return true;
+            }
+            // rescheduling failed, fall through and just notify the queue
+        } else {
+            info.finished();
+        }
+        this.checkForNotify(null);
+        return false;
+    }
+
+    /**
+     * Reschedule the job (if one is given) and notify the queue implementation
+     * that a job is finished.
+     * @param info The job to reschedule or <code>null</code>
+     */
+    private void checkForNotify(final JobEvent info) {
+        JobEvent reprocessInfo = null;
+        if ( info != null ) {
+            reprocessInfo = this.reschedule(info);
+        }
+        notifyFinished(reprocessInfo);
+    }
+
+    /**
+     * A queue can only be marked for cleanup if it is empty and not waiting.
+     */
+    protected boolean canBeMarkedForCleanUp() {
+        return this.isEmpty() && !this.isWaiting;
+    }
+
+    /**
+     * Mark this queue for cleanup.
+     */
+    public void markForCleanUp() {
+        if ( this.canBeMarkedForCleanUp() ) {
+            this.markedForCleanUp = true;
+        }
+    }
+
+    /**
+     * Check if this queue is marked for cleanup
+     */
+    public boolean isMarkedForCleanUp() {
+        return this.markedForCleanUp && this.canBeMarkedForCleanUp();
+    }
+
+    /**
+     * Get the name of the job queue.
+     */
+    public String getName() {
+        return this.queueName;
+    }
+
+
+    /**
+     * Add a new job to the queue.
+     */
+    public void process(final JobEvent event) {
+        // stamp and count the job before it becomes visible to the queue thread,
+        // otherwise the job could be taken (and the counter decremented) before
+        // these updates happened
+        event.queued = System.currentTimeMillis();
+        this.incQueued();
+        this.put(event);
+    }
+
+    /**
+     * Block the calling thread while the queue is suspended.
+     * The suspension ends if {@link #resume()} is called or after {@link #MAX_SUSPEND_TIME}.
+     * The condition is checked while holding the suspend lock, so a resume
+     * between the check and the wait can not be missed.
+     */
+    private void waitWhileSuspended() {
+        synchronized ( this.suspendLock ) {
+            long since = this.suspendedSince.longValue();
+            while ( since != -1 ) {
+                final long remaining = since + MAX_SUSPEND_TIME - System.currentTimeMillis();
+                if ( remaining <= 0 ) {
+                    // maximum suspend time is over - only reset if nobody changed the state meanwhile
+                    this.suspendedSince.compareAndSet(since, -1);
+                } else {
+                    try {
+                        this.suspendLock.wait(remaining);
+                    } catch (final InterruptedException ignore) {
+                        this.ignoreException(ignore);
+                    }
+                }
+                since = this.suspendedSince.longValue();
+            }
+        }
+    }
+
+    /**
+     * Execute the queue: take jobs from the queue and start them until
+     * the queue is closed.
+     */
+    private void runJobQueue() {
+        JobEvent info = null;
+        while ( this.running ) {
+            this.waitWhileSuspended();
+            if ( info == null ) {
+                // so let's wait/get the next job from the queue
+                info = this.take();
+            }
+
+            if ( info != null && this.running ) {
+                info = this.start(info);
+            }
+        }
+    }
+
+    /**
+     * Process a job: lock it, remember it as started and post the job event.
+     * @return <code>true</code> if the job event has been posted.
+     */
+    protected boolean executeJob(final JobEvent info) {
+        if ( logger.isDebugEnabled() ) {
+            logger.debug("Executing job {}.", EventUtil.toString(info.event));
+        }
+        if ( info.lock() ) {
+            if ( logger.isDebugEnabled() ) {
+                logger.debug("Starting job {}", EventUtil.toString(info.event));
+            }
+            boolean unlock = true;
+            try {
+                final Event jobEvent = this.getJobEvent(info);
+                final EventAdmin localEA = this.environment.getEventAdmin();
+                info.started = System.currentTimeMillis();
+                // let's add the event to our started list
+                synchronized ( this.startedJobsLists ) {
+                    this.startedJobsLists.put(info.uniqueId, info);
+                }
+
+                // we need async delivery, otherwise we might create a deadlock
+                localEA.postEvent(jobEvent);
+                // do not unlock if sending was successful
+                unlock = false;
+
+                return true;
+
+            } catch (final Exception re) {
+                // if an exception occurs, we just log
+                this.logger.error("Exception during job processing.", re);
+            } finally {
+                if ( unlock ) {
+                    // the job was not sent: do not keep it in the started list,
+                    // otherwise cleanup would treat it as a job waiting for an ack
+                    synchronized ( this.startedJobsLists ) {
+                        this.startedJobsLists.remove(info.uniqueId);
+                    }
+                    info.unlock();
+                }
+            }
+        }
+
+        this.decQueued();
+        return false;
+    }
+
+    /**
+     * Create the real job event.
+     * This generates a new event object with the same properties, but with the
+     * {@link JobUtil#PROPERTY_JOB_TOPIC} topic.
+     * @param info The job event.
+     * @return The real job event.
+     */
+    private Event getJobEvent(final JobEvent info) {
+        final String eventTopic = (String)info.event.getProperty(JobUtil.PROPERTY_JOB_TOPIC);
+        final Dictionary<String, Object> properties = new EventPropertiesMap(info.event);
+        // put properties for finished job callback
+        properties.put(JobStatusNotifier.CONTEXT_PROPERTY_NAME,
+                new JobStatusNotifier.NotifierContext(this));
+        // remove app id and distributable flag
+        properties.remove(EventUtil.PROPERTY_DISTRIBUTE);
+        properties.remove(EventUtil.PROPERTY_APPLICATION);
+
+        // set priority from configuration
+        if ( properties.get(JobUtil.PROPERTY_JOB_PRIORITY) == null ) {
+            properties.put(JobUtil.PROPERTY_JOB_PRIORITY, this.configuration.getPriority());
+        }
+        // set queue name
+        properties.put(JobUtil.PROPERTY_JOB_QUEUE_NAME, info.queueName);
+
+        return new Event(eventTopic, properties);
+    }
+
+    /**
+     * Helper method which just logs the exception in debug mode.
+     * @param e The exception to ignore
+     */
+    protected void ignoreException(Exception e) {
+        if ( this.logger.isDebugEnabled() ) {
+            this.logger.debug("Ignored exception " + e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Rename this queue.
+     */
+    public void rename(final String name) {
+        this.logger.info("Queue reconfiguration: old queue {} is renamed to {}.", this.queueName, name);
+        this.queueName = name;
+    }
+
+    /**
+     * Start the job taken from the queue.
+     * The returned job (if any) is passed to the next invocation instead of
+     * taking a new job from the queue.
+     */
+    protected abstract JobEvent start(final JobEvent event);
+
+    /** Put a job into the queue. */
+    protected abstract void put(final JobEvent event);
+
+    /** Get the next job from the queue. */
+    protected abstract JobEvent take();
+
+    /** Is the queue empty? */
+    protected abstract boolean isEmpty();
+
+    /**
+     * Notify the queue implementation that a job is finished.
+     * @param rescheduleInfo The value returned by {@link #reschedule(JobEvent)} or <code>null</code>
+     */
+    protected abstract void notifyFinished(final JobEvent rescheduleInfo);
+
+    /**
+     * Reschedule a job.
+     */
+    protected abstract JobEvent reschedule(final JobEvent info);
+
+    /**
+     * @see org.apache.sling.event.jobs.Queue#getStatistics()
+     */
+    public Statistics getStatistics() {
+        return this;
+    }
+
+    /**
+     * @see org.apache.sling.event.jobs.Queue#resume()
+     */
+    public void resume() {
+        // change the state before notifying and do both while holding the lock,
+        // otherwise the queue thread could wake up, still see the suspended
+        // state and go back to sleep
+        synchronized ( this.suspendLock ) {
+            this.suspendedSince.set(-1);
+            this.suspendLock.notifyAll();
+        }
+    }
+
+    /**
+     * @see org.apache.sling.event.jobs.Queue#suspend()
+     */
+    public void suspend() {
+        // keep the original time stamp if the queue is already suspended
+        this.suspendedSince.compareAndSet(-1, System.currentTimeMillis());
+    }
+
+    /**
+     * @see org.apache.sling.event.jobs.Queue#isSuspended()
+     */
+    public boolean isSuspended() {
+        return this.suspendedSince.longValue() != -1;
+    }
+
+
+    /**
+     * Remove all jobs from the queue. The queue is suspended while the jobs are
+     * removed and resumed afterwards if it was not suspended before.
+     * @see org.apache.sling.event.jobs.Queue#removeAll()
+     */
+    public void removeAll() {
+        final boolean wasSuspended = this.isSuspended();
+        this.suspend();
+        while ( !this.isEmpty() ) {
+            final JobEvent event = this.take();
+            if ( event != null ) {
+                event.remove();
+            }
+        }
+        this.clearQueued();
+        if ( !wasSuspended ) {
+            this.resume();
+        }
+    }
+
+    /**
+     * @see org.apache.sling.event.jobs.Queue#clear()
+     */
+    public void clear() {
+        this.clearQueued();
+    }
+}
+

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractParallelJobQueue.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractParallelJobQueue.java?rev=1021247&view=auto
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractParallelJobQueue.java (added)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractParallelJobQueue.java Mon Oct 11 06:54:12 2010
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.queues;
+
+import java.util.Date;
+
+import org.apache.sling.commons.scheduler.Scheduler;
+import org.apache.sling.event.impl.EnvironmentComponent;
+import org.apache.sling.event.impl.jobs.JobEvent;
+import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
+import org.apache.sling.event.jobs.JobUtil;
+
+/**
+ * Abstract base class for a parallel processing job queue.
+ */
+public abstract class AbstractParallelJobQueue extends AbstractJobQueue {
+
+    protected volatile int jobCount;
+
+    /** The scheduler for rescheduling. */
+    private final Scheduler scheduler;
+
+    public AbstractParallelJobQueue(final String name,
+                           final InternalQueueConfiguration config,
+                           final EnvironmentComponent env,
+                           final Scheduler scheduler) {
+        super(name, config, env);
+        this.scheduler = scheduler;
+    }
+
+    @Override
+    public String getStatusInfo() {
+        return super.getStatusInfo() + ", jobCount=" + this.jobCount;
+    }
+
+    @Override
+    protected JobEvent start(final JobEvent processInfo) {
+        // acquire a slot
+        this.acquireSlot();
+
+        if ( !this.executeJob(processInfo) ) {
+            this.freeSlot();
+        }
+        return null;
+    }
+
+    /**
+     * Acquire a processing slot.
+     * This method is called if the queue is not ordered.
+     */
+    private void acquireSlot() {
+        synchronized ( this ) {
+            if ( jobCount >= this.configuration.getMaxParallel() ) {
+                this.isWaiting = true;
+                this.logger.debug("Job queue {} is processing {} jobs - waiting for a free slot.", this.queueName, jobCount);
+                while ( this.isWaiting ) {
+                    try {
+                        this.wait();
+                    } catch (final InterruptedException e) {
+                        this.ignoreException(e);
+                    }
+                }
+                this.logger.debug("Job queue {} is continuing.", this.queueName);
+            }
+            jobCount++;
+        }
+    }
+
+    /**
+     * Free a slot when a job processing is finished.
+     */
+    private void freeSlot() {
+        synchronized ( this ) {
+            jobCount--;
+            if ( this.isWaiting ) {
+                this.logger.debug("Notifying job queue {} to continue processing.", this.queueName);
+                this.isWaiting = false;
+                this.notify();
+            }
+        }
+    }
+
+    @Override
+    protected boolean canBeMarkedForCleanUp() {
+        boolean result = super.canBeMarkedForCleanUp();
+        if ( result ) {
+            result = this.jobCount == 0;
+        }
+        return result;
+    }
+
+    @Override
+    protected void notifyFinished(final JobEvent rescheduleInfo) {
+        this.freeSlot();
+    }
+
+    @Override
+    protected JobEvent reschedule(final JobEvent info) {
+        // we just sleep for the delay time - if none, we continue and retry
+        // this job again
+        long delay = this.configuration.getRetryDelayInMs();
+        if ( info.event.getProperty(JobUtil.PROPERTY_JOB_RETRY_DELAY) != null ) {
+            delay = (Long)info.event.getProperty(JobUtil.PROPERTY_JOB_RETRY_DELAY);
+        }
+        if ( delay > 0 ) {
+            final Date fireDate = new Date();
+            fireDate.setTime(System.currentTimeMillis() + delay);
+
+            final String jobName = "Waiting:" + queueName + ":" + info.hashCode();
+            final Runnable t = new Runnable() {
+                public void run() {
+                    put(info);
+                }
+            };
+            try {
+                scheduler.fireJobAt(jobName, t, null, fireDate);
+            } catch (Exception e) {
+                // we ignore the exception and just put back the job in the queue
+                ignoreException(e);
+                t.run();
+            }
+        } else {
+            // put directly into queue
+            put(info);
+        }
+        return null;
+    }
+}
+

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractParallelJobQueue.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractParallelJobQueue.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractParallelJobQueue.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain