You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2010/10/13 15:58:34 UTC

svn commit: r1022108 - in /sling/branches/eventing-3.0/src: main/java/org/apache/sling/event/impl/jobs/ main/java/org/apache/sling/event/impl/jobs/jcr/ test/java/org/apache/sling/event/impl/jobs/

Author: cziegeler
Date: Wed Oct 13 13:58:33 2010
New Revision: 1022108

URL: http://svn.apache.org/viewvc?rev=1022108&view=rev
Log:
Add another test and improve JCR handling by registering two different listeners

Modified:
    sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/DefaultJobManager.java
    sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/JCRHelper.java
    sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java
    sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/jobs/JobEventHandlerTest.java

Modified: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/DefaultJobManager.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/DefaultJobManager.java?rev=1022108&r1=1022107&r2=1022108&view=diff
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/DefaultJobManager.java (original)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/DefaultJobManager.java Wed Oct 13 13:58:33 2010
@@ -592,9 +592,6 @@ public class DefaultJobManager
         boolean result = true;
         if ( job != null ) {
             result = job.remove();
-            if ( result ) {
-                this.notifyRemoveJob(jobId);
-            }
         }
         return result;
     }

Modified: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/JCRHelper.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/JCRHelper.java?rev=1022108&r1=1022107&r2=1022108&view=diff
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/JCRHelper.java (original)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/JCRHelper.java Wed Oct 13 13:58:33 2010
@@ -74,6 +74,8 @@ public abstract class JCRHelper {
     /** The nodetype for newly created folders */
     public static final String NODETYPE_ORDERED_FOLDER = "sling:OrderedFolder";
 
+    /** The property for locks. */
+    public static final String NODE_PROPERTY_LOCK_OWNER = "jcr:lockOwner";
 
     /** List of ignored properties to write to the repository. */
     private static final String[] IGNORE_PROPERTIES = new String[] {

Modified: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java?rev=1022108&r1=1022107&r2=1022108&view=diff
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java (original)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java Wed Oct 13 13:58:33 2010
@@ -191,20 +191,18 @@ public class PersistenceHandler implemen
         loaderThread.setDaemon(true);
         loaderThread.start();
 
-        // open background session for observation
+        // open background session for all job related tasks (lock, unlock etc.)
         // create the background session and register a listener
         this.backgroundSession = this.environment.createAdminSession();
-        this.backgroundSession.getWorkspace().getObservationManager()
-             .addEventListener(this,
-                          javax.jcr.observation.Event.PROPERTY_REMOVED
-                          |javax.jcr.observation.Event.PROPERTY_ADDED
-                          |javax.jcr.observation.Event.NODE_REMOVED
-                          |javax.jcr.observation.Event.NODE_ADDED,
-                          this.repositoryPath,
-                          true,
-                          null,
-                          null,
-                          true);
+        this.backgroundSession.getWorkspace().getObservationManager().addEventListener(this,
+                javax.jcr.observation.Event.PROPERTY_REMOVED
+                |javax.jcr.observation.Event.PROPERTY_ADDED
+                |javax.jcr.observation.Event.NODE_REMOVED,
+                this.repositoryPath,
+                true,
+                null,
+                null,
+                true);
     }
 
     /**
@@ -262,7 +260,7 @@ public class PersistenceHandler implemen
                         final String propertyName = path.substring(pos+1);
 
                         // we are only interested in unlocks
-                        if ( "jcr:lockOwner".equals(propertyName) ) {
+                        if ( JCRHelper.NODE_PROPERTY_LOCK_OWNER.equals(propertyName) ) {
                             loadNodePath = path.substring(0, pos);
                         }
                     } else if ( event.getType() == javax.jcr.observation.Event.PROPERTY_ADDED ) {
@@ -270,7 +268,7 @@ public class PersistenceHandler implemen
                         final String propertyName = path.substring(pos+1);
 
                         // we are only interested in locks
-                        if ( "jcr:lockOwner".equals(propertyName) ) {
+                        if ( JCRHelper.NODE_PROPERTY_LOCK_OWNER.equals(propertyName) ) {
                             ((DefaultJobManager)this.jobManager).notifyActiveJob(path.substring(this.repositoryPath.length() + 1));
                         }
 
@@ -559,6 +557,14 @@ public class PersistenceHandler implemen
 
         try {
             writerSession = this.environment.createAdminSession();
+            // we only listen for all node added events not coming from this session(!)
+            writerSession.getWorkspace().getObservationManager().addEventListener(this,
+                         javax.jcr.observation.Event.NODE_ADDED,
+                         this.repositoryPath,
+                         true,
+                         null,
+                         null,
+                         true);
             rootNode = this.createPath(writerSession.getRootNode(),
                     this.repositoryPath.substring(1),
                     JCRHelper.NODETYPE_ORDERED_FOLDER);
@@ -576,6 +582,12 @@ public class PersistenceHandler implemen
             running = false;
         } finally {
             if ( writerSession != null ) {
+                try {
+                    writerSession.getWorkspace().getObservationManager().removeEventListener(this);
+                } catch (RepositoryException e) {
+                    // we just ignore it
+                    this.logger.warn("Unable to remove event listener.", e);
+                }
                 writerSession.logout();
             }
         }
@@ -602,12 +614,14 @@ public class PersistenceHandler implemen
                 final String jobTopic = (String)event.getProperty(JobUtil.PROPERTY_JOB_TOPIC);
                 final String nodePath = Utility.getUniquePath(jobTopic, jobId);
 
+                Node readAndProcess = null;
+
                 // if the job has no job id, we can just write the job to the repo and don't
                 // need locking
                 if ( jobId == null ) {
                     try {
-                        this.writeEvent(rootNode, event, nodePath);
-                    } catch (RepositoryException re ) {
+                        readAndProcess = this.writeEvent(rootNode, event, nodePath);
+                    } catch (final RepositoryException re ) {
                         // something went wrong, so let's log it
                         this.logger.error("Exception during writing new job '" + EventUtil.toString(event) + "' to repository at " + nodePath, re);
                     }
@@ -621,17 +635,21 @@ public class PersistenceHandler implemen
                         if ( foundNode == null ) {
                             // We now write the event into the repository
                             try {
-                                this.writeEvent(rootNode, event, nodePath);
+                                readAndProcess = this.writeEvent(rootNode, event, nodePath);
                             } catch (ItemExistsException iee) {
                                 // someone else did already write this node in the meantime
                                 // nothing to do for us
                             }
                         }
-                    } catch (RepositoryException re ) {
+                    } catch (final RepositoryException re ) {
                         // something went wrong, so let's log it
                         this.logger.error("Exception during writing new job '" + EventUtil.toString(event) + "' to repository at " + nodePath, re);
                     }
                 }
+
+                if ( readAndProcess != null ) {
+                    tryToLoadJob(readAndProcess, this.unloadedJobs);
+                }
             }
         }
     }
@@ -650,7 +668,7 @@ public class PersistenceHandler implemen
      * @param suggestedName A suggested name/path for the node.
      * @throws RepositoryException
      */
-    private void writeEvent(final Node rootNode, final Event e, final String path)
+    private Node writeEvent(final Node rootNode, final Event e, final String path)
     throws RepositoryException {
         // create new node with name of topic
         final Node eventNode = this.createPath(rootNode,
@@ -669,6 +687,7 @@ public class PersistenceHandler implemen
             eventNode.setProperty(JCRHelper.NODE_PROPERTY_JOBID, jobId);
         }
         rootNode.getSession().save();
+        return eventNode;
     }
 
     /**
@@ -882,6 +901,7 @@ public class PersistenceHandler implemen
                             // lock failed which means that the node is locked by someone else, so we don't have to requeue
                             return false;
                         }
+                        ((DefaultJobManager)this.jobManager).notifyActiveJob(info.uniqueId);
                         return true;
                     }
                 }
@@ -916,6 +936,7 @@ public class PersistenceHandler implemen
         synchronized ( this.backgroundLock ) {
             try {
                 if ( this.backgroundSession.itemExists(path) ) {
+                    ((DefaultJobManager)this.jobManager).notifyRemoveJob(info.uniqueId);
                     final Node eventNode = (Node)this.backgroundSession.getItem(path);
                     if ( jobId == null ) {
                         // simply remove the node
@@ -964,6 +985,7 @@ public class PersistenceHandler implemen
                         }
                         eventNode.remove();
                         this.backgroundSession.save();
+                        ((DefaultJobManager)this.jobManager).notifyRemoveJob(jobId);
                     }
                 } catch (RepositoryException e) {
                     this.logger.error("Error during cancelling job at " + path, e);

Modified: sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/jobs/JobEventHandlerTest.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/jobs/JobEventHandlerTest.java?rev=1022108&r1=1022107&r2=1022108&view=diff
==============================================================================
--- sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/jobs/JobEventHandlerTest.java (original)
+++ sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/jobs/JobEventHandlerTest.java Wed Oct 13 13:58:33 2010
@@ -29,6 +29,7 @@ import java.util.Collections;
 import java.util.Dictionary;
 import java.util.Hashtable;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.jcr.Node;
 import javax.jcr.Session;
@@ -43,6 +44,7 @@ import org.apache.sling.event.impl.jobs.
 import org.apache.sling.event.impl.jobs.jcr.PersistenceHandler;
 import org.apache.sling.event.impl.support.Environment;
 import org.apache.sling.event.jobs.JobManager.QueryType;
+import org.apache.sling.event.jobs.JobProcessor;
 import org.apache.sling.event.jobs.JobUtil;
 import org.apache.sling.event.jobs.JobsIterator;
 import org.jmock.Mockery;
@@ -590,4 +592,43 @@ public class JobEventHandlerTest extends
         cb.reset();
         assertEquals("Unexpected number of retries", 3, retryCountList.size());
     }
+
+    @org.junit.Test public void testManyJobs() throws Exception {
+        final PersistenceHandler jeh = this.handler;
+        final AtomicInteger count = new AtomicInteger(0);
+        setEventAdmin(new SimpleEventAdmin(new String[] {"sling/test",
+                JobUtil.TOPIC_JOB_FINISHED},
+                new EventHandler[] {
+                    new EventHandler() {
+                        public void handleEvent(final Event event) {
+                            JobUtil.processJob(event, new JobProcessor() {
+
+                                public boolean process(Event job) {
+                                    try {
+                                        Thread.sleep(200);
+                                    } catch (InterruptedException ie) {}
+                                    return true;
+                                }
+                            });
+                        }
+                    },
+                    new EventHandler() {
+                        public void handleEvent(final Event event) {
+                            count.incrementAndGet();
+                        }
+                    }}));
+        // we start "some" jobs
+        final int COUNT = 300;
+        for(int i = 0; i < COUNT; i++ ) {
+            final String queueName = "queue" + (i % 20);
+            jeh.handleEvent(getJobEvent(queueName, null, "2"));
+        }
+        while ( count.get() < COUNT ) {
+            try {
+                Thread.sleep(500);
+            } catch (InterruptedException ie) {}
+        }
+        assertEquals("Finished count", COUNT, count.get());
+        assertEquals("Finished count", COUNT, this.jobManager.getStatistics().getNumberOfFinishedJobs());
+    }
 }