You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2010/02/19 17:14:26 UTC

svn commit: r911855 - in /sling/trunk/bundles/extensions/event: ./ src/main/java/org/apache/sling/event/impl/ src/main/java/org/apache/sling/event/impl/job/ src/test/java/org/apache/sling/event/impl/

Author: cziegeler
Date: Fri Feb 19 16:14:26 2010
New Revision: 911855

URL: http://svn.apache.org/viewvc?rev=911855&view=rev
Log:
SLING-1389 : Background loading might not load all stored jobs
SLING-1390 : Deadlock in rare circumstances
SLING-1396 : Include only used classes from jcr commons
This is mostly a refactoring effort to reduce the complexity and make the code more understandable. With this I could fix the above-mentioned errors.
Started to add some basic test cases.

Added:
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobUtil.java   (with props)
    sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/Barrier.java   (with props)
    sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/JobEventHandlerTest.java   (with props)
    sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/RepositoryTestUtil.java   (with props)
    sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/SimpleEventAdmin.java   (with props)
Modified:
    sling/trunk/bundles/extensions/event/pom.xml
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
    sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/AbstractRepositoryEventHandlerTest.java
    sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/DistributingEventHandlerTest.java

Modified: sling/trunk/bundles/extensions/event/pom.xml
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/pom.xml?rev=911855&r1=911854&r2=911855&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/pom.xml (original)
+++ sling/trunk/bundles/extensions/event/pom.xml Fri Feb 19 16:14:26 2010
@@ -54,6 +54,11 @@
                 <extensions>true</extensions>
                 <configuration>
                     <instructions>
+                        <!-- As we're using JCR 2.0 for testing we explicitly have to
+                             import version 1.0 here  -->
+                        <Import-Package>
+                            javax.jcr.*;version=1.0,*
+                        </Import-Package>
                         <Export-Package>
                             org.apache.sling.event;version=2.2.0
                         </Export-Package>
@@ -68,7 +73,7 @@
                             slingevent=http://sling.apache.org/jcr/event/1.0
                         </Sling-Namespaces>
                         <Embed-Dependency>
-                            jackrabbit-jcr-commons
+                            jackrabbit-jcr-commons;inline="org/apache/jackrabbit/util/ISO9075.*|org/apache/jackrabbit/name/ISO8601.*|org/apache/jackrabbit/util/XMLChar.*"
                         </Embed-Dependency>
                     </instructions>
                 </configuration>
@@ -103,9 +108,12 @@
             <groupId>org.osgi</groupId>
             <artifactId>org.osgi.compendium</artifactId>
         </dependency>
+        <!-- We use version 2.0 for testing! -->
         <dependency>
             <groupId>javax.jcr</groupId>
             <artifactId>jcr</artifactId>
+            <version>2.0</version>
+            <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.sling</groupId>
@@ -156,7 +164,7 @@
         <dependency>
             <groupId>org.apache.jackrabbit</groupId>
             <artifactId>jackrabbit-jcr-commons</artifactId>
-            <version>1.4.2</version>
+            <version>2.0.0</version>
             <scope>provided</scope>
         </dependency>
         <!-- Testing -->
@@ -169,9 +177,13 @@
             <artifactId>jmock-junit4</artifactId>
         </dependency>
         <dependency>
-            <groupId>org.apache.sling</groupId>
-            <artifactId>org.apache.sling.commons.testing</artifactId>
-            <version>2.0.4-incubator</version>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-simple</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.jackrabbit</groupId>
+            <artifactId>jackrabbit-core</artifactId>
+            <version>2.0.0</version>
             <scope>test</scope>
         </dependency>
     </dependencies>

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java?rev=911855&r1=911854&r2=911855&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java Fri Feb 19 16:14:26 2010
@@ -30,7 +30,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
 
 import javax.jcr.Item;
@@ -57,6 +56,7 @@
 import org.apache.sling.event.EventUtil;
 import org.apache.sling.event.JobStatusProvider;
 import org.apache.sling.event.impl.job.JobBlockingQueue;
+import org.apache.sling.event.impl.job.JobUtil;
 import org.apache.sling.event.impl.job.ParallelInfo;
 import org.osgi.framework.Constants;
 import org.osgi.service.component.ComponentConstants;
@@ -145,7 +145,7 @@
     private long maximumParallelJobs;
 
     /** Background session. */
-    private Session backgroundSession;
+    protected Session backgroundSession;
 
     /** Unloaded jobs. */
     private Set<String>unloadedJobs = new HashSet<String>();
@@ -281,12 +281,14 @@
                 } catch (InterruptedException e) {
                     this.ignoreException(e);
                 }
