You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2010/10/11 08:54:14 UTC

svn commit: r1021247 [2/6] - in /sling/branches/eventing-3.0: ./ .settings/ src/ src/main/ src/main/java/ src/main/java/org/ src/main/java/org/apache/ src/main/java/org/apache/sling/ src/main/java/org/apache/sling/event/ src/main/java/org/apache/sling/...

Added: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/DistributingEventHandler.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/DistributingEventHandler.java?rev=1021247&view=auto
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/DistributingEventHandler.java (added)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/DistributingEventHandler.java Mon Oct 11 06:54:12 2010
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl;
+
+import java.util.Calendar;
+import java.util.Dictionary;
+
+import javax.jcr.Node;
+import javax.jcr.NodeIterator;
+import javax.jcr.RepositoryException;
+import javax.jcr.Session;
+import javax.jcr.observation.EventIterator;
+import javax.jcr.query.Query;
+import javax.jcr.query.qom.QueryObjectModelFactory;
+
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.ConfigurationPolicy;
+import org.apache.felix.scr.annotations.Properties;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.sling.commons.osgi.OsgiUtil;
+import org.apache.sling.event.EventUtil;
+import org.apache.sling.event.impl.jobs.jcr.JCRHelper;
+import org.osgi.service.component.ComponentContext;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
+
+/**
+ * This event handler distributes events across an application cluster.
+ *
+ * We schedule this event handler to run in the background and clean up
+ * obsolete events.
+ */
+@Component(label="%dist.events.name",
+           description="%dist.events.description",
+           immediate=true,
+           metatype=true,
+           policy=ConfigurationPolicy.REQUIRE)
+@Service(value=Runnable.class)
+@Properties({
+    @Property(name="event.topics",value="*",propertyPrivate=true),
+    @Property(name="event.filter",value="(event.distribute=*)",propertyPrivate=true),
+    @Property(name="repository.path",value="/var/eventing/distribution",propertyPrivate=true),
+    @Property(name="scheduler.period", longValue=1800),
+    @Property(name="scheduler.concurrent", boolValue=false, propertyPrivate=true)
+})
+public class DistributingEventHandler
+    extends AbstractRepositoryEventHandler
+    implements Runnable {
+
+    /** Default clean up time is 15 minutes. */
+    private static final int DEFAULT_CLEANUP_PERIOD = 15;
+
+    @Property(intValue=DEFAULT_CLEANUP_PERIOD)
+    private static final String CONFIG_PROPERTY_CLEANUP_PERIOD = "cleanup.period";
+
+    /** We remove everything which is older than 15min by default. */
+    private int cleanupPeriod = DEFAULT_CLEANUP_PERIOD;
+
+    /**
+     * @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#activate(org.osgi.service.component.ComponentContext)
+     */
+    protected void activate(ComponentContext context) {
+        @SuppressWarnings("unchecked")
+        final Dictionary<String, Object> props = context.getProperties();
+        this.cleanupPeriod = OsgiUtil.toInteger(props.get(CONFIG_PROPERTY_CLEANUP_PERIOD), DEFAULT_CLEANUP_PERIOD);
+        super.activate(context);
+    }
+
+    /**
+     * Return the query for the clean up.
+     */
+    protected Query getCleanUpQuery(final Session s)
+    throws RepositoryException {
+        final String selectorName = "nodetype";
+        final Calendar deleteBefore = Calendar.getInstance();
+        deleteBefore.add(Calendar.MINUTE, -this.cleanupPeriod);
+
+        final QueryObjectModelFactory qomf = s.getWorkspace().getQueryManager().getQOMFactory();
+
+        final Query q = qomf.createQuery(
+                qomf.selector(getEventNodeType(), selectorName),
+                qomf.and(qomf.descendantNode(selectorName, this.repositoryPath),
+                         qomf.comparison(qomf.propertyValue(selectorName, JCRHelper.NODE_PROPERTY_CREATED),
+                                       QueryObjectModelFactory.JCR_OPERATOR_LESS_THAN,
+                                       qomf.literal(s.getValueFactory().createValue(deleteBefore)))),
+                null,
+                null
+        );
+        return q;
+    }
+
+    /**
+     * This method is invoked periodically.
+     * @see java.lang.Runnable#run()
+     */
+    public void run() {
+        if ( this.cleanupPeriod > 0 ) {
+            this.logger.debug("Cleaning up repository, removing all entries older than {} minutes.", this.cleanupPeriod);
+
+            // we create an own session for concurrency issues
+            Session s = null;
+            try {
+                s = this.environment.createAdminSession();
+                final Query q = this.getCleanUpQuery(s);
+                if ( logger.isDebugEnabled() ) {
+                    logger.debug("Executing query {}", q.getStatement());
+                }
+                final NodeIterator iter = q.execute().getNodes();
+                int count = 0;
+                while ( iter.hasNext() ) {
+                    final Node eventNode = iter.nextNode();
+                    eventNode.remove();
+                    count++;
+                }
+                s.save();
+                logger.debug("Removed {} entries from the repository.", count);
+
+            } catch (RepositoryException e) {
+                // in the case of an error, we just log this as a warning
+                this.logger.warn("Exception during repository cleanup.", e);
+            } finally {
+                if ( s != null ) {
+                    s.logout();
+                }
+            }
+        }
+    }
+
+    /**
+     * @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#processWriteQueue()
+     */
+    protected void processWriteQueue() {
+        while ( this.running ) {
+            // so let's wait/get the next job from the queue
+            Event event = null;
+            try {
+                event = this.writeQueue.take();
+            } catch (InterruptedException e) {
+                // we ignore this
+                this.ignoreException(e);
+            }
+            if ( event != null && this.running ) {
+                try {
+                    this.writeEvent(event, null);
+                } catch (Exception e) {
+                    this.logger.error("Exception during writing the event to the repository.", e);
+                }
+            }
+        }
+    }
+
+    /**
+     * @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#runInBackground()
+     */
+    protected void runInBackground() {
+        while ( this.running ) {
+            // so let's wait/get the next job from the queue
+            EventInfo info = null;
+            try {
+                info = this.queue.take();
+            } catch (InterruptedException e) {
+                // we ignore this
+                this.ignoreException(e);
+            }
+            if ( info != null && this.running ) {
+                if ( info.nodePath != null) {
+                    Session session = null;
+                    try {
+                        session = this.environment.createAdminSession();
+                        final Node eventNode = (Node)session.getItem(info.nodePath);
+                        if ( eventNode.isNodeType(this.getEventNodeType()) ) {
+                            final EventAdmin localEA = this.environment.getEventAdmin();
+                            if ( localEA != null ) {
+                                localEA.postEvent(this.readEvent(eventNode));
+                            } else {
+                                this.logger.error("Unable to post event as no event admin is available.");
+                            }
+                        }
+                    } catch (Exception ex) {
+                        this.logger.error("Exception during reading the event from the repository.", ex);
+                    } finally {
+                        if ( session != null ) {
+                            session.logout();
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * @see org.osgi.service.event.EventHandler#handleEvent(org.osgi.service.event.Event)
+     */
+    public void handleEvent(final Event event) {
+        try {
+            this.writeQueue.put(event);
+        } catch (InterruptedException ex) {
+            // we ignore this
+            this.ignoreException(ex);
+        }
+    }
+
+    /**
+     * @see javax.jcr.observation.EventListener#onEvent(javax.jcr.observation.EventIterator)
+     */
+    public void onEvent(final EventIterator iterator) {
+        while ( iterator.hasNext() ) {
+            final javax.jcr.observation.Event event = iterator.nextEvent();
+            try {
+                final EventInfo info = new EventInfo();
+                info.nodePath = event.getPath();
+                this.queue.put(info);
+            } catch (InterruptedException ex) {
+                // we ignore this
+                this.ignoreException(ex);
+            } catch (RepositoryException ex) {
+                this.logger.error("Exception during reading the event from the repository.", ex);
+            }
+        }
+    }
+
+    /**
+     * @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#addEventProperties(javax.jcr.Node, java.util.Dictionary)
+     */
+    protected void addEventProperties(Node eventNode, Dictionary<String, Object> properties)
+    throws RepositoryException {
+        super.addEventProperties(eventNode, properties);
+        properties.put(EventUtil.PROPERTY_APPLICATION, eventNode.getProperty(JCRHelper.NODE_PROPERTY_APPLICATION).getString());
+    }
+
+
+    /**
+     * @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#startWriterSession()
+     */
+    protected void startWriterSession() throws RepositoryException {
+        super.startWriterSession();
+        this.writerSession.getWorkspace().getObservationManager()
+            .addEventListener(this, javax.jcr.observation.Event.NODE_ADDED, this.repositoryPath, true, null, null, true);
+    }
+}

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/DistributingEventHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/DistributingEventHandler.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/DistributingEventHandler.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/EnvironmentComponent.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/EnvironmentComponent.java?rev=1021247&view=auto
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/EnvironmentComponent.java (added)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/EnvironmentComponent.java Mon Oct 11 06:54:12 2010
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl;
+
+import javax.jcr.RepositoryException;
+import javax.jcr.Session;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.ReferencePolicy;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.sling.commons.classloader.DynamicClassLoaderManager;
+import org.apache.sling.commons.threads.ThreadPool;
+import org.apache.sling.event.impl.support.Environment;
+import org.apache.sling.jcr.api.SlingRepository;
+import org.apache.sling.settings.SlingSettingsService;
+import org.osgi.service.event.EventAdmin;
+
+/**
+ * Environment component. This component provides "global settings"
+ * to all services, like the application id and the thread pool.
+ * @since 3.0
+ */
+@Component()
+@Service(value=EnvironmentComponent.class)
+public class EnvironmentComponent {
+
+    @Reference
+    private SlingRepository repository;
+
+    @Reference
+    private EventAdmin eventAdmin;
+
+    @Reference(policy=ReferencePolicy.DYNAMIC,cardinality=ReferenceCardinality.OPTIONAL_UNARY)
+    private DynamicClassLoaderManager classLoaderManager;
+
+    /**
+     * Our thread pool.
+     */
+    @Reference(referenceInterface=EventingThreadPool.class)
+    private ThreadPool threadPool;
+
+    /** Sling settings service. */
+    @Reference
+    private SlingSettingsService settingsService;
+
+    /**
+     * Activate this component.
+     */
+    @Activate
+    protected void activate() {
+        // Set the application id and the thread pool
+        Environment.APPLICATION_ID = this.settingsService.getSlingId();
+        Environment.THREAD_POOL = this.threadPool;
+    }
+
+    /**
+     * Dectivate this component.
+     */
+    @Deactivate
+    protected void deactivate() {
+        // Unset the thread pool
+        if ( Environment.THREAD_POOL == this.threadPool ) {
+            Environment.THREAD_POOL = null;
+        }
+    }
+
+    /**
+     * Return the dynamic classloader for loading events from the repository.
+     */
+    public ClassLoader getDynamicClassLoader() {
+        final DynamicClassLoaderManager dclm = this.classLoaderManager;
+        if ( dclm != null ) {
+            return dclm.getDynamicClassLoader();
+        }
+        // if we don't have a dynamic classloader, we return our classloader
+        return this.getClass().getClassLoader();
+    }
+
+    /**
+     * Create a new admin session.
+     * @return A new admin session.
+     * @throws RepositoryException
+     */
+    public Session createAdminSession()
+    throws RepositoryException {
+        final SlingRepository repo = this.repository;
+        if ( repo == null ) {
+            // as the repo is a hard dependency for this service, the repo
+            // is always available, but we check for null anyway!
+            throw new RepositoryException("Repository is currently not available.");
+        }
+        return repo.loginAdministrative(null);
+    }
+
+    /**
+     * Return the event admin.
+     */
+    public EventAdmin getEventAdmin() {
+        return this.eventAdmin;
+    }
+
+    protected void bindThreadPool(final EventingThreadPool etp) {
+        this.threadPool = etp;
+    }
+
+    protected void unbindThreadPool(final EventingThreadPool etp) {
+        if ( this.threadPool == etp ) {
+            this.threadPool = null;
+        }
+    }
+}

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/EnvironmentComponent.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/EnvironmentComponent.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/EnvironmentComponent.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/EventingThreadPool.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/EventingThreadPool.java?rev=1021247&view=auto
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/EventingThreadPool.java (added)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/EventingThreadPool.java Mon Oct 11 06:54:12 2010
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl;
+
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.PropertyOption;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.sling.commons.osgi.OsgiUtil;
+import org.apache.sling.commons.threads.ModifiableThreadPoolConfig;
+import org.apache.sling.commons.threads.ThreadPool;
+import org.apache.sling.commons.threads.ThreadPoolConfig;
+import org.apache.sling.commons.threads.ThreadPoolConfig.ThreadPriority;
+import org.apache.sling.commons.threads.ThreadPoolManager;
+import org.osgi.service.component.ComponentContext;
+
+
+/**
+ * The configurable eventing thread pool.
+ */
+@Component(label="%event.pool.name",
+        description="%event.pool.description",
+        metatype=true)
+@Service(value=EventingThreadPool.class)
+public class EventingThreadPool implements ThreadPool {
+
+    @Reference
+    protected ThreadPoolManager threadPoolManager;
+
+    /** The real thread pool used. */
+    private org.apache.sling.commons.threads.ThreadPool threadPool;
+
+    private static final int DEFAULT_MIN_POOL_SIZE = 35; // this is sufficient for all threads + approx 25 job queues
+    private static final int DEFAULT_MAX_POOL_SIZE = 50;
+
+    @Property(intValue=DEFAULT_MIN_POOL_SIZE)
+    private static final String PROPERTY_MIN_POOL_SIZE = "minPoolSize";
+    @Property(intValue=DEFAULT_MAX_POOL_SIZE)
+    private static final String PROPERTY_MAX_POOL_SIZE = "maxPoolSize";
+
+    @Property(value="NORM",
+            options={@PropertyOption(name="NORM",value="Norm"),
+                     @PropertyOption(name="MIN",value="Min"),
+                     @PropertyOption(name="MAX",value="Max")})
+    private static final String PROPERTY_PRIORITY = "priority";
+
+    /**
+     * Activate this component.
+     * @param context
+     */
+    protected void activate(final ComponentContext ctx) {
+        final ModifiableThreadPoolConfig config = new ModifiableThreadPoolConfig();
+        config.setMinPoolSize(OsgiUtil.toInteger(ctx.getProperties().get(PROPERTY_MIN_POOL_SIZE), DEFAULT_MIN_POOL_SIZE));
+        config.setMaxPoolSize(OsgiUtil.toInteger(ctx.getProperties().get(PROPERTY_MAX_POOL_SIZE), DEFAULT_MAX_POOL_SIZE));
+        config.setQueueSize(-1); // unlimited
+        config.setShutdownGraceful(true);
+        config.setPriority(ThreadPriority.valueOf(OsgiUtil.toString(ctx.getProperties().get(PROPERTY_PRIORITY), "NORM")));
+        config.setDaemon(true);
+        this.threadPool = threadPoolManager.create(config, "Apache Sling Eventing Thread Pool");
+    }
+
+    /**
+     * Deactivate this component.
+     * @param context
+     */
+    protected void deactivate(final ComponentContext context) {
+        this.threadPoolManager.release(this.threadPool);
+    }
+
+    /**
+     * @see org.apache.sling.commons.threads.ThreadPool#execute(java.lang.Runnable)
+     */
+    public void execute(Runnable runnable) {
+        threadPool.execute(runnable);
+    }
+
+    /**
+     * @see org.apache.sling.commons.threads.ThreadPool#getConfiguration()
+     */
+    public ThreadPoolConfig getConfiguration() {
+        return threadPool.getConfiguration();
+    }
+
+    /**
+     * @see org.apache.sling.commons.threads.ThreadPool#getName()
+     */
+    public String getName() {
+        return threadPool.getName();
+    }
+}

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/EventingThreadPool.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/EventingThreadPool.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/EventingThreadPool.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/TimedJobHandler.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/TimedJobHandler.java?rev=1021247&view=auto
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/TimedJobHandler.java (added)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/TimedJobHandler.java Mon Oct 11 06:54:12 2010
@@ -0,0 +1,809 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Collection;
+import java.util.Date;
+import java.util.Dictionary;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Hashtable;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+
+import javax.jcr.Item;
+import javax.jcr.Node;
+import javax.jcr.NodeIterator;
+import javax.jcr.RepositoryException;
+import javax.jcr.Session;
+import javax.jcr.Value;
+import javax.jcr.lock.Lock;
+import javax.jcr.lock.LockException;
+import javax.jcr.observation.EventIterator;
+import javax.jcr.query.Query;
+import javax.jcr.query.QueryManager;
+import javax.jcr.query.qom.Comparison;
+import javax.jcr.query.qom.Constraint;
+import javax.jcr.query.qom.QueryObjectModelFactory;
+
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Properties;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.sling.commons.scheduler.Job;
+import org.apache.sling.commons.scheduler.JobContext;
+import org.apache.sling.commons.scheduler.Scheduler;
+import org.apache.sling.event.EventUtil;
+import org.apache.sling.event.TimedEventStatusProvider;
+import org.apache.sling.event.impl.jobs.Utility;
+import org.apache.sling.event.impl.jobs.jcr.JCRHelper;
+import org.apache.sling.event.impl.support.Environment;
+import org.apache.sling.event.jobs.JobUtil;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
+
+
+/**
+ * An event handler for timed events.
+ *
+ */
+@Component(immediate=true)
+@Service(value=TimedEventStatusProvider.class)
+@Properties({
+     @Property(name="event.topics",propertyPrivate=true,
+               value={"org/osgi/framework/BundleEvent/UPDATED",
+                      "org/osgi/framework/BundleEvent/STARTED",
+                      EventUtil.TOPIC_TIMED_EVENT}),
+     @Property(name="repository.path",value="/var/eventing/timed-jobs",propertyPrivate=true)
+})
+public class TimedJobHandler
+    extends AbstractRepositoryEventHandler
+    implements Job, TimedEventStatusProvider {
+
+    private static final String JOB_TOPIC = "topic";
+
+    private static final String JOB_CONFIG = "config";
+
+    private static final String JOB_SCHEDULE_INFO = "info";
+
+    @Reference
+    private Scheduler scheduler;
+
+    /** Unloaded events. */
+    private Set<String>unloadedEvents = new HashSet<String>();
+
+    /**
+     * @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#startWriterSession()
+     */
+    protected void startWriterSession() throws RepositoryException {
+        super.startWriterSession();
+        // load timed events from repository
+        this.loadEvents();
+        this.writerSession.getWorkspace().getObservationManager()
+            .addEventListener(this, javax.jcr.observation.Event.PROPERTY_CHANGED|javax.jcr.observation.Event.PROPERTY_REMOVED, this.repositoryPath, true, null, null, true);
+    }
+
+    /**
+     * @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#processWriteQueue()
+     */
+    protected void processWriteQueue() {
+        while ( this.running ) {
+            Event event = null;
+            try {
+                event = this.writeQueue.take();
+            } catch (InterruptedException e) {
+                // we ignore this
+                this.ignoreException(e);
+            }
+            if ( this.running && event != null ) {
+                ScheduleInfo scheduleInfo = null;
+                try {
+                    scheduleInfo = new ScheduleInfo(event);
+                } catch (IllegalArgumentException iae) {
+                    this.logger.error(iae.getMessage());
+                }
+                if ( scheduleInfo != null ) {
+                    final EventInfo info = new EventInfo();
+                    info.event = event;
+
+                    // write event and update path
+                    // if something went wrong we get the node path and reschedule
+                    synchronized ( this.writeLock ) {
+                        info.nodePath = this.persistEvent(info.event, scheduleInfo);
+                    }
+                    if ( info.nodePath != null ) {
+                        try {
+                            this.queue.put(info);
+                        } catch (InterruptedException e) {
+                            // this should never happen, so we ignore it
+                            this.ignoreException(e);
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#runInBackground()
+     */
+    protected void runInBackground() {
+        while ( this.running ) {
+            // so let's wait/get the next info from the queue
+            EventInfo info = null;
+            try {
+                info = this.queue.take();
+            } catch (InterruptedException e) {
+                // we ignore this
+                this.ignoreException(e);
+            }
+            if ( info != null && this.running ) {
+                synchronized ( this.writeLock ) {
+                    ScheduleInfo scheduleInfo = null;
+                    try {
+                        scheduleInfo = new ScheduleInfo(info.event);
+                    } catch (IllegalArgumentException iae) {
+                        this.logger.error(iae.getMessage());
+                    }
+                    if ( scheduleInfo != null ) {
+                        try {
+                            this.writerSession.refresh(true);
+                            if ( this.writerSession.itemExists(info.nodePath) ) {
+                                final Node eventNode = (Node) this.writerSession.getItem(info.nodePath);
+                                if ( !eventNode.isLocked() ) {
+                                    // lock node
+                                    Lock lock = null;
+                                    try {
+                                        lock = eventNode.getSession().getWorkspace().getLockManager().lock(info.nodePath, false, true, Long.MAX_VALUE, "TimedJobHandler");
+                                    } catch (RepositoryException re) {
+                                        // lock failed which means that the node is locked by someone else, so we don't have to requeue
+                                    }
+                                    if ( lock != null ) {
+                                        // if something went wrong, we reschedule
+                                        if ( !this.processEvent(info.event, scheduleInfo) ) {
+                                            try {
+                                                this.queue.put(info);
+                                            } catch (InterruptedException e) {
+                                                // this should never happen, so we ignore it
+                                                this.ignoreException(e);
+                                            }
+                                        }
+                                    }
+                                }
+                            }
+                        } catch (RepositoryException e) {
+                            // ignore
+                            this.ignoreException(e);
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    protected String persistEvent(final Event event, final ScheduleInfo scheduleInfo) {
+        try {
+            // get parent node
+            final Node parentNode = this.getWriterRootNode();
+            final String nodeName = scheduleInfo.jobId;
+            // is there already a node?
+            final Node foundNode = parentNode.hasNode(nodeName) ? parentNode.getNode(nodeName) : null;
+            Lock lock = null;
+            if ( scheduleInfo.isStopEvent() ) {
+                // if this is a stop event, we should remove the node from the repository
+                // if there is no node someone else was faster and we can ignore this
+                if ( foundNode != null ) {
+                    try {
+                        foundNode.remove();
+                        writerSession.save();
+                    } catch (LockException le) {
+                        // if someone else has the lock this is fine
+                    }
+                }
+                // stop the scheduler
+                processEvent(event, scheduleInfo);
+            } else {
+                // if there is already a node, it means we must handle an update
+                if ( foundNode != null ) {
+                    try {
+                        foundNode.remove();
+                        writerSession.save();
+                    } catch (LockException le) {
+                        // if someone else has the lock this is fine
+                    }
+                    // create a stop event
+                    processEvent(event, scheduleInfo.getStopInfo());
+                }
+                // we only write the event if this is a local one
+                if ( EventUtil.isLocal(event) ) {
+
+                    // write event to repository, lock it and schedule the event
+                    final Node eventNode = writeEvent(event, nodeName);
+                    lock = eventNode.getSession().getWorkspace().getLockManager().lock(eventNode.getPath(), false, true, Long.MAX_VALUE, "TimedJobHandler");
+                }
+            }
+
+            if ( lock != null ) {
+                // if something went wrong, we reschedule
+                if ( !this.processEvent(event, scheduleInfo) ) {
+                    final String path = lock.getNode().getPath();
+                    writerSession.getWorkspace().getLockManager().unlock(path);
+                    return path;
+                }
+            }
+        } catch (RepositoryException re ) {
+            // something went wrong, so let's log it
+            this.logger.error("Exception during writing new job to repository.", re);
+        }
+        return null;
+    }
+
+    /**
+     * Process the event.
+     * If a scheduler is available, a job is scheduled or stopped.
+     * @param event The incomming event.
+     * @return
+     */
+    protected boolean processEvent(final Event event, final ScheduleInfo scheduleInfo) {
+        final Scheduler localScheduler = this.scheduler;
+        if ( localScheduler != null ) {
+            // is this a stop event?
+            if ( scheduleInfo.isStopEvent() ) {
+                if ( this.logger.isDebugEnabled() ) {
+                    this.logger.debug("Stopping timed event " + event.getProperty(EventUtil.PROPERTY_TIMED_EVENT_TOPIC) + "(" + scheduleInfo.jobId + ")");
+                }
+                try {
+                    localScheduler.removeJob(scheduleInfo.jobId);
+                } catch (NoSuchElementException nsee) {
+                    // this can happen if the job is scheduled on another node
+                    // so we can just ignore this
+                }
+                return true;
+            }
+            // we ignore remote job events
+            if ( !EventUtil.isLocal(event) ) {
+                return true;
+            }
+
+            // Create configuration for scheduled job
+            final Map<String, Serializable> config = new HashMap<String, Serializable>();
+            // copy properties
+            final Hashtable<String, Object> properties = new Hashtable<String, Object>();
+            config.put(JOB_TOPIC, (String)event.getProperty(EventUtil.PROPERTY_TIMED_EVENT_TOPIC));
+            final String[] names = event.getPropertyNames();
+            if ( names != null ) {
+                for(int i=0; i<names.length; i++) {
+                    properties.put(names[i], event.getProperty(names[i]));
+                }
+            }
+            config.put(JOB_CONFIG, properties);
+            config.put(JOB_SCHEDULE_INFO, scheduleInfo);
+
+            try {
+                if ( scheduleInfo.expression != null ) {
+                    if ( this.logger.isDebugEnabled() ) {
+                        this.logger.debug("Adding timed event " + config.get(JOB_TOPIC) + "(" + scheduleInfo.jobId + ")" + " with cron expression " + scheduleInfo.expression);
+                    }
+                    localScheduler.addJob(scheduleInfo.jobId, this, config, scheduleInfo.expression, false);
+                } else if ( scheduleInfo.period != null ) {
+                    if ( this.logger.isDebugEnabled() ) {
+                        this.logger.debug("Adding timed event " + config.get(JOB_TOPIC) + "(" + scheduleInfo.jobId + ")" + " with period " + scheduleInfo.period);
+                    }
+                    localScheduler.addPeriodicJob(scheduleInfo.jobId, this, config, scheduleInfo.period, false);
+                } else {
+                    // then it must be date
+                    if ( this.logger.isDebugEnabled() ) {
+                        this.logger.debug("Adding timed event " + config.get(JOB_TOPIC) + "(" + scheduleInfo.jobId + ")" + " with date " + scheduleInfo.date);
+                    }
+                    localScheduler.fireJobAt(scheduleInfo.jobId, this, config, scheduleInfo.date);
+                }
+                return true;
+            } catch (Exception e) {
+                this.ignoreException(e);
+            }
+        } else {
+            this.logger.error("No scheduler available to start timed event " + event);
+        }
+        return false;
+    }
+
+    /**
+     * @see javax.jcr.observation.EventListener#onEvent(javax.jcr.observation.EventIterator)
+     */
+    public void onEvent(EventIterator iter) {
+        // we create an own session here
+        Session s = null;
+        try {
+            while ( iter.hasNext() ) {
+                final javax.jcr.observation.Event event = iter.nextEvent();
+                if ( event.getType() == javax.jcr.observation.Event.PROPERTY_CHANGED
+                    || event.getType() == javax.jcr.observation.Event.PROPERTY_REMOVED) {
+
+                    final String propPath = event.getPath();
+                    int pos = propPath.lastIndexOf('/');
+                    final String nodePath = propPath.substring(0, pos);
+                    final String propertyName = propPath.substring(pos+1);
+                    // we are only interested in unlocks
+                    if ( "jcr:lockOwner".equals(propertyName) ) {
+                        try {
+                            if ( s == null ) {
+                                s = this.environment.createAdminSession();
+                            }
+                            final Node eventNode = (Node) s.getItem(nodePath);
+                            if ( !eventNode.isLocked() ) {
+                                try {
+                                    final EventInfo info = new EventInfo();
+                                    info.event = this.readEvent(eventNode);
+                                    info.nodePath =nodePath;
+                                    try {
+                                        this.queue.put(info);
+                                    } catch (InterruptedException e) {
+                                        // we ignore this exception as this should never occur
+                                        this.ignoreException(e);
+                                    }
+                                } catch (ClassNotFoundException cnfe) {
+                                    // add it to the unloaded set
+                                    synchronized (unloadedEvents) {
+                                        this.unloadedEvents.add(nodePath);
+                                    }
+                                    this.ignoreException(cnfe);
+                                }
+                            }
+                        } catch (RepositoryException re) {
+                            this.logger.error("Exception during jcr event processing.", re);
+                        }
+                    }
+                }
+            }
+        } catch (RepositoryException re) {
+            this.logger.error("Unable to create a session.", re);
+        } finally {
+            if ( s != null ) {
+                s.logout();
+            }
+        }
+    }
+
+    /**
+     * @see org.osgi.service.event.EventHandler#handleEvent(org.osgi.service.event.Event)
+     */
+    public void handleEvent(Event event) {
+        if ( event.getTopic().equals(EventUtil.TOPIC_TIMED_EVENT) ) {
+            // queue the event in order to respond quickly
+            try {
+                this.writeQueue.put(event);
+            } catch (InterruptedException e) {
+                // this should never happen
+                this.ignoreException(e);
+            }
+        } else {
+            // bundle event started or updated
+            boolean doIt = false;
+            synchronized ( this.unloadedEvents ) {
+                if ( this.unloadedEvents.size() > 0 ) {
+                    doIt = true;
+                }
+            }
+            if ( doIt ) {
+                final Runnable t = new Runnable() {
+
+                    public void run() {
+                        synchronized (unloadedEvents) {
+                            Session s = null;
+                            final Set<String> newUnloadedEvents = new HashSet<String>();
+                            newUnloadedEvents.addAll(unloadedEvents);
+                            try {
+                                s = environment.createAdminSession();
+                                for(String path : unloadedEvents ) {
+                                    newUnloadedEvents.remove(path);
+                                    try {
+                                        if ( s.itemExists(path) ) {
+                                            final Node eventNode = (Node) s.getItem(path);
+                                            if ( !eventNode.isLocked() ) {
+                                                try {
+                                                    final EventInfo info = new EventInfo();
+                                                    info.event = readEvent(eventNode);
+                                                    info.nodePath = path;
+                                                    try {
+                                                        queue.put(info);
+                                                    } catch (InterruptedException e) {
+                                                        // we ignore this exception as this should never occur
+                                                        ignoreException(e);
+                                                    }
+                                                } catch (ClassNotFoundException cnfe) {
+                                                    newUnloadedEvents.add(path);
+                                                    ignoreException(cnfe);
+                                                }
+                                            }
+                                        }
+                                    } catch (RepositoryException re) {
+                                        // we ignore this and readd
+                                        newUnloadedEvents.add(path);
+                                        ignoreException(re);
+                                    }
+                                }
+                            } catch (RepositoryException re) {
+                                // unable to create session, so we try it again next time
+                                ignoreException(re);
+                            } finally {
+                                if ( s != null ) {
+                                    s.logout();
+                                }
+                                unloadedEvents.clear();
+                                unloadedEvents.addAll(newUnloadedEvents);
+                            }
+                        }
+                    }
+
+                };
+                Environment.THREAD_POOL.execute(t);
+            }
+        }
+    }
+
+    /**
+     * @see org.apache.sling.commons.scheduler.Job#execute(org.apache.sling.commons.scheduler.JobContext)
+     */
+    public void execute(JobContext context) {
+        final String topic = (String) context.getConfiguration().get(JOB_TOPIC);
+        @SuppressWarnings("unchecked")
+        final Dictionary<Object, Object> properties = (Dictionary<Object, Object>) context.getConfiguration().get(JOB_CONFIG);
+        final EventAdmin ea = this.environment.getEventAdmin();
+        if ( ea != null ) {
+            try {
+                ea.postEvent(new Event(topic, properties));
+            } catch (IllegalArgumentException iae) {
+                this.logger.error("Scheduled event has illegal topic: " + topic, iae);
+            }
+        } else {
+            this.logger.warn("Unable to send timed event as no event admin service is available.");
+        }
+        final ScheduleInfo info = (ScheduleInfo) context.getConfiguration().get(JOB_SCHEDULE_INFO);
+        // is this job scheduled for a specific date?
+        if ( info.date != null ) {
+            // we can remove it from the repository
+            // we create an own session here
+            Session s = null;
+            try {
+                s = this.environment.createAdminSession();
+                if ( s.itemExists(this.repositoryPath) ) {
+                    final Node parentNode = (Node)s.getItem(this.repositoryPath);
+                    final String nodeName = info.jobId;
+                    final Node eventNode = parentNode.hasNode(nodeName) ? parentNode.getNode(nodeName) : null;
+                    if ( eventNode != null ) {
+                        try {
+                            eventNode.remove();
+                            s.save();
+                        } catch (RepositoryException re) {
+                            // we ignore the exception if removing fails
+                            ignoreException(re);
+                        }
+                    }
+                }
+            } catch (RepositoryException re) {
+                this.logger.error("Unable to create a session.", re);
+            } finally {
+                if ( s != null ) {
+                    s.logout();
+                }
+            }
+        }
+    }
+
+    /**
+     * Load all active timed events from the repository.
+     * @throws RepositoryException
+     */
+    protected void loadEvents() {
+        try {
+            final QueryManager qManager = this.writerSession.getWorkspace().getQueryManager();
+            final String selectorName = "nodetype";
+
+            final QueryObjectModelFactory qomf = qManager.getQOMFactory();
+
+            final Query q = qomf.createQuery(
+                    qomf.selector(getEventNodeType(), selectorName),
+                    qomf.descendantNode(selectorName, this.repositoryPath),
+                    null,
+                    null
+            );
+            final NodeIterator result = q.execute().getNodes();
+            while ( result.hasNext() ) {
+                final Node eventNode = result.nextNode();
+                if ( !eventNode.isLocked() ) {
+                    final String nodePath = eventNode.getPath();
+                    try {
+                        final Event event = this.readEvent(eventNode);
+                        final EventInfo info = new EventInfo();
+                        info.event = event;
+                        info.nodePath = nodePath;
+                        try {
+                            this.queue.put(info);
+                        } catch (InterruptedException e) {
+                            // we ignore this exception as this should never occur
+                            this.ignoreException(e);
+                        }
+                    } catch (ClassNotFoundException cnfe) {
+                        // add it to the unloaded set
+                        synchronized (unloadedEvents) {
+                            this.unloadedEvents.add(nodePath);
+                        }
+                        this.ignoreException(cnfe);
+                    } catch (RepositoryException re) {
+                        // if reading an event fails, we ignore this
+                        this.ignoreException(re);
+                    }
+                }
+            }
+        } catch (RepositoryException re) {
+            this.logger.error("Exception during initial loading of stored timed events.", re);
+        }
+    }
+
+    /**
+     * @see org.apache.sling.engine.event.impl.JobPersistenceHandler#addNodeProperties(javax.jcr.Node, org.osgi.service.event.Event)
+     */
+    protected void addNodeProperties(Node eventNode, Event event)
+    throws RepositoryException {
+        super.addNodeProperties(eventNode, event);
+        eventNode.setProperty(JCRHelper.NODE_PROPERTY_TOPIC, (String)event.getProperty(EventUtil.PROPERTY_TIMED_EVENT_TOPIC));
+        final ScheduleInfo info = new ScheduleInfo(event);
+        if ( info.date != null ) {
+            final Calendar c = Calendar.getInstance();
+            c.setTime(info.date);
+            eventNode.setProperty(JCRHelper.NODE_PROPERTY_TE_DATE, c);
+        }
+        if ( info.expression != null ) {
+            eventNode.setProperty(JCRHelper.NODE_PROPERTY_TE_EXPRESSION, info.expression);
+        }
+        if ( info.period != null ) {
+            eventNode.setProperty(JCRHelper.NODE_PROPERTY_TE_PERIOD, info.period.longValue());
+        }
+    }
+
+    /**
+     * @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#getEventNodeType()
+     */
+    protected String getEventNodeType() {
+        return JCRHelper.TIMED_EVENT_NODE_TYPE;
+    }
+
+    protected static final class ScheduleInfo implements Serializable {
+
+        private static final long serialVersionUID = 8667701700547811142L;
+
+        public final String expression;
+        public final Long   period;
+        public final Date   date;
+        public final String jobId;
+
+        public ScheduleInfo(final Event event)
+        throws IllegalArgumentException {
+            // let's see if a schedule information is specified or if the job should be stopped
+            this.expression = (String) event.getProperty(EventUtil.PROPERTY_TIMED_EVENT_SCHEDULE);
+            this.period = (Long) event.getProperty(EventUtil.PROPERTY_TIMED_EVENT_PERIOD);
+            this.date = (Date) event.getProperty(EventUtil.PROPERTY_TIMED_EVENT_DATE);
+            int count = 0;
+            if ( this.expression != null) {
+                count++;
+            }
+            if ( this.period != null ) {
+                count++;
+            }
+            if ( this.date != null ) {
+                count++;
+            }
+            if ( count > 1 ) {
+                throw new IllegalArgumentException("Only one configuration property from " + EventUtil.PROPERTY_TIMED_EVENT_SCHEDULE +
+                                      ", " + EventUtil.PROPERTY_TIMED_EVENT_PERIOD +
+                                      ", or " + EventUtil.PROPERTY_TIMED_EVENT_DATE + " should be used.");
+            }
+            // we create a job id consisting of the real event topic and an (optional) id
+            // if the event contains a timed event id or a job id we'll append that to the name
+            String topic = (String)event.getProperty(EventUtil.PROPERTY_TIMED_EVENT_TOPIC);
+            if ( topic == null ) {
+                throw new IllegalArgumentException("Timed event does not contain required property " + EventUtil.PROPERTY_TIMED_EVENT_TOPIC);
+            }
+            String id = (String)event.getProperty(EventUtil.PROPERTY_TIMED_EVENT_ID);
+            String jId = (String)event.getProperty(JobUtil.PROPERTY_JOB_NAME);
+
+            //this.jobId = getJobId(topic, id, jId);
+            this.jobId = getJobId(topic, id, jId);
+        }
+
+        private ScheduleInfo(String jobId) {
+            this.expression = null;
+            this.period = null;
+            this.date = null;
+            this.jobId = jobId;
+        }
+
+        public ScheduleInfo getStopInfo() {
+            return new ScheduleInfo(this.jobId);
+        }
+
+        public boolean isStopEvent() {
+            return this.expression == null && this.period == null && this.date == null;
+        }
+
+        public static String getJobId(String topic, String timedEventId, String jobId) {
+            return topic.replace('/', '.') + "/TimedEvent " + (timedEventId != null ? Utility.filter(timedEventId) : "") + '_' + (jobId != null ? Utility.filter(jobId) : "");
+        }
+    }
+
+    /**
+     * @see org.apache.sling.event.TimedEventStatusProvider#getScheduledEvent(java.lang.String, java.lang.String, java.lang.String)
+     */
+    public Event getScheduledEvent(String topic, String eventId, String jobId) {
+        Session s = null;
+        try {
+            s = this.environment.createAdminSession();
+            if ( s.itemExists(this.repositoryPath) ) {
+                final Node parentNode = (Node)s.getItem(this.repositoryPath);
+                final String nodeName = ScheduleInfo.getJobId(topic, eventId, jobId);
+                final Node eventNode = parentNode.hasNode(nodeName) ? parentNode.getNode(nodeName) : null;
+                if ( eventNode != null ) {
+                    return this.readEvent(eventNode);
+                }
+            }
+        } catch (RepositoryException re) {
+            this.logger.error("Unable to create a session.", re);
+        } catch (ClassNotFoundException e) {
+            this.ignoreException(e);
+        } finally {
+            if ( s != null ) {
+                s.logout();
+            }
+        }
+        return null;
+    }
+
+    /**
+     * @see org.apache.sling.event.TimedEventStatusProvider#getScheduledEvents(java.lang.String, java.util.Map...)
+     */
+    public Collection<Event> getScheduledEvents(String topic, Map<String, Object>... filterProps) {
+        // we create a new session
+        Session s = null;
+        final List<Event> jobs = new ArrayList<Event>();
+        try {
+            s = this.environment.createAdminSession();
+            final QueryManager qManager = s.getWorkspace().getQueryManager();
+            final String selectorName = "nodetype";
+
+            final QueryObjectModelFactory qomf = qManager.getQOMFactory();
+
+            final String path;
+            if ( topic == null ) {
+                path = this.repositoryPath;
+            } else {
+                path = this.repositoryPath + '/' + topic.replace('/', '.');
+            }
+            Constraint constraint = qomf.descendantNode(selectorName, path);
+            if ( filterProps != null && filterProps.length > 0 ) {
+                Constraint orConstraint = null;
+                for (Map<String,Object> template : filterProps) {
+                    Constraint comp = null;
+                    final Iterator<Map.Entry<String, Object>> i = template.entrySet().iterator();
+                    while ( i.hasNext() ) {
+                        final Map.Entry<String, Object> current = i.next();
+                        // check prop name first
+                        final String propName = JCRHelper.getNodePropertyName(current.getKey());
+                        if ( propName != null ) {
+                            // check value
+                            final Value value = JCRHelper.getNodePropertyValue(s.getValueFactory(), current.getValue());
+                            if ( value != null ) {
+                                final Comparison newComp = qomf.comparison(qomf.propertyValue(selectorName, propName),
+                                        QueryObjectModelFactory.JCR_OPERATOR_EQUAL_TO,
+                                        qomf.literal(value));
+                                if ( comp == null ) {
+                                    comp = newComp;
+                                } else {
+                                    comp = qomf.and(comp, newComp);
+                                }
+                            }
+                        }
+                    }
+                    if ( comp != null ) {
+                        if ( orConstraint == null ) {
+                            orConstraint = comp;
+                        } else {
+                            orConstraint = qomf.or(orConstraint, comp);
+                        }
+                    }
+                }
+                if ( orConstraint != null ) {
+                    constraint = qomf.and(constraint, orConstraint);
+                }
+            }
+            final Query q = qomf.createQuery(
+                    qomf.selector(getEventNodeType(), selectorName),
+                    constraint,
+                    null,
+                    null
+            );
+            if ( logger.isDebugEnabled() ) {
+                logger.debug("Executing job query {}.", q.getStatement());
+            }
+
+            final NodeIterator iter = q.execute().getNodes();
+            while ( iter.hasNext() ) {
+                final Node eventNode = iter.nextNode();
+                try {
+                    final Event event = this.readEvent(eventNode);
+                    jobs.add(event);
+                } catch (ClassNotFoundException cnfe) {
+                    // in the case of a class not found exception we just ignore the exception
+                    this.ignoreException(cnfe);
+                }
+            }
+        } catch (RepositoryException e) {
+            // in the case of an error, we return an empty list
+            this.ignoreException(e);
+        } finally {
+            if ( s != null) {
+                s.logout();
+            }
+        }
+        return jobs;
+    }
+
+    /**
+     * @see org.apache.sling.event.TimedEventStatusProvider#cancelTimedEvent(java.lang.String)
+     */
+    public void cancelTimedEvent(String jobId) {
+        synchronized ( this.writeLock ) {
+            try {
+                // is there a node?
+                final Item foundNode = this.writerSession.itemExists(jobId) ? this.writerSession.getItem(jobId) : null;
+                // we should remove the node from the repository
+                // if there is no node someone else was faster and we can ignore this
+                if ( foundNode != null ) {
+                    try {
+                        foundNode.remove();
+                        writerSession.save();
+                    } catch (LockException le) {
+                        // if someone else has the lock this is fine
+                    }
+                }
+            } catch ( RepositoryException re) {
+                this.logger.error("Unable to cancel timed event: " + jobId, re);
+            }
+            // stop the scheduler
+            if ( this.logger.isDebugEnabled() ) {
+                this.logger.debug("Stopping timed event " + jobId);
+            }
+            final Scheduler localScheduler = this.scheduler;
+            if ( localScheduler != null ) {
+                try {
+                    localScheduler.removeJob(jobId);
+                } catch (NoSuchElementException nsee) {
+                    // this can happen if the job is scheduled on another node
+                    // so we can just ignore this
+                }
+            }
+        }
+    }
+}

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/TimedJobHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/TimedJobHandler.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/TimedJobHandler.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/DefaultJobManager.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/DefaultJobManager.java?rev=1021247&view=auto
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/DefaultJobManager.java (added)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/DefaultJobManager.java Mon Oct 11 06:54:12 2010
@@ -0,0 +1,625 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Modified;
+import org.apache.felix.scr.annotations.Properties;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.PropertyOption;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.felix.scr.annotations.Services;
+import org.apache.sling.commons.osgi.OsgiUtil;
+import org.apache.sling.commons.scheduler.Scheduler;
+import org.apache.sling.event.EventUtil;
+import org.apache.sling.event.impl.EnvironmentComponent;
+import org.apache.sling.event.impl.jobs.config.ConfigurationConstants;
+import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
+import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager;
+import org.apache.sling.event.impl.jobs.queues.AbstractJobQueue;
+import org.apache.sling.event.impl.jobs.queues.OrderedJobQueue;
+import org.apache.sling.event.impl.jobs.queues.ParallelJobQueue;
+import org.apache.sling.event.impl.jobs.queues.TopicRoundRobinJobQueue;
+import org.apache.sling.event.impl.support.Environment;
+import org.apache.sling.event.jobs.JobManager;
+import org.apache.sling.event.jobs.JobUtil;
+import org.apache.sling.event.jobs.JobsIterator;
+import org.apache.sling.event.jobs.Queue;
+import org.apache.sling.event.jobs.Statistics;
+import org.osgi.service.event.Event;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * An event handler for special job events.
+ *
+ * We schedule this event handler to run in the background
+ * and clean up obsolete queues.
+ *
+ */
+@Component(label="%job.events.name",
+        description="%job.events.description",
+        metatype=true,immediate=true)
+@Services({
+    @Service(value=Runnable.class),
+    @Service(value=JobManager.class)
+})
+@Properties({
+    @Property(name="scheduler.period", longValue=300,
+            label="%jobscheduler.period.name",
+            description="%jobscheduler.period.description"),
+    @Property(name="scheduler.concurrent", boolValue=false, propertyPrivate=true),
+    @Property(name=ConfigurationConstants.PROP_PRIORITY,
+            value=ConfigurationConstants.DEFAULT_PRIORITY,
+            options={@PropertyOption(name="NORM",value="Norm"),
+            @PropertyOption(name="MIN",value="Min"),
+            @PropertyOption(name="MAX",value="Max")}),
+    @Property(name=ConfigurationConstants.PROP_RETRIES,
+            intValue=ConfigurationConstants.DEFAULT_RETRIES),
+    @Property(name=ConfigurationConstants.PROP_RETRY_DELAY,
+            longValue=ConfigurationConstants.DEFAULT_RETRY_DELAY),
+    @Property(name=ConfigurationConstants.PROP_MAX_PARALLEL,
+            intValue=ConfigurationConstants.DEFAULT_MAX_PARALLEL)
+})
+public class DefaultJobManager
+    implements Runnable, JobManager {
+
+    /** Default logger. */
+    private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+    /** The environment component. */
+    @Reference
+    private EnvironmentComponent environment;
+
+    /** The configuration manager. */
+    @Reference
+    private QueueConfigurationManager configManager;
+
+    /** The scheduler service. */
+    @Reference
+    private Scheduler scheduler;
+
+    /** All active queues. */
+    private final Map<String, AbstractJobQueue> queues = new ConcurrentHashMap<String, AbstractJobQueue>();
+
+    /** Main configuration. */
+    private InternalQueueConfiguration mainConfiguration;
+
+    /** Base statistics. */
+    private final StatisticsImpl baseStatistics = new StatisticsImpl();
+
+    /** Current statistics. */
+    private StatisticsImpl currentStatistics;
+
+    /** Last update for current statistics. */
+    private long lastUpdatedStatistics;
+
+    /** All existing events. */
+    private final Map<String, JobEvent> allEvents = new HashMap<String, JobEvent>();
+
+    /** All existing events by topic. */
+    private final Map<String, List<JobEvent>> allEventsByTopic = new HashMap<String, List<JobEvent>>();
+
+    /**
+     * Activate this component.
+     * @param props Configuration properties
+     */
+    @Activate
+    protected void activate(final Map<String, Object> props) {
+        this.update(props);
+        logger.info("Apache Sling Job Event Handler started on instance {}", Environment.APPLICATION_ID);
+    }
+
+    /**
+     * Configure this component.
+     * @param props Configuration properties
+     */
+    @Modified
+    protected void update(final Map<String, Object> props) {
+        // create a new dictionary with the missing info and do some sanety puts
+        final Map<String, Object> queueProps = new HashMap<String, Object>(props);
+        queueProps.remove(ConfigurationConstants.PROP_APP_IDS);
+        queueProps.put(ConfigurationConstants.PROP_TOPICS, "*");
+        queueProps.put(ConfigurationConstants.PROP_NAME, "<main queue>");
+        queueProps.put(ConfigurationConstants.PROP_RUN_LOCAL, false);
+        queueProps.put(ConfigurationConstants.PROP_TYPE, InternalQueueConfiguration.Type.UNORDERED);
+
+        // check max parallel - this should never be lower than 2!
+        final int maxParallel = OsgiUtil.toInteger(queueProps.get(ConfigurationConstants.PROP_MAX_PARALLEL),
+                ConfigurationConstants.DEFAULT_MAX_PARALLEL);
+        if ( maxParallel < 2 ) {
+            this.logger.debug("Ignoring invalid setting of {} for {}. Setting to minimum value: 2",
+                    maxParallel, ConfigurationConstants.PROP_MAX_PARALLEL);
+            queueProps.put(ConfigurationConstants.PROP_MAX_PARALLEL, 2);
+        }
+        this.mainConfiguration = InternalQueueConfiguration.fromConfiguration(queueProps);
+    }
+
+    /**
+     * Dectivate this component.
+     */
+    @Deactivate
+    protected void deactivate() {
+        final Iterator<AbstractJobQueue> i = this.queues.values().iterator();
+        while ( i.hasNext() ) {
+            final AbstractJobQueue jbq = i.next();
+            jbq.close();
+        }
+        this.queues.clear();
+        logger.info("Apache Sling Job Event Handler stopped on instance {}", Environment.APPLICATION_ID);
+    }
+
+    /**
+     * This method is invoked periodically by the scheduler.
+     * It searches for idle queues and stops them after a timeout. If a queue
+     * is idle for two consecutive clean up calls, it is removed.
+     * @see java.lang.Runnable#run()
+     */
+    public void cleanup() {
+        // check for idle queue
+        // we synchronize to avoid creating a queue which is about to be removed during cleanup
+        synchronized ( this ) {
+            final Iterator<Map.Entry<String, AbstractJobQueue>> i = this.queues.entrySet().iterator();
+            while ( i.hasNext() ) {
+                final Map.Entry<String, AbstractJobQueue> current = i.next();
+                final AbstractJobQueue jbq = current.getValue();
+                if ( jbq.isMarkedForCleanUp() ) {
+                    // close
+                    jbq.close();
+                    // copy statistics
+                    this.baseStatistics.add(jbq);
+                    // remove
+                    i.remove();
+                } else {
+                    // mark to be removed during next cycle
+                    jbq.markForCleanUp();
+                }
+            }
+        }
+    }
+
+    /**
+     * Process a new job event.
+     * This method first searches the corresponding queue - if such a queue
+     * does not exist yet, it is created and started.
+     * @param event The job event
+     */
+    public void process(final JobEvent event) {
+        // get the queue configuration
+        InternalQueueConfiguration config = configManager.getQueueConfiguration(event);
+
+        // if no queue config is found, we either create a new queue or use the main queue
+        if ( config == null ) {
+            final String customQueueName = (String)event.event.getProperty(JobUtil.PROPERTY_JOB_QUEUE_NAME);
+            if ( customQueueName != null ) {
+                synchronized ( this ) {
+                    final AbstractJobQueue queue = this.queues.get(customQueueName);
+                    if ( queue != null ) {
+                        config = queue.getConfiguration();
+                    } else {
+                        config = new InternalQueueConfiguration(event.event);
+                    }
+                    event.queueName = customQueueName;
+                }
+            } else {
+                config = this.mainConfiguration;
+                event.queueName = this.mainConfiguration.getName();
+            }
+        }
+
+        // get the queue name
+        final String queueName = event.queueName;
+
+        if ( config.isSkipped(event) ) {
+            if ( logger.isDebugEnabled() ) {
+                logger.debug("Ignoring event due to configuration of queue {} : {}", queueName, EventUtil.toString(event.event));
+            }
+            return;
+        }
+
+        // get or create queue
+        AbstractJobQueue queue = null;
+        // we synchronize to avoid creating a queue which is about to be removed during cleanup
+        synchronized ( this ) {
+            queue = this.queues.get(queueName);
+            // check for reconfiguration, we really do an identity check here(!)
+            if ( queue != null && queue.getConfiguration() != config ) {
+                // remove the queue with the old name
+                this.queues.remove(queueName);
+                // check if we can close or have to rename
+                queue.markForCleanUp();
+                if ( queue.isMarkedForCleanUp() ) {
+                    // close
+                    queue.close();
+                    // copy statistics
+                    this.baseStatistics.add(queue);
+                } else {
+                    // notify queue
+                    queue.rename(queueName + "<outdated>");
+                    // readd with new name
+                    this.queues.put(queue.getName(), queue);
+                }
+                // we use a new queue with the configuration
+                queue = null;
+            }
+            if ( queue == null ) {
+                if ( config.getType() == InternalQueueConfiguration.Type.ORDERED ) {
+                    queue = new OrderedJobQueue(queueName, config, this.environment);
+                } else if ( config.getType() == InternalQueueConfiguration.Type.UNORDERED ) {
+                    queue = new ParallelJobQueue(queueName, config, this.environment, this.scheduler);
+                } else if ( config.getType() == InternalQueueConfiguration.Type.TOPIC_ROUND_ROBIN ) {
+                    queue = new TopicRoundRobinJobQueue(queueName, config, this.environment, this.scheduler);
+                }
+                if ( queue == null ) {
+                    // this is just a sanety check, actually we can never get here
+                    logger.warn("Ignoring event due to unknown queue type of queue {} : {}", queueName, EventUtil.toString(event.event));
+                    return;
+                }
+                queues.put(queueName, queue);
+                queue.start();
+            }
+        }
+
+        // and put event
+        queue.process(event);
+    }
+
+    /**
+     * This method is invoked periodically by the scheduler.
+     * @see java.lang.Runnable#run()
+     */
+    public void run() {
+        this.cleanup();
+    }
+
+    /**
+     * Return our internal statistics object.
+     * We recalculate this every 1.5sec (if requested)
+     *
+     * @see org.apache.sling.event.jobs.JobManager#getStatistics()
+     */
+    public synchronized Statistics getStatistics() {
+        final long now = System.currentTimeMillis();
+        if ( this.currentStatistics == null || this.lastUpdatedStatistics + 1500 < now ) {
+            this.currentStatistics = this.baseStatistics.copy();
+            for(final AbstractJobQueue jq : this.queues.values() ) {
+                this.currentStatistics.add(jq);
+            }
+        }
+        return this.currentStatistics;
+    }
+
+    /**
+     * @see org.apache.sling.event.jobs.JobManager#getQueue(java.lang.String)
+     */
+    public Queue getQueue(final String name) {
+        return this.queues.get(name);
+    }
+
+    /**
+     * @see org.apache.sling.event.jobs.JobManager#getQueues()
+     */
+    public Iterable<Queue> getQueues() {
+        final Iterator<AbstractJobQueue> jqI = this.queues.values().iterator();
+        return new Iterable<Queue>() {
+
+            public Iterator<Queue> iterator() {
+                return new Iterator<Queue>() {
+
+                    public boolean hasNext() {
+                        return jqI.hasNext();
+                    }
+
+                    public Queue next() {
+                        return jqI.next();
+                    }
+
+                    public void remove() {
+                        throw new UnsupportedOperationException();
+                    }
+                };
+            }
+        };
+    }
+
+    public InternalQueueConfiguration getMainQueueConfiguration() {
+        return this.mainConfiguration;
+    }
+
+    /**
+     * Add a job to all jobs.
+     */
+    public void notifyAddJob(final JobEvent job) {
+        final String key = job.uniqueId;
+        final String topic = (String)job.event.getProperty(JobUtil.PROPERTY_JOB_TOPIC);
+        final JobEvent oldJob;
+        synchronized ( this.allEvents ) {
+            oldJob = this.allEvents.put(key, job);
+        }
+        List<JobEvent> l;
+        synchronized ( this.allEventsByTopic ) {
+            l = this.allEventsByTopic.get(topic);
+            if ( l == null ) {
+                l = new ArrayList<JobEvent>();
+                this.allEventsByTopic.put(topic, l);
+            }
+        }
+        synchronized ( l ) {
+            if ( oldJob != null ) {
+                l.remove(oldJob);
+            }
+            l.add(job);
+        }
+    }
+
+    /**
+     * Remove a job from all jobs.
+     */
+    public void notifyRemoveJob(final String key) {
+        final JobEvent oldJob;
+        synchronized ( this.allEvents ) {
+            oldJob = this.allEvents.remove(key);
+        }
+        if ( oldJob != null ) {
+            final String topic = (String)oldJob.event.getProperty(JobUtil.PROPERTY_JOB_TOPIC);
+            final List<JobEvent> l;
+            synchronized ( this.allEventsByTopic ) {
+                l = this.allEventsByTopic.get(topic);
+            }
+            if ( l != null ) {
+                synchronized ( l ) {
+                    l.remove(oldJob);
+                }
+            }
+        }
+    }
+
+    /**
+     * Job started
+     */
+    public void notifyActiveJob(final String key) {
+        final JobEvent job;
+        synchronized ( this.allEvents ) {
+            job = this.allEvents.get(key);
+        }
+        if ( job != null ) {
+            job.started = 1;
+        }
+    }
+
+    /**
+     * Check the requested job type
+     */
+    private boolean checkType(final QueryType type, final JobEvent event) {
+        if ( type == QueryType.ALL ) {
+            return true;
+        }
+        if ( type == QueryType.ACTIVE && event.started == 1 ) {
+            return true;
+        }
+        if ( type == QueryType.QUEUED && event.started == -1 ) {
+            return true;
+        }
+        return false;
+    }
+
+    private enum Operation {
+        LESS,
+        LESS_OR_EQUALS,
+        EQUALS,
+        GREATER_OR_EQUALS,
+        GREATER
+    }
+    /**
+     * Check if the job matches the template
+     */
+    private boolean match(final JobEvent job, final Map<String, Object> template) {
+        if ( template != null ) {
+            for(final Map.Entry<String, Object> current : template.entrySet()) {
+                final String key = current.getKey();
+                final char firstChar = key.length() > 0 ? key.charAt(0) : 0;
+                final String propName;
+                final Operation op;
+                if ( firstChar == '=' ) {
+                    propName = key.substring(1);
+                    op  = Operation.EQUALS;
+                } else if ( firstChar == '<' ) {
+                    final char secondChar = key.length() > 1 ? key.charAt(1) : 0;
+                    if ( secondChar == '=' ) {
+                        op = Operation.LESS_OR_EQUALS;
+                        propName = key.substring(2);
+                    } else {
+                        op = Operation.LESS;
+                        propName = key.substring(1);
+                    }
+                } else if ( firstChar == '>' ) {
+                    final char secondChar = key.length() > 1 ? key.charAt(1) : 0;
+                    if ( secondChar == '=' ) {
+                        op = Operation.GREATER_OR_EQUALS;
+                        propName = key.substring(2);
+                    } else {
+                        op = Operation.GREATER;
+                        propName = key.substring(1);
+                    }
+                } else {
+                    propName = key;
+                    op  = Operation.EQUALS;
+                }
+                final Object value = current.getValue();
+
+                if ( op == Operation.EQUALS ) {
+                    if ( !value.equals(job.event.getProperty(propName)) ) {
+                        return false;
+                    }
+                } else {
+                    if ( value instanceof Comparable ) {
+                        @SuppressWarnings({ "unchecked", "rawtypes" })
+                        final int result = ((Comparable)value).compareTo(job.event.getProperty(propName));
+                        if ( op == Operation.LESS && result != -1 ) {
+                            return false;
+                        } else if ( op == Operation.LESS_OR_EQUALS && result == 1 ) {
+                            return false;
+                        } else if ( op == Operation.GREATER_OR_EQUALS && result == -1 ) {
+                            return false;
+                        } else if ( op == Operation.GREATER && result != 1 ) {
+                            return false;
+                        }
+                    } else {
+                        // if the value is not comparable we simply don't match
+                        return false;
+                    }
+                }
+            }
+        }
+        return true;
+    }
+
+    private void queryCollection(
+            final List<Event> result,
+            final QueryType type,
+            final Collection<JobEvent> collection,
+            final Map<String, Object>... filterProps) {
+        synchronized ( collection ) {
+            final Iterator<JobEvent> iter = collection.iterator();
+            while ( iter.hasNext() ) {
+                final JobEvent job = iter.next();
+                boolean add = checkType(type, job);
+                if ( add && filterProps != null && filterProps.length != 0 ) {
+                    add = false;
+                    for (Map<String,Object> template : filterProps) {
+                        add = this.match(job, template);
+                        if ( add ) {
+                            break;
+                        }
+                    }
+                }
+                if ( add ) {
+                    result.add(job.event);
+                }
+            }
+        }
+    }
+    /**
+     * @see org.apache.sling.event.jobs.JobManager#queryJobs(QueryType, java.lang.String, java.util.Map...)
+     */
+    public JobsIterator queryJobs(final QueryType type,
+            final String topic,
+            final Map<String, Object>... filterProps) {
+        final List<Event> result = new ArrayList<Event>();
+        if ( topic != null ) {
+            final List<JobEvent> l;
+            synchronized ( this.allEventsByTopic ) {
+                l = this.allEventsByTopic.get(topic);
+            }
+            if ( l != null ) {
+                queryCollection(result, type, l, filterProps);
+            }
+        } else {
+            final Set<Collection<JobEvent>> topics;
+            synchronized ( this.allEventsByTopic ) {
+                topics = new HashSet<Collection<JobEvent>>(this.allEventsByTopic.values());
+            }
+            for(final Collection<JobEvent> l : topics) {
+                queryCollection(result, type, l, filterProps);
+            }
+        }
+        return new JobsIteratorImpl(result);
+    }
+
+    /**
+     * @see org.apache.sling.event.jobs.JobManager#findJob(java.lang.String, java.util.Map)
+     */
+    public Event findJob(final String topic, final Map<String, Object> template) {
+        Event result = null;
+        if ( topic != null ) {
+            final List<JobEvent> l;
+            synchronized ( this.allEventsByTopic ) {
+                l = this.allEventsByTopic.get(topic);
+            }
+            if ( l != null ) {
+                synchronized ( l ) {
+                    final Iterator<JobEvent> iter = l.iterator();
+                    while ( result == null && iter.hasNext() ) {
+                        final JobEvent job = iter.next();
+                        if ( match(job, template) ) {
+                            result = job.event;
+                        }
+                    }
+                }
+            }
+        }
+        return result;
+    }
+
+    /**
+     * @see org.apache.sling.event.jobs.JobManager#removeJob(java.lang.String)
+     */
+    public boolean removeJob(final String jobId) {
+        final JobEvent job;
+        synchronized ( this.allEvents ) {
+            job = this.allEvents.get(jobId);
+        }
+        boolean result = true;
+        if ( job != null ) {
+            result = job.remove();
+            if ( result ) {
+                this.notifyRemoveJob(jobId);
+            }
+        }
+        return result;
+    }
+
+    /**
+     * @see org.apache.sling.event.jobs.JobManager#forceRemoveJob(java.lang.String)
+     */
+    public void forceRemoveJob(final String jobId) {
+        while ( !this.removeJob(jobId) ) {
+            // instead of using complicated syncs, waits and notifies we simply poll
+            try {
+                Thread.sleep(80);
+            } catch (final InterruptedException ignore) {
+                this.ignoreException(ignore);
+            }
+        }
+    }
+
+    /**
+     * Helper method which just logs the exception in debug mode.
+     * @param e
+     */
+    private void ignoreException(final Exception e) {
+        if ( this.logger.isDebugEnabled() ) {
+            this.logger.debug("Ignored exception " + e.getMessage(), e);
+        }
+    }
+}

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/DefaultJobManager.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/DefaultJobManager.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/DefaultJobManager.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/JobEvent.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/JobEvent.java?rev=1021247&view=auto
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/JobEvent.java (added)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/JobEvent.java Mon Oct 11 06:54:12 2010
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs;
+
+import org.osgi.service.event.Event;
+
+/**
+ * This object encapsulates all information about a job.
+ */
+public abstract class JobEvent {
+
+    public Event event;
+    public final String uniqueId;
+
+    public String queueName;
+
+    public long queued = -1;
+    public long started = -1;
+
+    public JobEvent(final Event e, final String uniqueId) {
+        this.event = e;
+        this.uniqueId = uniqueId;
+    }
+
+    public abstract boolean lock();
+    public abstract void unlock();
+    public abstract void finished();
+    public abstract boolean reschedule();
+    public abstract boolean remove();
+
+    @Override
+    public int hashCode() {
+        return this.uniqueId.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if ( ! (obj instanceof JobEvent) ) {
+            return false;
+        }
+        return this.uniqueId.equals(((JobEvent)obj).uniqueId);
+    }
+}
\ No newline at end of file

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/JobEvent.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/JobEvent.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/JobEvent.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain