You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2008/02/11 16:19:32 UTC

svn commit: r620503 - in /incubator/sling/trunk/sling/event/src: main/java/org/apache/sling/event/ main/java/org/apache/sling/event/impl/ test/java/org/apache/sling/event/impl/

Author: cziegeler
Date: Mon Feb 11 07:19:29 2008
New Revision: 620503

URL: http://svn.apache.org/viewvc?rev=620503&view=rev
Log:
SLING-177: Improvements to the event and job handling:
- Add a retry count and retry delay for jobs
- Remove locking of the parent node
- Use a separate write queue for minimal delay between publishing and storing
- Improve parallel job processing: only jobs of the same type are now serialized; everything else runs in parallel.

Modified:
    incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/EventUtil.java
    incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/AbstractRepositoryEventHandler.java
    incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/DistributingEventHandler.java
    incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
    incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/TimedEventHandler.java
    incubator/sling/trunk/sling/event/src/test/java/org/apache/sling/event/impl/AbstractRepositoryEventHandlerTest.java

Modified: incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/EventUtil.java
URL: http://svn.apache.org/viewvc/incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/EventUtil.java?rev=620503&r1=620502&r2=620503&view=diff
==============================================================================
--- incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/EventUtil.java (original)
+++ incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/EventUtil.java Mon Feb 11 07:19:29 2008
@@ -52,7 +52,7 @@
     /** The property to for setting the maximum number of retries. Value is of type Integer. */
     public static final String PROPERTY_JOB_RETRIES = "event.job.retries";
 
-    /** The property to set a retry delay. Value is of type Long. */
+    /** The property to set a retry delay. Value is of type Long and specifies milliseconds. */
     public static final String PROPERTY_JOB_RETRY_DELAY = "event.job.retrydelay";
 
     /** The topic for jobs. */
@@ -140,36 +140,14 @@
 
     /**
      * Notify a failed job.
+     * @return <code>true</code> if the job has been rescheduled, <code>false</code> otherwise.
      */
-    public static void rescheduleJob(Event job) {
+    public static boolean rescheduleJob(Event job) {
         final JobStatusNotifier.NotifierContext ctx = (NotifierContext) job.getProperty(JobStatusNotifier.CONTEXT_PROPERTY_NAME);
         if ( ctx == null ) {
             throw new NullPointerException("JobStatusNotifier context is not available in event properties.");
         }
-        boolean retry = true;
-        // check if we exceeded the number of retries
-        if ( job.getProperty(PROPERTY_JOB_RETRIES) != null ) {
-            int retries = (Integer) job.getProperty(PROPERTY_JOB_RETRIES);
-            int retryCount = 0;
-            if ( job.getProperty(PROPERTY_JOB_RETRY_COUNT) != null ) {
-                retryCount = (Integer)job.getProperty(PROPERTY_JOB_RETRY_COUNT);
-            }
-            retryCount++;
-            if ( retryCount >= retries ) {
-                retry = false;
-            }
-            // update event with retry count
-            final Dictionary<String, Object> newProperties;
-            // create a new dictionary
-            newProperties = new Hashtable<String, Object>();
-            final String[] names = job.getPropertyNames();
-            for(int i=0; i<names.length; i++ ) {
-                newProperties.put(names[i], job.getProperty(names[i]));
-            }
-            newProperties.put(PROPERTY_JOB_RETRY_COUNT, retryCount);
-            job = new Event(job.getTopic(), newProperties);
-        }
-        ctx.notifier.finishedJob(job, ctx.eventNodePath, ctx.lockToken, retry);
+        return ctx.notifier.finishedJob(job, ctx.eventNodePath, ctx.lockToken, true);
     }
 
     /**
@@ -217,6 +195,18 @@
             }
         }
 
-        void finishedJob(Event job, String eventNodePath, String lockToken, boolean reschedule);
+        /**
+         * Notify that the job is finished.
+         * If the job is not rescheduled, a return value of <code>false</code> indicates an error
+         * during the processing. If the job should be rescheduled, <code>true</code> indicates
+         * that the job could be rescheduled. If an error occurs or the number of retries is
+         * exceeded, <code>false</code> will be returned.
+         * @param job The job.
+         * @param eventNodePath The storage node in the repository.
+         * @param lockToken The lock token locking the node.
+         * @param reschedule Should the event be rescheduled?
+         * @return <code>true</code> if everything went fine, <code>false</code> otherwise.
+         */
+        boolean finishedJob(Event job, String eventNodePath, String lockToken, boolean reschedule);
     }
 }

Modified: incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/AbstractRepositoryEventHandler.java
URL: http://svn.apache.org/viewvc/incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/AbstractRepositoryEventHandler.java?rev=620503&r1=620502&r2=620503&view=diff
==============================================================================
--- incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/AbstractRepositoryEventHandler.java (original)
+++ incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/AbstractRepositoryEventHandler.java Mon Feb 11 07:19:29 2008
@@ -31,9 +31,11 @@
 import java.util.concurrent.LinkedBlockingQueue;
 
 import javax.jcr.Node;
+import javax.jcr.NodeIterator;
 import javax.jcr.RepositoryException;
 import javax.jcr.Session;
 import javax.jcr.observation.EventListener;
+import javax.jcr.query.Query;
 
 import org.apache.jackrabbit.JcrConstants;
 import org.apache.sling.event.EventUtil;
@@ -83,8 +85,8 @@
     /** Our application id. */
     protected String applicationId;
 
-    /** The repository session. */
-    protected Session session;
+    /** The repository session to write into the repository. */
+    protected Session writerSession;
 
     /** The path in the repository. */
     protected String repositoryPath;
@@ -95,9 +97,12 @@
     /** Is the background task still running? */
     protected boolean running;
 
-    /** A local queue for serialising the job processing. */
+    /** A local queue for serialising the event processing. */
     protected final BlockingQueue<EventInfo> queue = new LinkedBlockingQueue<EventInfo>();
 