-                this.logger.debug("Stopped job queue {}", jbq.getName());
+                this.logger.info("Stopped job queue {}", jbq.getName());
             }
         }
         if ( this.backgroundSession != null ) {
             synchronized ( this.backgroundLock ) {
                 this.logger.debug("Shutting down background session.");
+                // notify possibly sleeping thread
+                this.backgroundLock.notify();
                 try {
                     this.backgroundSession.getWorkspace().getObservationManager().removeEventListener(this);
                 } catch (RepositoryException e) {
@@ -474,7 +476,7 @@
                 info.event = event;
                 final String jobId = (String)event.getProperty(EventUtil.PROPERTY_JOB_ID);
                 final String jobTopic = (String)event.getProperty(EventUtil.PROPERTY_JOB_TOPIC);
-                final String nodePath = this.getNodePath(jobTopic, jobId);
+                final String nodePath = JobUtil.getUniquePath(jobTopic, jobId);
 
                 // if the job has no job id, we can just write the job to the repo and don't
                 // need locking
@@ -537,14 +539,88 @@
                 // if we were able to write the event into the repository
                 // we will queue it for processing
                 if ( info.nodePath != null ) {
-                    try {
-                        this.queue.put(info);
-                    } catch (InterruptedException e) {
-                        // this should never happen
-                        this.ignoreException(e);
+                    this.queueJob(info);
+                }
+            }
+        }
+    }
+
+    /**
+     * Put the job into the correct queue.
+     */
+    private void queueJob(final EventInfo info) {
+        if ( logger.isDebugEnabled() ) {
+            logger.debug("Received new job {}", EventUtil.toString(info.event));
+        }
+        // check for local only jobs and remove them from the queue if they're meant
+        // for another application node
+        final String appId = (String)info.event.getProperty(EventUtil.PROPERTY_APPLICATION);
+        if ( info.event.getProperty(EventUtil.PROPERTY_JOB_RUN_LOCAL) != null
+            && appId != null && !this.applicationId.equals(appId) ) {
+            if ( logger.isDebugEnabled() ) {
+                 logger.debug("Discarding job {} : local job for a different application node.", EventUtil.toString(info.event));
+            }
+        } else {
+
+            // check if we should put this into a separate queue
+            boolean queued = false;
+            if ( info.event.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME) != null ) {
+                final String queueName = (String)info.event.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME);
+                synchronized ( this.jobQueues ) {
+                    BlockingQueue<EventInfo> jobQueue = this.jobQueues.get(queueName);
+                    if ( jobQueue == null ) {
+                        // check if we have exceeded the maximum number of job queues
+                        if ( this.jobQueues.size() >= this.maxJobQueues ) {
+                            this.logger.warn("Unable to create new job queue named {} as there are already {} job queues." +
+                                    " Try to increase the maximum number of job queues!", queueName, this.jobQueues.size());
+                        } else {
+                            final boolean orderedQueue = info.event.getProperty(EventUtil.PROPERTY_JOB_QUEUE_ORDERED) != null;
+                            final JobBlockingQueue jq = new JobBlockingQueue(queueName, orderedQueue, this.logger);
+                            jobQueue = jq;
+                            this.jobQueues.put(queueName, jq);
+                            // Start background thread
+                            this.threadPool.execute(new Runnable() {
+
+                                /**
+                                 * @see java.lang.Runnable#run()
+                                 */
+                                public void run() {
+                                    while ( running && !jq.isFinished() ) {
+                                        logger.info("Starting {}job queue {}", (orderedQueue ? "ordered " : ""), queueName);
+                                        try {
+                                            runJobQueue(queueName, jq);
+                                        } catch (Throwable t) {
+                                            logger.error("Job queue stopped with exception: " + t.getMessage() + ". Restarting.", t);
+                                        }
+                                    }
+                                }
+
+                            });
+                        }
+                    }
+                    if ( jobQueue != null ) {
+                        if ( logger.isDebugEnabled() ) {
+                            logger.debug("Queuing job {} into queue {}.", EventUtil.toString(info.event), queueName);
+                        }
+                        try {
+                            jobQueue.put(info);
+                        } catch (InterruptedException e) {
+                            // this should never happen
+                            this.ignoreException(e);
+                        }
+                        // don't put into main queue
+                        queued = true;
                     }
                 }
             }
+            if ( !queued ) {
+                try {
+                    this.queue.put(info);
+                } catch (InterruptedException e) {
+                    // this should never happen
+                    this.ignoreException(e);
+                }
+            }
         }
     }
 
@@ -552,6 +628,7 @@
      * This method runs in the background and processes the local queue.
      */
     protected void runInBackground() throws RepositoryException {
+        // create the background session and register a listener
         this.backgroundSession = this.createSession();
         this.backgroundSession.getWorkspace().getObservationManager()
                 .addEventListener(this,
@@ -562,7 +639,6 @@
                                   null,
                                   new String[] {this.getEventNodeType()},
                                   true);
-        // load unprocessed jobs from repository
         if ( this.running ) {
             logger.info("Apache Sling Job Event Handler started.");
             logger.debug("Job Handler Configuration: (sleepTime={} secs, maxJobRetries={}," +
@@ -578,6 +654,7 @@
                 ctx.disableComponent(name);
             }
         }
+        // This is the main queue
         while ( this.running ) {
             // so let's wait/get the next job from the queue
             EventInfo info = null;
@@ -589,76 +666,8 @@
             }
 
             if ( info != null && this.running ) {
-                if ( logger.isDebugEnabled() ) {
-                    logger.debug("Received new job {}", EventUtil.toString(info.event));
-                }
-                // check for local only jobs and remove them from the queue if they're meant
-                // for another application node
-                final String appId = (String)info.event.getProperty(EventUtil.PROPERTY_APPLICATION);
-                if ( info.event.getProperty(EventUtil.PROPERTY_JOB_RUN_LOCAL) != null
-                    && appId != null && !this.applicationId.equals(appId) ) {
-                    if ( logger.isDebugEnabled() ) {
-                         logger.debug("Discarding job {} : local job for a different application node.", EventUtil.toString(info.event));
-                    }
-                    info = null;
-                }
-
-                // check if we should put this into a separate queue
-                if ( info != null && info.event.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME) != null ) {
-                    final String queueName = (String)info.event.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME);
-                    synchronized ( this.jobQueues ) {
-                        BlockingQueue<EventInfo> jobQueue = this.jobQueues.get(queueName);
-                        if ( jobQueue == null ) {
-                            // check if we have exceeded the maximum number of job queues
-                            if ( this.jobQueues.size() >= this.maxJobQueues ) {
-                                this.logger.warn("Unable to create new job queue named {} as there are already {} job queues." +
-                                        " Try to increase the maximum number of job queues!", queueName, this.jobQueues.size());
-                            } else {
-                                final boolean orderedQueue = info.event.getProperty(EventUtil.PROPERTY_JOB_QUEUE_ORDERED) != null;
-                                final JobBlockingQueue jq = new JobBlockingQueue(queueName, orderedQueue, this.logger);
-                                jobQueue = jq;
-                                this.jobQueues.put(queueName, jq);
-                                // Start background thread
-                                this.threadPool.execute(new Runnable() {
-
-                                    /**
-                                     * @see java.lang.Runnable#run()
-                                     */
-                                    public void run() {
-                                        while ( running && !jq.isFinished() ) {
-                                            logger.info("Starting {}job queue {}", (orderedQueue ? "ordered " : ""), queueName);
-                                            try {
-                                                runJobQueue(queueName, jq);
-                                            } catch (Throwable t) {
-                                                logger.error("Job queue stopped with exception: " + t.getMessage() + ". Restarting.", t);
-                                            }
-                                        }
-                                    }
-
-                                });
-                            }
-                        }
-                        if ( jobQueue != null ) {
-                            if ( logger.isDebugEnabled() ) {
-                                logger.debug("Queuing job {} into queue {}.", EventUtil.toString(info.event), queueName);
-                            }
-                            try {
-                                jobQueue.put(info);
-                            } catch (InterruptedException e) {
-                                // this should never happen
-                                this.ignoreException(e);
-                            }
-                            // don't process this here
-                            info = null;
-                        }
-                    }
-                }
-
-                // if we still have a job, process it
-                if ( info != null ) {
-                    if ( this.executeJob(info, null) == Status.RESCHEDULE ) {
-                        this.putBackIntoMainQueue(info, true);
-                    }
+                if ( this.executeJob(info, null) == Status.RESCHEDULE ) {
+                    this.putBackIntoMainQueue(info, true);
                 }
             }
         }
@@ -719,6 +728,61 @@
         }
     }
 
+    /**
+     * Check the precondition for the job.
+     * This method handles the parallel settings and returns <code>true</code>
+     * if the job can be run. If <code>false</code> is returned, we have to
+     * wait for another job to finish first.
+     */
+    private boolean checkPrecondition(final ParallelInfo parInfo, final String jobTopic) {
+        // check how we can process this job:
+        // if the job should not be processed in parallel, we have to check
+        //     if another job with the same topic is currently running
+        // if parallel processing is allowed, we have to check for the number
+        //     of max allowed parallel jobs for this topic
+        boolean process = parInfo.processParallel;
+        if ( !parInfo.processParallel ) {
+            synchronized ( this.processingMap ) {
+                final Boolean value = this.processingMap.get(jobTopic);
+                if ( value == null || !value.booleanValue() ) {
+                    this.processingMap.put(jobTopic, Boolean.TRUE);
+                    process = true;
+                }
+            }
+        } else {
+            if ( parInfo.maxParallelJob > 1 ) {
+                synchronized ( this.parallelProcessingMap ) {
+                    final Integer value = this.parallelProcessingMap.get(jobTopic);
+                    final int currentValue = (value == null ? 0 : value.intValue());
+                    if ( currentValue < parInfo.maxParallelJob ) {
+                        this.parallelProcessingMap.put(jobTopic, currentValue + 1);
+                    } else {
+                        process = false;
+                    }
+                }
+            }
+        }
+        return process;
+    }
+
+    /**
+     * Unlock the parallel job processing state.
+     */
+    private void unlockState(final ParallelInfo parInfo, final String jobTopic) {
+        if ( !parInfo.processParallel ) {
+            synchronized ( this.processingMap ) {
+                this.processingMap.put(jobTopic, Boolean.FALSE);
+            }
+        } else {
+            if ( parInfo.maxParallelJob > 1 ) {
+                synchronized ( this.parallelProcessingMap ) {
+                    final Integer value = this.parallelProcessingMap.get(jobTopic);
+                    this.parallelProcessingMap.put(jobTopic, value.intValue() - 1);
+                }
+            }
+        }
+    }
+
     public enum Status {
         FAILED,
         RESCHEDULE,
@@ -730,7 +794,6 @@
      */
     private Status executeJob(final EventInfo info, final BlockingQueue<EventInfo> jobQueue) {
         boolean putback = false;
-        boolean wait = false;
         synchronized (this.backgroundLock) {
             if ( logger.isDebugEnabled() ) {
                 logger.debug("Executing job {}.", EventUtil.toString(info.event));
@@ -739,45 +802,30 @@
                 this.backgroundSession.refresh(false);
                 // check if the node still exists
                 if ( this.backgroundSession.itemExists(info.nodePath)
-                     && !this.backgroundSession.itemExists(info.nodePath + "/" + EventHelper.NODE_PROPERTY_FINISHED)) {
+                     && !this.backgroundSession.itemExists(info.nodePath + '/' + EventHelper.NODE_PROPERTY_FINISHED)) {
                     final Event event = info.event;
                     final String jobTopic = (String)event.getProperty(EventUtil.PROPERTY_JOB_TOPIC);
                     final ParallelInfo parInfo = ParallelInfo.getParallelInfo(event);
 
-                    // check how we can process this job:
-                    // if the job should not be processed in parallel, we have to check
-                    //     if another job with the same topic is currently running
-                    // if parallel processing is allowed, we have to check for the number
-                    //     of max allowed parallel jobs for this topic
-                    boolean process = parInfo.processParallel;
-                    if ( !parInfo.processParallel ) {
-                        synchronized ( this.processingMap ) {
-                            final Boolean value = this.processingMap.get(jobTopic);
-                            if ( value == null || !value.booleanValue() ) {
-                                this.processingMap.put(jobTopic, Boolean.TRUE);
-                                process = true;
-                            }
-                        }
-                    } else {
-                        if ( parInfo.maxParallelJob > 1 ) {
-                            synchronized ( this.parallelProcessingMap ) {
-                                final Integer value = this.parallelProcessingMap.get(jobTopic);
-                                final int currentValue = (value == null ? 0 : value.intValue());
-                                if ( currentValue < parInfo.maxParallelJob ) {
-                                    this.parallelProcessingMap.put(jobTopic, currentValue + 1);
-                                } else {
-                                    process = false;
-                                }
-                            }
-                        }
-                    }
+                    final boolean process = checkPrecondition(parInfo, jobTopic);
                     // check number of parallel jobs for main queue
                     if ( process && jobQueue == null && this.parallelJobCount >= this.maximumParallelJobs ) {
                         if ( logger.isDebugEnabled() ) {
-                            logger.debug("Rescheduling job {} - maximum parallel job count of {} reached!", EventUtil.toString(info.event), this.maximumParallelJobs);
+                            logger.debug("Waiting with executing job {} - maximum parallel job count of {} reached!",
+                                    EventUtil.toString(info.event), this.maximumParallelJobs);
+                        }
+                        try {
+                            this.backgroundLock.wait();
+                        } catch (InterruptedException e) {
+                            this.ignoreException(e);
+                        }
+                        // let's check if we're still running
+                        if ( !this.running ) {
+                            return Status.FAILED;
+                        }
+                        if ( logger.isDebugEnabled() ) {
+                            logger.debug("Continuing with executing job {}.", EventUtil.toString(info.event));
                         }
-                        process = false;
-                        wait = true;
                     }
                     if ( process ) {
                         boolean unlock = true;
@@ -789,31 +837,18 @@
                                     eventNode.lock(false, true);
                                 } catch (RepositoryException re) {
                                     // lock failed which means that the node is locked by someone else, so we don't have to requeue
-                                    process = false;
-                                }
-                                if ( process ) {
-                                    unlock = false;
-                                    this.processJob(info.event, eventNode, jobQueue == null);
-                                    return Status.SUCCESS;
+                                    return Status.FAILED;
                                 }
+                                unlock = false;
+                                this.processJob(info.event, eventNode, jobQueue == null, parInfo);
+                                return Status.SUCCESS;
                             }
                         } catch (RepositoryException e) {
                             // ignore
                             this.ignoreException(e);
                         } finally {
                             if ( unlock ) {
-                                if ( !parInfo.processParallel ) {
-                                    synchronized ( this.processingMap ) {
-                                        this.processingMap.put(jobTopic, Boolean.FALSE);
-                                    }
-                                } else {
-                                    if ( parInfo.maxParallelJob > 1 ) {
-                                        synchronized ( this.parallelProcessingMap ) {
-                                            final Integer value = this.parallelProcessingMap.get(jobTopic);
-                                            this.parallelProcessingMap.put(jobTopic, value.intValue() - 1);
-                                        }
-                                    }
-                                }
+                                unlockState(parInfo, jobTopic);
                             }
                         }
                     } else {
@@ -834,17 +869,6 @@
             }
 
         }
-        // if this is the main queue and we have reached the max number of parallel jobs
-        // we wait a little bit before continuing
-        if ( wait ) {
-            logger.debug("Sleeping for {} seconds as the maximum number of parallel threads is reached.", sleepTime);
-            try {
-                Thread.sleep(sleepTime * 1000);
-            } catch (InterruptedException ie) {
-                // ignore
-                ignoreException(ie);
-            }
-        }
         // if we have to put back the job, we return this status
         if ( putback ) {
             return Status.RESCHEDULE;
@@ -886,7 +910,7 @@
                         this.ignoreException(e);
                     }
                 } else {
-                    this.logger.warn("Event does not contain job topic: {}", event);
+                    this.logger.warn("Event does not contain job topic: {}", EventUtil.toString(event));
                 }
 
             } else {
@@ -912,22 +936,7 @@
                                         try {
                                             if ( s.itemExists(path) ) {
                                                 final Node eventNode = (Node) s.getItem(path);
-                                                if ( !eventNode.isLocked() ) {
-                                                    try {
-                                                        final EventInfo info = new EventInfo();
-                                                        info.event = readEvent(eventNode);
-                                                        info.nodePath = path;
-                                                        try {
-                                                            queue.put(info);
-                                                        } catch (InterruptedException e) {
-                                                            // we ignore this exception as this should never occur
-                                                            ignoreException(e);
-                                                        }
-                                                    } catch (ClassNotFoundException cnfe) {
-                                                        newUnloadedJobs.add(path);
-                                                        ignoreException(cnfe);
-                                                    }
-                                                }
+                                                tryToLoadJob(eventNode, newUnloadedJobs);
                                             }
                                         } catch (RepositoryException re) {
                                             // we ignore this and readd
@@ -956,46 +965,15 @@
     }
 
     /**
-     * Create a unique node path (folder and name) for the job.
-     */
-    private String getNodePath(final String jobTopic, final String jobId) {
-        final StringBuilder sb = new StringBuilder(jobTopic.replace('/', '.'));
-        sb.append('/');
-        if ( jobId != null ) {
-            // we create an md from the job id - we use the first 6 bytes to
-            // create sub directories
-            final String md5 = EventHelper.md5(jobId);
-            sb.append(md5.substring(0, 2));
-            sb.append('/');
-            sb.append(md5.substring(2, 4));
-            sb.append('/');
-            sb.append(md5.substring(4, 6));
-            sb.append('/');
-            sb.append(EventHelper.filter(jobId));
-        } else {
-            // create a path from the uuid - we use the first 6 bytes to
-            // create sub directories
-            final String uuid = UUID.randomUUID().toString();
-            sb.append(uuid.substring(0, 2));
-            sb.append('/');
-            sb.append(uuid.substring(2, 4));
-            sb.append('/');
-            sb.append(uuid.substring(5, 7));
-            sb.append("/Job_");
-            sb.append(uuid.substring(8, 17));
-        }
-        return sb.toString();
-    }
-
-    /**
      * Process a job and unlock the node in the repository.
      * @param event The original event.
      * @param eventNode The node in the repository where the job is stored.
      * @param isMainQueue Is this the main queue?
      */
-    private void processJob(Event event, Node eventNode, boolean isMainQueue)  {
-        final ParallelInfo parInfo = ParallelInfo.getParallelInfo(event);
-        final boolean parallelProcessing = parInfo.processParallel;
+    private void processJob(final Event event,
+                            final Node eventNode,
+                            final boolean isMainQueue,
+                            final ParallelInfo parInfo)  {
         final String jobTopic = (String)event.getProperty(EventUtil.PROPERTY_JOB_TOPIC);
         if ( logger.isDebugEnabled() ) {
             logger.debug("Starting job {}", EventUtil.toString(event));
@@ -1034,18 +1012,7 @@
                 if ( isMainQueue ) {
                     this.parallelJobCount--;
                 }
-                if ( !parallelProcessing ) {
-                    synchronized ( this.processingMap ) {
-                        this.processingMap.put(jobTopic, Boolean.FALSE);
-                    }
-                } else {
-                    if ( parInfo.maxParallelJob > 1 ) {
-                        synchronized ( this.parallelProcessingMap ) {
-                            final Integer value = this.parallelProcessingMap.get(jobTopic);
-                            this.parallelProcessingMap.put(jobTopic, value.intValue() - 1);
-                        }
-                    }
-                }
+                this.unlockState(parInfo, jobTopic);
 
                 // unlock node
                 try {
@@ -1137,25 +1104,7 @@
                                     s = this.createSession();
                                 }
                                 final Node eventNode = (Node) s.getItem(nodePath);
-                                if ( !eventNode.isLocked() && !eventNode.hasProperty(EventHelper.NODE_PROPERTY_FINISHED)) {
-                                    try {
-                                        final EventInfo info = new EventInfo();
-                                        info.event = this.readEvent(eventNode);
-                                        info.nodePath = nodePath;
-                                        try {
-                                            this.queue.put(info);
-                                        } catch (InterruptedException e) {
-                                            // we ignore this exception as this should never occur
-                                            this.ignoreException(e);
-                                        }
-                                    } catch (ClassNotFoundException cnfe) {
-                                        // store path for lazy loading
-                                        synchronized ( this.unloadedJobs ) {
-                                            this.unloadedJobs.add(nodePath);
-                                        }
-                                        this.ignoreException(cnfe);
-                                    }
-                                }
+                                tryToLoadJob(eventNode, this.unloadedJobs);
                             }
                         }
                     } catch (RepositoryException re) {
@@ -1215,34 +1164,26 @@
                 long count = 0;
                 while ( result.hasNext() && count < maxLoad ) {
                     final Node eventNode = result.nextNode();
-                    if ( !eventNode.isLocked() && !eventNode.hasProperty(EventHelper.NODE_PROPERTY_FINISHED)) {
+                    eventCreated = eventNode.getProperty(EventHelper.NODE_PROPERTY_CREATED).getLong();
+                    if ( tryToLoadJob(eventNode, this.unloadedJobs) ) {
                         count++;
-                        eventCreated = eventNode.getProperty(EventHelper.NODE_PROPERTY_CREATED).getLong();
-                        final String nodePath = eventNode.getPath();
-                        try {
-                            final Event event = this.readEvent(eventNode);
-                            final EventInfo info = new EventInfo();
-                            info.event = event;
-                            info.nodePath = nodePath;
-                            try {
-                                this.queue.put(info);
-                            } catch (InterruptedException e) {
-                                // we ignore this exception as this should never occur
-                                this.ignoreException(e);
-                            }
-                        } catch (ClassNotFoundException cnfe) {
-                            // store path for lazy loading
-                            synchronized ( this.unloadedJobs ) {
-                                this.unloadedJobs.add(nodePath);
-                            }
-                            this.ignoreException(cnfe);
-                        } catch (RepositoryException re) {
-                            this.logger.error("Unable to load stored job from " + nodePath, re);
+                    }
+                }
+                // now we have to add all jobs with the same created time!
+                boolean done = false;
+                while ( result.hasNext() && !done ) {
+                    final Node eventNode = result.nextNode();
+                    final long created = eventNode.getProperty(EventHelper.NODE_PROPERTY_CREATED).getLong();
+                    if ( created == eventCreated ) {
+                        if ( tryToLoadJob(eventNode, this.unloadedJobs) ) {
+                            count++;
                         }
+                    } else {
+                        done = true;
                     }
                 }
                 // have we processed all jobs?
-                if ( !result.hasNext() ) {
+                if ( !done && !result.hasNext() ) {
                     eventCreated = -1;
                 }
                 logger.debug("Loaded {} jobs and new since {}", count, eventCreated);
@@ -1253,6 +1194,33 @@
         return eventCreated;
     }
 
+    private boolean tryToLoadJob(final Node eventNode, final Set<String> unloadedJobSet) {
+        try {
+            if ( !eventNode.isLocked() && !eventNode.hasProperty(EventHelper.NODE_PROPERTY_FINISHED)) {
+                final String nodePath = eventNode.getPath();
+                try {
+                    final Event event = this.readEvent(eventNode);
+                    final EventInfo info = new EventInfo();
+                    info.event = event;
+                    info.nodePath = nodePath;
+                    this.queueJob(info);
+                } catch (ClassNotFoundException cnfe) {
+                    // store path for lazy loading
+                    synchronized ( unloadedJobSet ) {
+                        unloadedJobSet.add(nodePath);
+                    }
+                    this.ignoreException(cnfe);
+                } catch (RepositoryException re) {
+                    this.logger.error("Unable to load stored job from " + nodePath, re);
+                }
+                return true;
+            }
+        } catch (RepositoryException re) {
+            this.logger.error("Unable to load stored job from " + eventNode, re);
+        }
+        return false;
+    }
+
     /**
      * @see org.apache.sling.event.EventUtil.JobStatusNotifier#sendAcknowledge(org.osgi.service.event.Event, java.lang.String)
      */
@@ -1322,7 +1290,6 @@
             this.sendNotification(EventUtil.TOPIC_JOB_FINISHED, job);
         }
         final ParallelInfo parInfo = ParallelInfo.getParallelInfo(job);
-        final boolean parallelProcessing = parInfo.processParallel;
         EventInfo putback = null;
         // we have to use the same session for unlocking that we used for locking!
         synchronized ( this.backgroundLock ) {
@@ -1370,20 +1337,11 @@
                     this.logger.error("Exception during job finishing.", re);
                 } finally {
                     final String jobTopic = (String)job.getProperty(EventUtil.PROPERTY_JOB_TOPIC);
-                    if ( !parallelProcessing) {
-                        synchronized ( this.processingMap ) {
-                            this.processingMap.put(jobTopic, Boolean.FALSE);
-                        }
-                    } else {
-                        if ( parInfo.maxParallelJob > 1 ) {
-                            synchronized ( this.parallelProcessingMap ) {
-                                final Integer value = this.parallelProcessingMap.get(jobTopic);
-                                this.parallelProcessingMap.put(jobTopic, value.intValue() - 1);
-                            }
-                        }
-                    }
+                    this.unlockState(parInfo, jobTopic);
+
                     if ( job.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME) == null ) {
                         this.parallelJobCount--;
+                        this.backgroundLock.notify();
                     }
 
                     if ( unlock ) {
@@ -1422,19 +1380,7 @@
                     if ( job.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME) != null ) {
                         checkForNotify(job, info);
                     } else {
-
-                        // delay rescheduling?
-                        if ( job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY) != null ) {
-                            putback = info;
-                        } else {
-                            // put directly into queue
-                            try {
-                                queue.put(info);
-                            } catch (InterruptedException e) {
-                                // this should never happen
-                                this.ignoreException(e);
-                            }
-                        }
+                        putback = info;
                     }
                 } else {
                     // if this is an own job queue, we simply signal the queue to continue
@@ -1456,17 +1402,6 @@
     }
 
     private void putBackIntoMainQueue(final EventInfo info, final boolean useSleepTime) {
-        if ( logger.isDebugEnabled() ) {
-            logger.debug("Putting job {} back into the queue.", EventUtil.toString(info.event));
-        }
-        final Date fireDate = new Date();
-        if ( useSleepTime ) {
-            fireDate.setTime(System.currentTimeMillis() + this.sleepTime * 1000);
-        } else {
-            final long delay = (Long)info.event.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY);
-            fireDate.setTime(System.currentTimeMillis() + delay);
-        }
-
         final Runnable t = new Runnable() {
             public void run() {
                 try {
@@ -1477,21 +1412,42 @@
                 }
             }
         };
-        try {
-            this.scheduler.fireJobAt(null, t, null, fireDate);
-        } catch (Exception e) {
-            // we ignore the exception and just put back the job in the queue
-            ignoreException(e);
-            if ( useSleepTime ) {
+
+        final long delay;
+        if ( useSleepTime ) {
+            delay = this.sleepTime * 1000;
+        } else {
+            final Long obj = (Long)info.event.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY);
+            delay = (obj == null) ? -1 : obj.longValue();
+        }
+        if ( delay == -1 ) {
+            // put directly without waiting
+            if ( logger.isDebugEnabled() ) {
+                logger.debug("Putting job {} back into the queue.", EventUtil.toString(info.event));
+            }
+            t.run();
+        } else {
+            // schedule the put
+            if ( logger.isDebugEnabled() ) {
+                logger.debug("Putting job {} back into the queue after {}ms.", EventUtil.toString(info.event), delay);
+            }
+            final Date fireDate = new Date();
+            fireDate.setTime(System.currentTimeMillis() + delay);
+
+            try {
+                this.scheduler.fireJobAt(null, t, null, fireDate);
+            } catch (Exception e) {
+                // we ignore the exception and just put back the job in the queue
+                ignoreException(e);
                 // then wait for the time and readd the job
                 try {
-                    Thread.sleep(sleepTime * 1000);
+                    Thread.sleep(delay);
                 } catch (InterruptedException ie) {
                     // ignore
                     ignoreException(ie);
                 }
+                t.run();
             }
-            t.run();
         }
     }
 
@@ -1666,7 +1622,7 @@
      */
     public void cancelJob(String topic, String jobId) {
         if ( jobId != null && topic != null ) {
-            this.cancelJob(this.getNodePath(topic, jobId));
+            this.cancelJob(JobUtil.getUniquePath(topic, jobId));
         }
     }
 

Added: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobUtil.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobUtil.java?rev=911855&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobUtil.java (added)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobUtil.java Fri Feb 19 16:14:26 2010
@@ -0,0 +1,59 @@
+package org.apache.sling.event.impl.job;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import java.util.UUID;
+
+import org.apache.sling.event.impl.EventHelper;
+
+public class JobUtil {
+
+    /**
+     * Create a unique node path (folder and name) for the job.
+     */
+    public static String getUniquePath(final String jobTopic, final String jobId) {
+        final StringBuilder sb = new StringBuilder(jobTopic.replace('/', '.'));
+        sb.append('/');
+        if ( jobId != null ) {
+            // we create an md5 hash from the job id - we use the first 6 characters
+            // of that hash to create sub directories
+            final String md5 = EventHelper.md5(jobId);
+            sb.append(md5.substring(0, 2));
+            sb.append('/');
+            sb.append(md5.substring(2, 4));
+            sb.append('/');
+            sb.append(md5.substring(4, 6));
+            sb.append('/');
+            sb.append(EventHelper.filter(jobId));
+        } else {
+            // create a path from a random uuid - we use characters from its first
+            // group to create sub directories
+            final String uuid = UUID.randomUUID().toString();
+            sb.append(uuid.substring(0, 2));
+            sb.append('/');
+            sb.append(uuid.substring(2, 4));
+            sb.append('/');
+            sb.append(uuid.substring(5, 7));
+            sb.append("/Job_");
+            sb.append(uuid.substring(8, 17));
+        }
+        return sb.toString();
+    }
+
+}

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobUtil.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobUtil.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobUtil.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/AbstractRepositoryEventHandlerTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/AbstractRepositoryEventHandlerTest.java?rev=911855&r1=911854&r2=911855&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/AbstractRepositoryEventHandlerTest.java (original)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/AbstractRepositoryEventHandlerTest.java Fri Feb 19 16:14:26 2010
@@ -18,8 +18,6 @@
  */
 package org.apache.sling.event.impl;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 import java.util.Dictionary;
@@ -29,9 +27,7 @@
 import javax.jcr.NodeIterator;
 import javax.jcr.RepositoryException;
 import javax.jcr.Session;
-import javax.jcr.observation.EventListenerIterator;
 
-import org.apache.sling.commons.testing.jcr.RepositoryUtil;
 import org.apache.sling.commons.threads.ModifiableThreadPoolConfig;
 import org.apache.sling.commons.threads.ThreadPoolConfig;
 import org.apache.sling.engine.SlingSettingsService;
@@ -66,19 +62,19 @@
     }
 
     @org.junit.BeforeClass public static void setupRepository() throws Exception {
-        RepositoryUtil.startRepository();
-        final SlingRepository repository = RepositoryUtil.getRepository();
+        RepositoryTestUtil.startRepository();
+        final SlingRepository repository = RepositoryTestUtil.getSlingRepository();
         session = repository.loginAdministrative(repository.getDefaultWorkspace());
-        assertTrue(RepositoryUtil.registerNodeType(session, DistributingEventHandler.class.getResourceAsStream("/SLING-INF/nodetypes/event.cnd")));
-        assertTrue(RepositoryUtil.registerNodeType(session, DistributingEventHandler.class.getResourceAsStream("/SLING-INF/nodetypes/folder.cnd")));
+        assertTrue(RepositoryTestUtil.registerNodeType(session, DistributingEventHandler.class.getResourceAsStream("/SLING-INF/nodetypes/event.cnd")));
+        assertTrue(RepositoryTestUtil.registerNodeType(session, DistributingEventHandler.class.getResourceAsStream("/SLING-INF/nodetypes/folder.cnd")));
     }
 
     @org.junit.AfterClass public static void shutdownRepository() throws Exception {
-        RepositoryUtil.stopRepository();
+        RepositoryTestUtil.stopRepository();
     }
 
     @org.junit.Before public void setup() throws Exception {
-        this.handler.repository = RepositoryUtil.getRepository();
+        this.handler.repository = RepositoryTestUtil.getSlingRepository();
 
         // the event admin
         final EventAdmin eventAdmin = this.getMockery().mock(EventAdmin.class);
@@ -119,14 +115,6 @@
     }
 
     @org.junit.After public void shutdown() throws Exception {
-        // delete all child nodes to get a clean repository again
-        final Node rootNode = (Node) session.getItem(this.handler.repositoryPath);
-        final NodeIterator iter = rootNode.getNodes();
-        while ( iter.hasNext() ) {
-            final Node child = iter.nextNode();
-            child.remove();
-        }
-        rootNode.save();
         // lets set up the bundle context with the sling id
         final BundleContext bundleContext = this.getMockery().mock(BundleContext.class);
 
@@ -136,19 +124,18 @@
             will(returnValue(bundleContext));
         }});
         this.handler.deactivate(componentContext);