+    /** A local queue for writing received events into the repository. */
+    protected final BlockingQueue<EventInfo> writeQueue = new LinkedBlockingQueue<EventInfo>();
+
     /**
      * Activate this component.
      * @param context
@@ -111,37 +116,79 @@
         if ( i != null ) {
             this.cleanupPeriod = i;
         }
-        // start background thread
+
+        // start background threads
         this.running = true;
+        // start writer thread
         final Thread t = new Thread() {
             public void run() {
                 try {
-                    startSession();
-                    runInBackground();
+                    startWriterSession();
                 } catch (RepositoryException e) {
                     // there is nothing we can do except log!
                     logger.error("Error during session starting.", e);
+                    running = false;
                 }
+                processWriteQueue();
             }
         };
         t.start();
+        final Thread t2 = new Thread() {
+            public void run() {
+                runInBackground();
+            }
+        };
+        t2.start();
     }
 
     /**
+     * This method is invoked periodically.
      * @see java.lang.Runnable#run()
      */
     public void run() {
         if ( this.cleanupPeriod > 0 ) {
-            this.cleanUpRepository();
+            this.logger.debug("Cleaning up repository, removing all entries older than {} minutes.", this.cleanupPeriod);
+
+            final String queryString = this.getCleanUpQueryString();
+            if ( queryString != null ) {
+                // we create an own session for concurrency issues
+                Session s = null;
+                try {
+                    s = this.createSession();
+                    final Node parentNode = (Node)s.getItem(this.repositoryPath);
+                    logger.debug("Executing query {}", queryString);
+                    final Query q = s.getWorkspace().getQueryManager().createQuery(queryString, Query.XPATH);
+                    final NodeIterator iter = q.execute().getNodes();
+                    int count = 0;
+                    while ( iter.hasNext() ) {
+                        final Node eventNode = iter.nextNode();
+                        eventNode.remove();
+                        count++;
+                    }
+                    parentNode.save();
+                    logger.debug("Removed {} entries from the repository.", count);
+
+                } catch (RepositoryException e) {
+                    // in the case of an error, we just log this as a warning
+                    this.logger.warn("Exception during repository cleanup.", e);
+                } finally {
+                    if ( s != null ) {
+                        s.logout();
+                    }
+                }
+            }
         }
     }
 
     protected abstract void runInBackground();
 
+    protected abstract void processWriteQueue();
+
     /**
-     * Clean up the repository.
+     * The query to detect old entries which can be removed.
+     * This method is invoked periodically from the {@link #run()} method.
      */
-    protected abstract void cleanUpRepository();
+    protected abstract String getCleanUpQueryString();
 
     /**
      * Deactivate this component.
@@ -151,12 +198,18 @@
         // stop background thread, by adding a job info to wake it up
         this.running = false;
         try {
+            this.writeQueue.put(new EventInfo());
+        } catch (InterruptedException e) {
+            // we ignore this
+            this.ignoreException(e);
+        }
+        try {
             this.queue.put(new EventInfo());
         } catch (InterruptedException e) {
             // we ignore this
             this.ignoreException(e);
         }
-        this.stopSession();
+        this.stopWriterSession();
     }
 
     /**
@@ -178,8 +231,8 @@
      * for new events created on other nodes.
      * @throws RepositoryException
      */
-    protected void startSession() throws RepositoryException {
-        this.session = this.createSession();
+    protected void startWriterSession() throws RepositoryException {
+        this.writerSession = this.createSession();
         if ( this.repositoryPath != null ) {
             this.createRepositoryPath();
         }
@@ -188,16 +241,16 @@
     /**
      * Stop the session.
      */
-    protected void stopSession() {
-        if ( this.session != null ) {
+    protected void stopWriterSession() {
+        if ( this.writerSession != null ) {
             try {
-                this.session.getWorkspace().getObservationManager().removeEventListener(this);
+                this.writerSession.getWorkspace().getObservationManager().removeEventListener(this);
             } catch (RepositoryException e) {
                 // we just ignore it
                 this.logger.warn("Unable to remove event listener.", e);
             }
-            this.session.logout();
-            this.session = null;
+            this.writerSession.logout();
+            this.writerSession = null;
         }
     }
 
@@ -206,8 +259,8 @@
      */
     protected void createRepositoryPath()
     throws RepositoryException {
-        if ( !this.session.itemExists(this.repositoryPath) ) {
-            Node node = this.session.getRootNode();
+        if ( !this.writerSession.itemExists(this.repositoryPath) ) {
+            Node node = this.writerSession.getRootNode();
             String path = this.repositoryPath.substring(1);
             int pos = path.lastIndexOf('/');
             if ( pos != -1 ) {
@@ -255,7 +308,7 @@
     protected Node writeEvent(Event e)
     throws RepositoryException {
         // create new node with name of topic
-        final Node rootNode = (Node) this.session.getItem(this.repositoryPath);
+        final Node rootNode = (Node) this.writerSession.getItem(this.repositoryPath);
 
         final String nodeType = this.getEventNodeType();
         final String nodeName = this.getNodeName(e);

Modified: incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/DistributingEventHandler.java
URL: http://svn.apache.org/viewvc/incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/DistributingEventHandler.java?rev=620503&r1=620502&r2=620503&view=diff
==============================================================================
--- incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/DistributingEventHandler.java (original)
+++ incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/DistributingEventHandler.java Mon Feb 11 07:19:29 2008
@@ -22,12 +22,9 @@
 import java.util.Dictionary;
 
 import javax.jcr.Node;
-import javax.jcr.NodeIterator;
 import javax.jcr.RepositoryException;
 import javax.jcr.Session;
 import javax.jcr.observation.EventIterator;
-import javax.jcr.query.Query;
-import javax.jcr.query.QueryManager;
 
 import org.apache.jackrabbit.util.ISO8601;
 import org.apache.sling.event.EventUtil;
@@ -45,52 +42,59 @@
     extends AbstractRepositoryEventHandler {
 
     /**
-     * @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#cleanUpRepository()
+     * @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#getCleanUpQueryString()
      */
-    protected void cleanUpRepository() {
-        this.logger.debug("Cleaning up repository, removing all events older than {} minutes.", this.cleanupPeriod);
+    protected String getCleanUpQueryString() {
+        final Calendar deleteBefore = Calendar.getInstance();
+        deleteBefore.add(Calendar.MINUTE, -this.cleanupPeriod);
+        final String dateString = ISO8601.format(deleteBefore);
+
+        final StringBuffer buffer = new StringBuffer("/jcr:root");
+        buffer.append(this.repositoryPath);
+        buffer.append("//element(*, ");
+        buffer.append(getEventNodeType());
+        buffer.append(")[@");
+        buffer.append(EventHelper.NODE_PROPERTY_CREATED);
+        buffer.append(" < xs:dateTime('");
+        buffer.append(dateString);
+        buffer.append("')]");
 
-        // we create an own session for concurrency issues
-        Session s = null;
-        try {
-            s = this.createSession();
-            final Node parentNode = (Node)s.getItem(this.repositoryPath);
-            final Calendar deleteBefore = Calendar.getInstance();
-            deleteBefore.add(Calendar.MINUTE, -this.cleanupPeriod);
-            final String dateString = ISO8601.format(deleteBefore);
-
-            final QueryManager qManager = parentNode.getSession().getWorkspace().getQueryManager();
-            final StringBuffer buffer = new StringBuffer("/jcr:root");
-            buffer.append(this.repositoryPath);
-            buffer.append("//element(*, ");
-            buffer.append(getEventNodeType());
-            buffer.append(")[@");
-            buffer.append(EventHelper.NODE_PROPERTY_CREATED);
-            buffer.append(" < xs:dateTime('");
-            buffer.append(dateString);
-            buffer.append("')]");
-
-            this.logger.debug("Executing query {}", buffer);
-            final Query q = qManager.createQuery(buffer.toString(), Query.XPATH);
-            final NodeIterator iter = q.execute().getNodes();
-            int count = 0;
-            while ( iter.hasNext() ) {
-                final Node eventNode = iter.nextNode();
-                eventNode.remove();
-                count++;
+        return buffer.toString();
+    }
+
+    /**
+     * @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#processWriteQueue()
+     */
+    protected void processWriteQueue() {
+        while ( this.running ) {
+            // so let's wait/get the next job from the queue
+            EventInfo info = null;
+            try {
+                info = this.writeQueue.take();
+            } catch (InterruptedException e) {
+                // we ignore this
+                this.ignoreException(e);
             }
-            this.logger.debug("Removed {} event nodes from the repository.", count);
-            parentNode.save();
-        } catch (RepositoryException e) {
-            // in the case of an error, we just log this as a warning
-            this.logger.warn("Exception during repository cleanup.", e);
-        } finally {
-            if ( s != null ) {
-                s.logout();
+            if ( info != null && this.running ) {
+                try {
+                    final Node eventNode = this.writeEvent(info.event);
+                    info.nodePath = eventNode.getPath();
+                } catch (Exception e) {
+                    this.logger.error("Exception during writing the event to the repository.", e);
+                }
+                try {
+                    this.queue.put(info);
+                } catch (InterruptedException e) {
+                    // we ignore this
+                    this.ignoreException(e);
+                }
             }
         }
     }
 
+    /**
+     * @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#runInBackground()
+     */
     protected void runInBackground() {
         while ( this.running ) {
             // so let's wait/get the next job from the queue
@@ -102,15 +106,11 @@
                 this.ignoreException(e);
             }
             if ( info != null && this.running ) {
-                if ( info.event != null ) {
-                    try {
-                        this.writeEvent(info.event);
-                    } catch (Exception e) {
-                        this.logger.error("Exception during writing the event to the repository.", e);
-                    }
-                } else if ( info.nodePath != null) {
+                if ( info.nodePath != null) {
+                    Session session = null;
                     try {
-                        final Node eventNode = (Node) this.session.getItem(info.nodePath);
+                        session = this.createSession();
+                        final Node eventNode = (Node)session.getItem(info.nodePath);
                         final EventAdmin localEA = this.eventAdmin;
                         if ( localEA != null ) {
                             localEA.postEvent(this.readEvent(eventNode));
@@ -119,6 +119,10 @@
                         }
                     } catch (Exception ex) {
                         this.logger.error("Exception during reading the event from the repository.", ex);
+                    } finally {
+                        if ( session != null ) {
+                            session.logout();
+                        }
                     }
                 }
             }
@@ -132,7 +136,7 @@
         try {
             final EventInfo info = new EventInfo();
             info.event = event;
-            this.queue.put(info);
+            this.writeQueue.put(info);
         } catch (InterruptedException ex) {
             // we ignore this
             this.ignoreException(ex);
@@ -169,13 +173,11 @@
 
 
     /**
-     * Start the repository session and add this handler as an observer
-     * for new events created on other nodes.
-     * @throws RepositoryException
-     */
-    protected void startSession() throws RepositoryException {
-        super.startSession();
-        this.session.getWorkspace().getObservationManager()
-            .addEventListener(this, javax.jcr.observation.Event.NODE_ADDED, this.repositoryPath, true, null, null, true);
+     * @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#startWriterSession()
+     */
+    protected void startWriterSession() throws RepositoryException {
+        super.startWriterSession();
+        this.writerSession.getWorkspace().getObservationManager()
+            .addEventListener(this, javax.jcr.observation.Event.NODE_ADDED, this.repositoryPath, true, null, new String[] {this.getEventNodeType()}, true);
     }
 }

Modified: incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
URL: http://svn.apache.org/viewvc/incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java?rev=620503&r1=620502&r2=620503&view=diff
==============================================================================
--- incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java (original)
+++ incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java Mon Feb 11 07:19:29 2008
@@ -22,9 +22,13 @@
 import java.util.Calendar;
 import java.util.Collection;
 import java.util.Dictionary;
+import java.util.HashMap;
 import java.util.Hashtable;
 import java.util.List;
+import java.util.Map;
+import java.util.UUID;
 
+import javax.jcr.ItemExistsException;
 import javax.jcr.Node;
 import javax.jcr.NodeIterator;
 import javax.jcr.RepositoryException;
@@ -36,7 +40,6 @@
 
 import org.apache.jackrabbit.JcrConstants;
 import org.apache.jackrabbit.util.ISO8601;
-import org.apache.jackrabbit.util.Locked;
 import org.apache.sling.event.EventUtil;
 import org.apache.sling.event.JobStatusProvider;
 import org.osgi.service.component.ComponentContext;
@@ -47,20 +50,23 @@
 /**
  * An event handler handling special job events.
  *
- * @scr.component inherit="true"
+ * @scr.component
  * @scr.service interface="org.apache.sling.event.JobStatusProvider"
- * @scr.property name="event.topics" value="org/apache/sling/event/job"
+ * @scr.property name="event.topics" valueRef="EventUtil.TOPIC_JOB"
  * @scr.property name="repository.path" value="/sling/jobs"
  */
 public class JobEventHandler
     extends AbstractRepositoryEventHandler
     implements EventUtil.JobStatusNotifier, JobStatusProvider {
 
-    /** @scr.property value="20" type="Long" */
-    protected static final String CONFIG_PROPERTY_SLEEP_TIME = "sleep.time";
+    /** A map for keeping track of currently processed job topics. */
+    protected final Map<String, Boolean> processingMap = new HashMap<String, Boolean>();
+
+    /** Default sleep time. */
+    protected static final long DEFAULT_SLEEP_TIME = 20;
 
-    /** A flag indicating if this handler is currently processing a job. */
-    protected boolean isProcessing = false;
+    /** @scr.property valueRef="DEFAULT_SLEEP_TIME" */
+    protected static final String CONFIG_PROPERTY_SLEEP_TIME = "sleep.time";
 
     /** We check every 20 secs by default. */
     protected long sleepTime;
@@ -72,69 +78,120 @@
      */
     protected void activate(final ComponentContext context)
     throws RepositoryException {
-        this.sleepTime = (Long)context.getProperties().get(CONFIG_PROPERTY_SLEEP_TIME) * 1000;
-
+        if ( context.getProperties().get(CONFIG_PROPERTY_SLEEP_TIME) != null ) {
+            this.sleepTime = (Long)context.getProperties().get(CONFIG_PROPERTY_SLEEP_TIME) * 1000;
+        } else {
+            this.sleepTime = DEFAULT_SLEEP_TIME;
+        }
         super.activate(context);
     }
 
     /**
-     * Clean up the repository.
+     * @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#processWriteQueue()
      */
-    protected void cleanUpRepository() {
-        this.logger.debug("Cleaning up repository, removing all jobs older than {} minutes.", this.cleanupPeriod);
+    protected void processWriteQueue() {
+        while ( this.running ) {
+            // so let's wait/get the next job from the queue
+            EventInfo info = null;
+            try {
+                info = this.writeQueue.take();
+            } catch (InterruptedException e) {
+                // we ignore this
+                this.ignoreException(e);
+            }
+            if ( info != null && this.running ) {
+                final Event event = info.event;
+                final String jobId = (String)event.getProperty(EventUtil.PROPERTY_JOB_ID);
 
-        // we create an own session for concurrency issues
-        Session s = null;
-        try {
-            s = this.createSession();
-            final Node parentNode = (Node)s.getItem(this.repositoryPath);
-            final Calendar deleteBefore = Calendar.getInstance();
-            deleteBefore.add(Calendar.MINUTE, -this.cleanupPeriod);
-            final String dateString = ISO8601.format(deleteBefore);
-            new Locked() {
-
-                protected Object run(Node node) throws RepositoryException {
-                    final QueryManager qManager = node.getSession().getWorkspace().getQueryManager();
-                    final StringBuffer buffer = new StringBuffer("/jcr:root");
-                    buffer.append(JobEventHandler.this.repositoryPath);
-                    buffer.append("//element(*, ");
-                    buffer.append(JobEventHandler.this.getEventNodeType());
-                    buffer.append(") [@");
-                    buffer.append(EventHelper.NODE_PROPERTY_ACTIVE);
-                    buffer.append(" = 'false' and @");
-                    buffer.append(EventHelper.NODE_PROPERTY_FINISHED);
-                    buffer.append(" < xs:dateTime('");
-                    buffer.append(dateString);
-                    buffer.append("')]");
-
-                    logger.debug("Executing query {}", buffer);
-                    final Query q = qManager.createQuery(buffer.toString(), Query.XPATH);
-                    final NodeIterator iter = q.execute().getNodes();
-                    int count = 0;
-                    while ( iter.hasNext() ) {
-                        final Node eventNode = iter.nextNode();
-                        eventNode.remove();
-                        count++;
+                // if the job has no job id, we can just write the job to the repo and don't
+                // need locking
+                if ( jobId == null ) {
+                    try {
+                        final Node eventNode = this.writeEvent(event);
+                        info.nodePath = eventNode.getPath();
+                    } catch (RepositoryException re ) {
+                        // something went wrong, so let's log it
+                        this.logger.error("Exception during writing new job to repository.", re);
                     }
-                    parentNode.save();
-                    logger.debug("Removed {} job nodes from the repository.", count);
-                    return null;
-                }
-            }.with(parentNode, false);
-        } catch (RepositoryException e) {
-            // in the case of an error, we just log this as a warning
-            this.logger.warn("Exception during repository cleanup.", e);
-        } catch (InterruptedException e) {
-            // we ignore this
-            this.ignoreException(e);
-        } finally {
-            if ( s != null ) {
-                s.logout();
+                } else {
+                    try {
+                        // let's first search for an existing node with the same id
+                        final Node parentNode = (Node)this.writerSession.getItem(this.repositoryPath);
+                        final String nodeName = this.getNodeName(event);
+                        Node foundNode = null;
+                        if ( parentNode.hasNode(nodeName) ) {
+                            foundNode = parentNode.getNode(nodeName);
+                        }
+                        if ( foundNode != null ) {
+                            // if the node is locked, someone else was quicker
+                            // and we don't have to process this job
+                            if ( foundNode.isLocked() ) {
+                                foundNode = null;
+                            } else {
+                                // node is already in repository, so we just overwrite it
+                                try {
+                                    foundNode.remove();
+                                    parentNode.save();
+                                } catch (RepositoryException re) {
+                                    // if anything goes wrong, it means that (hopefully) someone
+                                    // else is processing this node
+                                    foundNode = null;
+                                }
+
+                            }
+                        }
+                        if ( foundNode == null ) {
+                            // We now write the event into the repository
+                            try {
+                                final Node eventNode = this.writeEvent(event);
+                                info.nodePath = eventNode.getPath();
+                            } catch (ItemExistsException iee) {
+                                // someone else did already write this node in the meantime
+                                // nothing to do for us
+                            }
+                        }
+                    } catch (RepositoryException re ) {
+                        // something went wrong, so let's log it
+                        this.logger.error("Exception during writing new job to repository.", re);
+                    }
+                }
+                // if we were able to write the event into the repository
+                // we will queue it for processing
+                if ( info.nodePath != null ) {
+                    try {
+                        this.queue.put(info);
+                    } catch (InterruptedException e) {
+                        // this should never happen
+                        this.ignoreException(e);
+                    }
+                }
             }
         }
     }
 
     /**
+     * @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#getCleanUpQueryString()
+     */
+    protected String getCleanUpQueryString() {
+        final Calendar deleteBefore = Calendar.getInstance();
+        deleteBefore.add(Calendar.MINUTE, -this.cleanupPeriod);
+        final String dateString = ISO8601.format(deleteBefore);
+        final StringBuffer buffer = new StringBuffer("/jcr:root");
+        buffer.append(JobEventHandler.this.repositoryPath);
+        buffer.append("//element(*, ");
+        buffer.append(JobEventHandler.this.getEventNodeType());
+        buffer.append(") [@");
+        buffer.append(EventHelper.NODE_PROPERTY_ACTIVE);
+        buffer.append(" = 'false' and @");
+        buffer.append(EventHelper.NODE_PROPERTY_FINISHED);
+        buffer.append(" < xs:dateTime('");
+        buffer.append(dateString);
+        buffer.append("')]");
+
+        return buffer.toString();
+    }
+
+    /**
      * This method runs in the background and processes the local queue.
      */
     protected void runInBackground() {
@@ -147,76 +204,94 @@
                 // we ignore this
                 this.ignoreException(e);
             }
+
             if ( info != null && this.running ) {
-                if ( info.nodePath == null ) {
-                    this.processEvent(info.event);
-                } else {
-                    boolean process = false;
-                    synchronized ( this ) {
-                        if ( !this.isProcessing ) {
-                            this.isProcessing = true;
+                final Event event = info.event;
+                final String jobTopic = (String)event.getProperty(EventUtil.PROPERTY_JOB_TOPIC);
+                final boolean parallelProcessing = event.getProperty(EventUtil.PROPERTY_JOB_PARALLEL) != null;
+
+                // check how we can process this job
+                // if parallel processing is allowed, we can just process
+                // if not we should check if any other job with the same topic is currently running
+                boolean process = parallelProcessing;
+                if ( !process ) {
+                    synchronized ( this.processingMap ) {
+                        final Boolean value = this.processingMap.get(jobTopic);
+                        if ( value == null || !value.booleanValue() ) {
+                            this.processingMap.put(jobTopic, Boolean.TRUE);
                             process = true;
                         }
                     }
-                    if ( process ) {
-                        boolean unlock = true;
-                        try {
-                            this.session.refresh(true);
-                            final Node eventNode = (Node) this.session.getItem(info.nodePath);
-                            if ( !eventNode.isLocked() && eventNode.getProperty(EventHelper.NODE_PROPERTY_ACTIVE).getBoolean() ) {
-                                // lock node
-                                Lock lock = null;
-                                try {
-                                    lock = eventNode.lock(false, true);
-                                } catch (RepositoryException re) {
-                                    // lock failed which means that the node is locked by someone else, so we don't have to requeue
-                                    process = false;
-                                }
-                                if ( process ) {
-                                    // check if event is still active
-                                    eventNode.refresh(true);
-                                    if ( eventNode.getProperty(EventHelper.NODE_PROPERTY_ACTIVE).getBoolean() ) {
-                                        unlock = false;
-                                        this.processJob(info.event, eventNode, lock.getLockToken());
-                                    } else {
-                                        eventNode.unlock();
-                                    }
-                                }
+
+                }
+                if ( process ) {
+                    boolean unlock = true;
+                    Session session = null;
+                    try {
+                        session = this.createSession();
+                        final Node eventNode = (Node) session.getItem(info.nodePath);
+                        if ( !eventNode.isLocked() && eventNode.getProperty(EventHelper.NODE_PROPERTY_ACTIVE).getBoolean() ) {
+                            // lock node
+                            Lock lock = null;
+                            try {
+                                lock = eventNode.lock(false, true);
+                            } catch (RepositoryException re) {
+                                // lock failed which means that the node is locked by someone else, so we don't have to requeue
+                                process = false;
                             }
-                        } catch (RepositoryException e) {
-                            // ignore
-                            this.ignoreException(e);
-                        } finally {
-                            if ( unlock ) {
-                                synchronized ( this ) {
-                                    this.isProcessing = false;
+                            if ( process ) {
+                                // check if event is still active
+                                eventNode.refresh(true);
+                                if ( eventNode.getProperty(EventHelper.NODE_PROPERTY_ACTIVE).getBoolean() ) {
+                                    unlock = false;
+                                    this.processJob(info.event, eventNode, lock.getLockToken());
+                                } else {
+                                    eventNode.unlock();
                                 }
                             }
                         }
-                    } else {
-                        try {
-                            // check if the node is in processing or already finished
-                            final Node eventNode = (Node) this.session.getItem(info.nodePath);
-                            if ( !eventNode.isLocked() && eventNode.getProperty(EventHelper.NODE_PROPERTY_ACTIVE).getBoolean() ) {
+                    } catch (RepositoryException e) {
+                        // ignore
+                        this.ignoreException(e);
+                    } finally {
+                        if ( unlock && !parallelProcessing ) {
+                            synchronized ( this.processingMap ) {
+                                this.processingMap.put(jobTopic, Boolean.FALSE);
+                            }
+                        }
+                        if ( session != null ) {
+                            session.logout();
+                        }
+                    }
+                } else {
+                    Session session = null;
+                    try {
+                        session = this.createSession();
+                        // check if the node is in processing or already finished
+                        final Node eventNode = (Node) session.getItem(info.nodePath);
+                        if ( !eventNode.isLocked() && eventNode.getProperty(EventHelper.NODE_PROPERTY_ACTIVE).getBoolean() ) {
+                            try {
+                                this.queue.put(info);
+                            } catch (InterruptedException e) {
+                                // ignore
+                                this.ignoreException(e);
+                            }
+                            // wait time before we restart the cycle, if there is only one job in the queue!
+                            if ( this.queue.size() == 1 ) {
                                 try {
-                                    this.queue.put(info);
+                                    Thread.sleep(this.sleepTime);
                                 } catch (InterruptedException e) {
                                     // ignore
                                     this.ignoreException(e);
                                 }
-                                // wait time before we restart the cycle, if there is only one job in the queue!
-                                if ( this.queue.size() == 1 ) {
-                                    try {
-                                        Thread.sleep(this.sleepTime);
-                                    } catch (InterruptedException e) {
-                                        // ignore
-                                        this.ignoreException(e);
-                                    }
-                                }
                             }
-                        } catch (RepositoryException e) {
-                            // ignore
-                            this.ignoreException(e);
+                        }
+                    } catch (RepositoryException e) {
+                        // ignore
+                        this.ignoreException(e);
+                    } finally {
+                        if ( session != null ) {
+                            session.logout();
                         }
                     }
                 }
@@ -229,12 +304,18 @@
      * for new events created on other nodes.
      * @throws RepositoryException
      */
-    protected void startSession() throws RepositoryException {
-        super.startSession();
+    protected void startWriterSession() throws RepositoryException {
+        super.startWriterSession();
         // load unprocessed jobs from repository
         this.loadJobs();
-        this.session.getWorkspace().getObservationManager()
-            .addEventListener(this, javax.jcr.observation.Event.PROPERTY_CHANGED, this.repositoryPath, true, null, null, true);
+        this.writerSession.getWorkspace().getObservationManager()
+            .addEventListener(this,
+                              javax.jcr.observation.Event.PROPERTY_CHANGED | javax.jcr.observation.Event.PROPERTY_REMOVED,
+                              this.repositoryPath,
+                              true,
+                              null,
+                              new String[] {this.getEventNodeType()},
+                              true);
     }
 
     /**
@@ -265,7 +346,7 @@
                 final EventInfo info = new EventInfo();
                 info.event = event;
                 try {
-                    this.queue.put(info);
+                    this.writeQueue.put(info);
                 } catch (InterruptedException e) {
                     // this should never happen
                     this.ignoreException(e);
@@ -276,102 +357,16 @@
         }
     }
 
-    protected void processEvent(final Event event) {
+    /**
+     * @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#getNodeName(org.osgi.service.event.Event)
+     */
+    protected String getNodeName(Event event) {
         final String jobId = (String)event.getProperty(EventUtil.PROPERTY_JOB_ID);
-        final String jobTopic = (String)event.getProperty(EventUtil.PROPERTY_JOB_TOPIC);
-        Lock lock = null;
-        // if the job has no job id, we can just write the job to the repo and don't
-        // need locking
-        if ( jobId == null ) {
-            try {
-                final Node eventNode = JobEventHandler.this.writeEvent(event);
-                lock = eventNode.lock(false, true);
-            } catch (RepositoryException re ) {
-                // something went wrong, so let's log it
-                this.logger.error("Exception during writing new job to repository.", re);
-            }
-        } else {
-            // we lock the parent node to ensure that noone else tries to add the same job to the repository
-            // while we are doing it
-            try {
-                final Node parentNode = (Node)this.session.getItem(this.repositoryPath);
-                lock = (Lock) new Locked() {
-
-                    protected Object run(Node node) throws RepositoryException {
-                        // if there is a node, we know that there is exactly one node
-                        final Node foundNode = JobEventHandler.this.queryJob(jobTopic, jobId);
-                        boolean writeAndSend =false;
-                        // if node is not present, we'll write it, lock it and send the event
-                        if ( foundNode == null ) {
-                            writeAndSend = true;
-                        } else {
-                            // node is already in repository, let's check the application id
-                            if ( foundNode.getProperty(EventHelper.NODE_PROPERTY_APPLICATION).getString().equals(JobEventHandler.this.applicationId) ) {
-                                // delete old node (deleting is easier than updating...)
-                                foundNode.remove();
-                                parentNode.save();
-                                writeAndSend = true;
-                            }
-                        }
-                        if ( writeAndSend ) {
-                            final Node eventNode = JobEventHandler.this.writeEvent(event);
-                            return eventNode.lock(false, true);
-                        }
-                        return null;
-                    }
-                }.with(parentNode, false);
-            } catch (RepositoryException re ) {
-                // something went wrong, so let's log it
-                this.logger.error("Exception during writing new job to repository.", re);
-            } catch (InterruptedException e) {
-                // This should never happen from the lock, so we ignore it
-                this.ignoreException(e);
-            }
+        if ( jobId != null ) {
+            return jobId.replace('/', '.');
         }
 
-        // if we have a lock, we will try to fire the job
-        if ( lock != null ) {
-            final Node eventNode = lock.getNode();
-            final boolean parallelProcessing = event.getProperty(EventUtil.PROPERTY_JOB_PARALLEL) != null;
-
-            if ( parallelProcessing ) {
-                // if the job can be run in parallel, we'll just run it
-                this.processJob(event, eventNode, lock.getLockToken());
-            } else {
-                // we need to serialize the jobs
-                // lets check if we are currently processing a job
-                boolean process = false;
-                synchronized ( this ) {
-                    if ( !this.isProcessing ) {
-                        this.isProcessing = true;
-                        process = true;
-                    }
-                }
-                if ( process ) {
-                    this.processJob(event, eventNode, lock.getLockToken());
-                } else {
-                    // we don't process the job right now, so unlock and put in local queue
-                    final EventInfo info = new EventInfo();
-                    info.event = event;
-                    try {
-                        info.nodePath = eventNode.getPath();
-                        this.queue.put(info);
-                    } catch (RepositoryException e) {
-                        // getPath() should work at this stage, so we ignore it
-                        this.ignoreException(e);
-                    } catch (InterruptedException e) {
-                        // this should never happen so we ignore it
-                        this.ignoreException(e);
-                    }
-                    try {
-                        eventNode.unlock();
-                    } catch (RepositoryException e) {
-                        // if unlocking fails, we will just ignore it
-                        this.ignoreException(e);
-                    }
-                }
-            }
-        }
+        return "Job " + UUID.randomUUID().toString();
     }
 
     /**
@@ -381,6 +376,7 @@
      */
     protected void processJob(Event event, Node eventNode, String lockToken)  {
         final boolean parallelProcessing = event.getProperty(EventUtil.PROPERTY_JOB_PARALLEL) != null;
+        final String jobTopic = (String)event.getProperty(EventUtil.PROPERTY_JOB_TOPIC);
         boolean unlock = true;
         try {
             final Event jobEvent = this.getJobEvent(event, eventNode, lockToken);
@@ -400,8 +396,8 @@
         } finally {
             if ( unlock ) {
                 if ( !parallelProcessing ) {
-                    synchronized ( this ) {
-                        this.isProcessing = false;
+                    synchronized ( this.processingMap ) {
+                        this.processingMap.put(jobTopic, Boolean.FALSE);
                     }
                 }
                 // unlock node
@@ -457,20 +453,29 @@
             s = this.createSession();
             while ( iter.hasNext() ) {
                 final javax.jcr.observation.Event event = iter.nextEvent();
-                if ( event.getType() == javax.jcr.observation.Event.PROPERTY_CHANGED ) {
+                if ( event.getType() == javax.jcr.observation.Event.PROPERTY_CHANGED
+                   || event.getType() == javax.jcr.observation.Event.PROPERTY_REMOVED) {
                     try {
-                        final Node eventNode = (Node) s.getItem(event.getPath());
-                        if ( !eventNode.isLocked()
-                             && eventNode.hasProperty(EventHelper.NODE_PROPERTY_ACTIVE)
-                             && eventNode.getProperty(EventHelper.NODE_PROPERTY_ACTIVE).getBoolean() ) {
-                            final EventInfo info = new EventInfo();
-                            info.event = this.readEvent(eventNode);
-                            info.nodePath = event.getPath();
-                            try {
-                                this.queue.put(info);
-                            } catch (InterruptedException e) {
-                                // we ignore this exception as this should never occur
-                                this.ignoreException(e);
+                        final String propPath = event.getPath();
+                        int pos = propPath.lastIndexOf('/');
+                        final String nodePath = propPath.substring(0, pos);
+                        final String propertyName = propPath.substring(pos+1);
+
+                        // we are only interested in unlocks
+                        if ( "jcr:lockOwner".equals(propertyName) ) {
+                            final Node eventNode = (Node) s.getItem(nodePath);
+                            if ( !eventNode.isLocked()
+                                 && eventNode.hasProperty(EventHelper.NODE_PROPERTY_ACTIVE)
+                                 && eventNode.getProperty(EventHelper.NODE_PROPERTY_ACTIVE).getBoolean() ) {
+                                final EventInfo info = new EventInfo();
+                                info.event = this.readEvent(eventNode);
+                                info.nodePath = nodePath; // queue the job node's path, not the path of its jcr:lockOwner property
+                                try {
+                                    this.queue.put(info);
+                                } catch (InterruptedException e) {
+                                    // we ignore this exception as this should never occur
+                                    this.ignoreException(e);
+                                }
                             }
                         }
                     } catch (RepositoryException re) {
@@ -488,42 +493,11 @@
     }
 
     /**
-     * Search for a node with the corresponding topic and unique key.
-     * @param topic
-     * @param key
-     * @return The node or null.
-     * @throws RepositoryException
-     */
-    protected Node queryJob(String topic, String key) throws RepositoryException {
-        final QueryManager qManager = this.session.getWorkspace().getQueryManager();
-        final StringBuffer buffer = new StringBuffer("/jcr:root");
-        buffer.append(this.repositoryPath);
-        buffer.append("//element(*, ");
-        buffer.append(this.getEventNodeType());
-        buffer.append(") [");
-        buffer.append(EventHelper.NODE_PROPERTY_TOPIC);
-        buffer.append(" = '");
-        buffer.append(topic);
-        buffer.append("' and ");
-        buffer.append(EventHelper.NODE_PROPERTY_JOBID);
-        buffer.append(" = '");
-        buffer.append(key);
-        buffer.append("']");
-        final Query q = qManager.createQuery(buffer.toString(), Query.XPATH);
-        final NodeIterator result = q.execute().getNodes();
-        Node foundNode = null;
-        if ( result.hasNext() ) {
-            foundNode = result.nextNode();
-        }
-        return foundNode;
-    }
-
-    /**
      * Load all active jobs from the repository.
      * @throws RepositoryException
      */
     protected void loadJobs() throws RepositoryException {
-        final QueryManager qManager = this.session.getWorkspace().getQueryManager();
+        final QueryManager qManager = this.writerSession.getWorkspace().getQueryManager();
         final StringBuffer buffer = new StringBuffer("/jcr:root");
         buffer.append(this.repositoryPath);
         buffer.append("//element(*, ");
@@ -553,13 +527,38 @@
     /**
      * @see org.apache.sling.event.EventUtil.JobStatusNotifier#finishedJob(org.osgi.service.event.Event, String, String, boolean)
      */
-    public void finishedJob(Event job, String eventNodePath, String lockToken, boolean reschedule) {
+    public boolean finishedJob(Event job, String eventNodePath, String lockToken, boolean shouldReschedule) {
+        boolean reschedule = shouldReschedule;
+        if ( shouldReschedule ) {
+            // check if we exceeded the number of retries
+            if ( job.getProperty(EventUtil.PROPERTY_JOB_RETRIES) != null ) {
+                int retries = (Integer) job.getProperty(EventUtil.PROPERTY_JOB_RETRIES);
+                int retryCount = 0;
+                if ( job.getProperty(EventUtil.PROPERTY_JOB_RETRY_COUNT) != null ) {
+                    retryCount = (Integer)job.getProperty(EventUtil.PROPERTY_JOB_RETRY_COUNT);
+                }
+                retryCount++;
+                if ( retryCount >= retries ) {
+                    reschedule = false;
+                }
+                // update event with retry count
+                final Dictionary<String, Object> newProperties;
+                // create a new dictionary
+                newProperties = new Hashtable<String, Object>();
+                final String[] names = job.getPropertyNames();
+                for(int i=0; i<names.length; i++ ) {
+                    newProperties.put(names[i], job.getProperty(names[i]));
+                }
+                newProperties.put(EventUtil.PROPERTY_JOB_RETRY_COUNT, retryCount);
+                job = new Event(job.getTopic(), newProperties);
+            }
+        }
         final boolean parallelProcessing = job.getProperty(EventUtil.PROPERTY_JOB_PARALLEL) != null;
         Session s = null;
         try {
             s = this.createSession();
             // remove lock token from shared session and add it to current session
-            this.session.removeLockToken(lockToken);
+            this.writerSession.removeLockToken(lockToken);
             s.addLockToken(lockToken);
             final Node eventNode = (Node) s.getItem(eventNodePath);
             try {
@@ -573,8 +572,9 @@
                 this.logger.error("Exception during job finishing.", re);
             } finally {
                 if ( !parallelProcessing) {
-                    synchronized ( this ) {
-                        this.isProcessing = false;
+                    final String jobTopic = (String)job.getProperty(EventUtil.PROPERTY_JOB_TOPIC);
+                    synchronized ( this.processingMap ) {
+                        this.processingMap.put(jobTopic, Boolean.FALSE);
                     }
                 }
                 // unlock node
@@ -590,17 +590,47 @@
                 try {
                     info.event = job;
                     info.nodePath = eventNode.getPath();
-                    this.queue.put(info);
-                } catch (InterruptedException e) {
-                    // this should never happen
-                    this.ignoreException(e);
                 } catch (RepositoryException e) {
                     // this should never happen
                     this.ignoreException(e);
                 }
+                // delay rescheduling?
+                if ( job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY) != null ) {
+                    final long delay = (Long)job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY);
+                    final Thread t = new Thread() {
+                        public void run() {
+                            try {
+                                Thread.sleep(delay);
+                            } catch (InterruptedException e) {
+                                // this should never happen
+                                ignoreException(e);
+                            }
+                            try {
+                                queue.put(info);
+                            } catch (InterruptedException e) {
+                                // this should never happen
+                                ignoreException(e);
+                            }
+                        }
+                    };
+                    t.start();
+                } else {
+                    // put directly into queue
+                    try {
+                        this.queue.put(info);
+                    } catch (InterruptedException e) {
+                        // this should never happen
+                        this.ignoreException(e);
+                    }
+                }
+            }
+            if ( !shouldReschedule ) {
+                return true;
             }
+            return reschedule;
         } catch (RepositoryException re) {
             this.logger.error("Unable to create new session.", re);
+            return false;
         } finally {
             if ( s != null ) {
                 s.logout();

Modified: incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/TimedEventHandler.java
URL: http://svn.apache.org/viewvc/incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/TimedEventHandler.java?rev=620503&r1=620502&r2=620503&view=diff
==============================================================================
--- incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/TimedEventHandler.java (original)
+++ incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/TimedEventHandler.java Mon Feb 11 07:19:29 2008
@@ -67,22 +67,28 @@
     protected Scheduler scheduler;
 
     /**
-     * Start the repository session and add this handler as an observer
-     * for new events created on other nodes.
-     * @throws RepositoryException
+     * @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#startWriterSession()
      */
-    protected void startSession() throws RepositoryException {
-        super.startSession();
+    protected void startWriterSession() throws RepositoryException {
+        super.startWriterSession();
         // load timed events from repository
         this.loadEvents();
-        this.session.getWorkspace().getObservationManager()
+        this.writerSession.getWorkspace().getObservationManager()
             .addEventListener(this, javax.jcr.observation.Event.PROPERTY_CHANGED, this.repositoryPath, true, null, null, true);
     }
 
     /**
-     * @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#cleanUpRepository()
+     * @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#getCleanUpQueryString()
+     */
+    protected String getCleanUpQueryString() {
+        // nothing to clean up for now
+        return null;
+    }
+
+    /**
+     * @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#processWriteQueue()
      */
-    protected void cleanUpRepository() {
+    protected void processWriteQueue() {
         // nothing to do right now
     }
 
@@ -122,8 +128,8 @@
                                 }
                             }
                         } else {
-                            this.session.refresh(true);
-                            final Node eventNode = (Node) this.session.getItem(info.nodePath);
+                            this.writerSession.refresh(true);
+                            final Node eventNode = (Node) this.writerSession.getItem(info.nodePath);
                             if ( !eventNode.isLocked() ) {
                                 // lock node
                                 Lock lock = null;
@@ -156,13 +162,13 @@
 
     protected String persistEvent(final Event event, final ScheduleInfo scheduleInfo) {
         try {
-            final Node parentNode = (Node)this.session.getItem(this.repositoryPath);
+            final Node parentNode = (Node)this.writerSession.getItem(this.repositoryPath);
             Lock lock = (Lock) new Locked() {
 
                 protected Object run(Node node) throws RepositoryException {
                     final String jobId = scheduleInfo.jobId;
                     // if there is a node, we know that there is exactly one node
-                    final Node foundNode = queryJob(session, jobId);
+                    final Node foundNode = queryJob(writerSession, jobId);
                     if ( scheduleInfo.isStopEvent() ) {
                         // if this is a stop event, we should remove the node from the repository
                         // if there is no node someone else was faster and we can ignore this
@@ -408,7 +414,7 @@
      * @throws RepositoryException
      */
     protected void loadEvents() throws RepositoryException {
-        final QueryManager qManager = this.session.getWorkspace().getQueryManager();
+        final QueryManager qManager = this.writerSession.getWorkspace().getQueryManager();
         final StringBuffer buffer = new StringBuffer("/jcr:root");
         buffer.append(this.repositoryPath);
         buffer.append("//element(*, ");

Modified: incubator/sling/trunk/sling/event/src/test/java/org/apache/sling/event/impl/AbstractRepositoryEventHandlerTest.java
URL: http://svn.apache.org/viewvc/incubator/sling/trunk/sling/event/src/test/java/org/apache/sling/event/impl/AbstractRepositoryEventHandlerTest.java?rev=620503&r1=620502&r2=620503&view=diff
==============================================================================
--- incubator/sling/trunk/sling/event/src/test/java/org/apache/sling/event/impl/AbstractRepositoryEventHandlerTest.java (original)
+++ incubator/sling/trunk/sling/event/src/test/java/org/apache/sling/event/impl/AbstractRepositoryEventHandlerTest.java Mon Feb 11 07:19:29 2008
@@ -126,8 +126,8 @@
         assertEquals(this.handler.applicationId, SLING_ID);
         assertEquals(this.handler.cleanupPeriod, CLEANUP_PERIOD);
         assertEquals(this.handler.repositoryPath, REPO_PATH);
-        assertNotNull(this.handler.session);
-        final EventListenerIterator iter = this.handler.session.getWorkspace().getObservationManager().getRegisteredEventListeners();
+        assertNotNull(this.handler.writerSession);
+        final EventListenerIterator iter = this.handler.writerSession.getWorkspace().getObservationManager().getRegisteredEventListeners();
         boolean found = false;
         while ( !found && iter.hasNext() ) {
             final javax.jcr.observation.EventListener listener = iter.nextEventListener();