-    }
-
-    @org.junit.Test public void testSetup() throws RepositoryException {
-        assertEquals(this.handler.applicationId, SLING_ID);
-        assertEquals(this.handler.repositoryPath, REPO_PATH);
-        assertNotNull(this.handler.writerSession);
-        final EventListenerIterator iter = this.handler.writerSession.getWorkspace().getObservationManager().getRegisteredEventListeners();
-        boolean found = false;
-        while ( !found && iter.hasNext() ) {
-            final javax.jcr.observation.EventListener listener = iter.nextEventListener();
-            found = (listener == this.handler);
+        try {
+            // delete all child nodes to get a clean repository again
+            final Node rootNode = (Node) session.getItem(this.handler.repositoryPath);
+            final NodeIterator iter = rootNode.getNodes();
+            while ( iter.hasNext() ) {
+                final Node child = iter.nextNode();
+                child.remove();
+            }
+            session.save();
+        } catch ( RepositoryException re) {
+            // we ignore this for the test
         }
-        assertTrue("Handler is not registered as event listener.", found);
     }
 
     @org.junit.Test public void testPathCreation() throws RepositoryException {

Added: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/Barrier.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/Barrier.java?rev=911855&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/Barrier.java (added)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/Barrier.java Fri Feb 19 16:14:26 2010
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl;
+
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/** Simplified version of the cyclic barrier class for testing. */
+public class Barrier extends CyclicBarrier {
+
+    public Barrier(int parties) {
+        super(parties);
+    }
+
+    public void block() {
+        try {
+            this.await();
+        } catch (InterruptedException e) {
+            // ignore
+        } catch (BrokenBarrierException e) {
+            // ignore
+        }
+    }
+
+    public boolean block(int seconds) {
+        try {
+            this.await(seconds, TimeUnit.SECONDS);
+            return true;
+        } catch (InterruptedException e) {
+            // ignore
+        } catch (BrokenBarrierException e) {
+            // ignore
+        } catch (TimeoutException e) {
+            // ignore
+        }
+        this.reset();
+        return false;
+    }
+}

Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/Barrier.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/Barrier.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/Barrier.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/DistributingEventHandlerTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/DistributingEventHandlerTest.java?rev=911855&r1=911854&r2=911855&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/DistributingEventHandlerTest.java (original)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/DistributingEventHandlerTest.java Fri Feb 19 16:14:26 2010
@@ -19,6 +19,7 @@
 package org.apache.sling.event.impl;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 import java.util.Calendar;
@@ -27,6 +28,8 @@
 
 import javax.jcr.Node;
 import javax.jcr.NodeIterator;
+import javax.jcr.RepositoryException;
+import javax.jcr.observation.EventListenerIterator;
 
 import org.apache.jackrabbit.util.ISO9075;
 import org.apache.sling.event.EventUtil;
@@ -51,6 +54,19 @@
         return this.context;
     }
 
+    @org.junit.Test public void testSetup() throws RepositoryException {
+        assertEquals(this.handler.applicationId, SLING_ID);
+        assertEquals(this.handler.repositoryPath, REPO_PATH);
+        assertNotNull(this.handler.writerSession);
+        final EventListenerIterator iter = this.handler.writerSession.getWorkspace().getObservationManager().getRegisteredEventListeners();
+        boolean found = false;
+        while ( !found && iter.hasNext() ) {
+            final javax.jcr.observation.EventListener listener = iter.nextEventListener();
+            found = (listener == this.handler);
+        }
+        assertTrue("Handler is not registered as event listener.", found);
+    }
+
     @org.junit.Test public void testWriteEvent() throws Exception {
         final String topic = "write/event/test";
         final Dictionary<String, Object> props = new Hashtable<String, Object>();

Added: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/JobEventHandlerTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/JobEventHandlerTest.java?rev=911855&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/JobEventHandlerTest.java (added)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/JobEventHandlerTest.java Fri Feb 19 16:14:26 2010
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Dictionary;
+import java.util.Hashtable;
+
+import javax.jcr.RepositoryException;
+import javax.jcr.observation.EventListenerIterator;
+
+import org.apache.sling.event.EventUtil;
+import org.jmock.Mockery;
+import org.jmock.integration.junit4.JMock;
+import org.jmock.integration.junit4.JUnit4Mockery;
+import org.junit.runner.RunWith;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventHandler;
+
+@RunWith(JMock.class)
+public class JobEventHandlerTest extends AbstractRepositoryEventHandlerTest {
+
+    protected Mockery context;
+
+    public JobEventHandlerTest() {
+        this.handler = new JobEventHandler();
+        this.context = new JUnit4Mockery();
+    }
+
+    @Override
+    protected Mockery getMockery() {
+        return this.context;
+    }
+
+    @org.junit.Test public void testSetup() throws RepositoryException {
+        assertEquals(this.handler.applicationId, SLING_ID);
+        assertEquals(this.handler.repositoryPath, REPO_PATH);
+        assertNotNull(((JobEventHandler)this.handler).backgroundSession);
+        final EventListenerIterator iter = ((JobEventHandler)this.handler).backgroundSession.getWorkspace().getObservationManager().getRegisteredEventListeners();
+        boolean found = false;
+        while ( !found && iter.hasNext() ) {
+            final javax.jcr.observation.EventListener listener = iter.nextEventListener();
+            found = (listener == this.handler);
+        }
+        assertTrue("Handler is not registered as event listener.", found);
+    }
+
+    private Event getJobEvent() {
+        final Dictionary<String, Object> props = new Hashtable<String, Object>();
+        props.put(EventUtil.PROPERTY_JOB_TOPIC, "sling/test");
+        props.put(EventUtil.PROPERTY_JOB_RETRY_DELAY, 2000L);
+        props.put(EventUtil.PROPERTY_JOB_RETRIES, 2);
+        return new Event(EventUtil.TOPIC_JOB, props);
+    }
+
+    @org.junit.Test public void testSimpleJobExecution() throws Exception {
+        final JobEventHandler jeh = (JobEventHandler)this.handler;
+        jeh.handleEvent(getJobEvent());
+        final Barrier cb = new Barrier(2);
+        jeh.eventAdmin = new SimpleEventAdmin(new String[] {"sling/test"},
+                new EventHandler[] {
+                    new EventHandler() {
+                        public void handleEvent(Event event) {
+                            EventUtil.finishedJob(event);
+                            cb.block();
+                        }
+
+                    }
+                });
+        assertTrue("No event received in the given time.", cb.block(5));
+        cb.reset();
+        assertFalse("Unexpected event received in the given time.", cb.block(5));
+    }
+
+    @org.junit.Test public void testStartJobAndReschedule() throws Exception {
+        final JobEventHandler jeh = (JobEventHandler)this.handler;
+        jeh.handleEvent(getJobEvent());
+        final Barrier cb = new Barrier(2);
+        jeh.eventAdmin = new SimpleEventAdmin(new String[] {"sling/test"},
+                new EventHandler[] {
+                    new EventHandler() {
+                        public void handleEvent(Event event) {
+                            EventUtil.rescheduleJob(event);
+                            cb.block();
+                        }
+
+                    }
+                });
+        assertTrue("No event received in the given time.", cb.block(5));
+        cb.reset();
+        // the job is retried after two seconds, so we wait again
+        assertTrue("No event received in the given time.", cb.block(5));
+        cb.reset();
+        // the job is retried after two seconds, so we wait again
+        assertTrue("No event received in the given time.", cb.block(5));
+        // we have reached the retry limit, so we expect not to get another event
+        cb.reset();
+        assertFalse("Unexpected event received in the given time.", cb.block(5));
+    }
+
+}

Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/JobEventHandlerTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/JobEventHandlerTest.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/JobEventHandlerTest.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/RepositoryTestUtil.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/RepositoryTestUtil.java?rev=911855&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/RepositoryTestUtil.java (added)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/RepositoryTestUtil.java Fri Feb 19 16:14:26 2010
@@ -0,0 +1,130 @@
+package org.apache.sling.event.impl;
+
+import java.io.File;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+
+import javax.jcr.Credentials;
+import javax.jcr.LoginException;
+import javax.jcr.NoSuchWorkspaceException;
+import javax.jcr.Repository;
+import javax.jcr.RepositoryException;
+import javax.jcr.Session;
+import javax.jcr.SimpleCredentials;
+import javax.jcr.Value;
+
+import org.apache.jackrabbit.commons.JcrUtils;
+import org.apache.jackrabbit.commons.cnd.CndImporter;
+import org.apache.sling.jcr.api.SlingRepository;
+
+public class RepositoryTestUtil {
+
+    /** We hold an admin session the whole time. */
+    private static Session adminSession;
+
+    private static Repository repository;
+
+    private static void init() throws RepositoryException {
+        if ( repository == null ) {
+            final File f = new File("target/repository");
+            repository = JcrUtils.getRepository(f.toURI().toString());
+
+            final SimpleCredentials cred = new SimpleCredentials("admin", "admin".toCharArray());
+            adminSession = repository.login(cred);
+        }
+    }
+
+    public static void startRepository() throws RepositoryException {
+        init();
+    }
+
+    public static void stopRepository() {
+        if ( adminSession != null ) {
+            adminSession.logout();
+            adminSession = null;
+        }
+        repository = null;
+    }
+
+    public static Session getAdminSession() {
+        return adminSession;
+    }
+
+    public static SlingRepository getSlingRepository() {
+        return new RepositoryWrapper(repository);
+    }
+
+    public static boolean registerNodeType(Session session, InputStream resourceAsStream) {
+        try {
+            CndImporter.registerNodeTypes(new InputStreamReader(resourceAsStream, "UTF-8"), session);
+            return true;
+        } catch (Exception e) {
+            // ignore
+            return false;
+        }
+    }
+
+    public static final class RepositoryWrapper implements SlingRepository {
+
+        protected final Repository wrapped;
+
+        public RepositoryWrapper(Repository r) {
+            wrapped = r;
+        }
+
+        public String getDescriptor(String key) {
+            return wrapped.getDescriptor(key);
+        }
+
+        public String[] getDescriptorKeys() {
+            return wrapped.getDescriptorKeys();
+        }
+
+        public Session login() throws LoginException, RepositoryException {
+            return wrapped.login();
+        }
+
+        public Session login(Credentials credentials, String workspaceName)
+                throws LoginException, NoSuchWorkspaceException,
+                RepositoryException {
+            return wrapped.login(credentials, workspaceName);
+        }
+
+        public Session login(Credentials credentials) throws LoginException,
+                RepositoryException {
+            return wrapped.login(credentials);
+        }
+
+        public Session login(String workspaceName) throws LoginException,
+                NoSuchWorkspaceException, RepositoryException {
+            return wrapped.login(workspaceName);
+        }
+
+        public String getDefaultWorkspace() {
+            return "default";
+        }
+
+        public Session loginAdministrative(String workspace)
+                throws RepositoryException {
+            final Credentials credentials = new SimpleCredentials("admin",
+                    "admin".toCharArray());
+            return this.login(credentials, workspace);
+        }
+
+        public Value getDescriptorValue(String key) {
+            return wrapped.getDescriptorValue(key);
+        }
+
+        public Value[] getDescriptorValues(String key) {
+            return wrapped.getDescriptorValues(key);
+        }
+
+        public boolean isSingleValueDescriptor(String key) {
+            return wrapped.isSingleValueDescriptor(key);
+        }
+
+        public boolean isStandardDescriptor(String key) {
+            return wrapped.isStandardDescriptor(key);
+        }
+    }
+}
\ No newline at end of file

Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/RepositoryTestUtil.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/RepositoryTestUtil.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/RepositoryTestUtil.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/SimpleEventAdmin.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/SimpleEventAdmin.java?rev=911855&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/SimpleEventAdmin.java (added)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/SimpleEventAdmin.java Fri Feb 19 16:14:26 2010
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl;
+
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventHandler;
+
+/**
+ * Simple event admin implementation for testing.
+ *
+ * Events are dispatched through a fixed table: the handler at index
+ * <code>i</code> is called for every event whose topic equals the topic at
+ * index <code>i</code>. Topics are compared with <code>equals</code>.
+ */
+public class SimpleEventAdmin implements EventAdmin {
+
+    private final String[] topics;
+    private final EventHandler[] handler;
+
+    /**
+     * @param topics  The topics to dispatch on, or <code>null</code> for an
+     *                event admin that never calls a handler.
+     * @param handler The handlers, <code>handler[i]</code> serves
+     *                <code>topics[i]</code>. Must be <code>null</code> if and
+     *                only if <code>topics</code> is <code>null</code>.
+     * @throws IllegalArgumentException If only one of the arrays is
+     *         <code>null</code> or if the arrays differ in length.
+     */
+    public SimpleEventAdmin(final String[] topics, final EventHandler[] handler) {
+        if ( topics == null && handler != null ) {
+            throw new IllegalArgumentException("If topics is null, handler must be null as well");
+        }
+        // Both arrays being null is a valid configuration (sendEvent checks
+        // for it), so the lengths may only be read once topics is known to be
+        // non-null. A missing handler array is reported as a size mismatch
+        // instead of failing with a NullPointerException.
+        if ( topics != null && (handler == null || topics.length != handler.length) ) {
+            throw new IllegalArgumentException("Topics and handler must have the same size.");
+        }
+        this.topics = topics;
+        this.handler = handler;
+    }
+
+    /**
+     * Deliver the event asynchronously: a new thread is started which
+     * calls {@link #sendEvent(Event)}.
+     */
+    public void postEvent(final Event event) {
+        new Thread() {
+            public void run() {
+                sendEvent(event);
+            }
+        }.start();
+    }
+
+    /**
+     * Deliver the event in the calling thread to every handler registered
+     * for the topic of the event. Nothing happens if no topics are configured.
+     */
+    public void sendEvent(Event event) {
+        if ( topics != null ) {
+            for(int i=0; i<topics.length; i++) {
+                if ( topics[i].equals(event.getTopic()) ) {
+                    handler[i].handleEvent(event);
+                }
+            }
+        }
+    }
+}

Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/SimpleEventAdmin.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/SimpleEventAdmin.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/SimpleEventAdmin.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain