You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2010/10/15 11:42:27 UTC

svn commit: r1022870 - in /sling/branches/eventing-3.0/src: main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java test/java/org/apache/sling/event/impl/jobs/RoundRobinQueueTest.java

Author: cziegeler
Date: Fri Oct 15 09:42:26 2010
New Revision: 1022870

URL: http://svn.apache.org/viewvc?rev=1022870&view=rev
Log:
Fix some NPEs on shutdown and enhance test

Modified:
    sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java
    sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/jobs/RoundRobinQueueTest.java

Modified: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java?rev=1022870&r1=1022869&r2=1022870&view=diff
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java (original)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java Fri Oct 15 09:42:26 2010
@@ -887,6 +887,9 @@ public class PersistenceHandler implemen
     public boolean lock(final JobEvent info) {
         final String path = this.getNodePath(info.uniqueId);
         synchronized ( this.backgroundLock ) {
+            if ( !this.running ) {
+                return false;
+            }
             try {
                 // check if the node still exists
                 if ( this.backgroundSession.itemExists(path)
@@ -918,6 +921,9 @@ public class PersistenceHandler implemen
     public void unlock(final JobEvent info) {
         final String path = this.getNodePath(info.uniqueId);
         synchronized ( this.backgroundLock ) {
+            if ( !this.running ) {
+                return;
+            }
             try {
                 this.backgroundSession.getWorkspace().getLockManager().unlock(path);
             } catch (RepositoryException re) {
@@ -934,6 +940,9 @@ public class PersistenceHandler implemen
         final String jobId = (String)info.event.getProperty(JobUtil.PROPERTY_JOB_NAME);
         final String path = this.getNodePath(info.uniqueId);
         synchronized ( this.backgroundLock ) {
+            if ( !this.running ) {
+                return;
+            }
             try {
                 if ( this.backgroundSession.itemExists(path) ) {
                     ((DefaultJobManager)this.jobManager).notifyRemoveJob(info.uniqueId);

Modified: sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/jobs/RoundRobinQueueTest.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/jobs/RoundRobinQueueTest.java?rev=1022870&r1=1022869&r2=1022870&view=diff
==============================================================================
--- sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/jobs/RoundRobinQueueTest.java (original)
+++ sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/jobs/RoundRobinQueueTest.java Fri Oct 15 09:42:26 2010
@@ -50,6 +50,8 @@ public class RoundRobinQueueTest extends
 
     private static final String QUEUE_NAME = "roundrobintest";
     private static final String TOPIC = "sling/test";
+    private static int MAX_PAR = 5;
+    private static int NUM_JOBS = 300;
 
     protected Mockery context;
 
@@ -77,7 +79,7 @@ public class RoundRobinQueueTest extends
         queueProps.put(ConfigurationConstants.PROP_TOPICS, TOPIC + "/*");
         queueProps.put(ConfigurationConstants.PROP_NAME, QUEUE_NAME);
         queueProps.put(ConfigurationConstants.PROP_TYPE, QueueConfiguration.Type.TOPIC_ROUND_ROBIN);
-        queueProps.put(ConfigurationConstants.PROP_MAX_PARALLEL, 5);
+        queueProps.put(ConfigurationConstants.PROP_MAX_PARALLEL, MAX_PAR);
 
         final InternalQueueConfiguration mainConfiguration = InternalQueueConfiguration.fromConfiguration(queueProps);
         return new QueueConfigurationManager() {
@@ -92,12 +94,22 @@ public class RoundRobinQueueTest extends
     /**
      * Helper method to create a job event.
      */
-    private Event getJobEvent(final String subTopic) {
+    private Event getJobEvent(final String subTopic, final String id) {
         final Dictionary<String, Object> props = new Hashtable<String, Object>();
         props.put(JobUtil.PROPERTY_JOB_TOPIC, TOPIC + '/' + subTopic);
+        if ( id != null ) {
+            props.put(JobUtil.PROPERTY_JOB_NAME, id);
+        }
         return new Event(JobUtil.TOPIC_JOB, props);
     }
 
+    /**
+     * Helper method to create a job event.
+     */
+    private Event getJobEvent(final String subTopic) {
+        return this.getJobEvent(subTopic, null);
+    }
+
     @org.junit.Test public void testRoundRobinQueue() throws Exception {
         final PersistenceHandler jeh = this.handler;
 
@@ -125,6 +137,7 @@ public class RoundRobinQueueTest extends
         q.suspend();
         // set new event admin
         final AtomicInteger count = new AtomicInteger(0);
+        final AtomicInteger parallelCount = new AtomicInteger(0);
         setEventAdmin(new SimpleEventAdmin(new String[] {TOPIC + '*',
                 JobUtil.TOPIC_JOB_FINISHED},
                 new EventHandler[] {
@@ -133,11 +146,15 @@ public class RoundRobinQueueTest extends
                             JobUtil.processJob(event, new JobProcessor() {
 
                                 public boolean process(Event job) {
+                                    if ( parallelCount.incrementAndGet() > MAX_PAR ) {
+                                        return false;
+                                    }
                                     try {
                                         Thread.sleep(30);
                                     } catch (InterruptedException ie) {
                                         // ignore
                                     }
+                                    parallelCount.decrementAndGet();
                                     return true;
                                 }
                             });
@@ -148,24 +165,33 @@ public class RoundRobinQueueTest extends
                             count.incrementAndGet();
                         }
                     }}));
-        // we start "some" jobs
-        final int COUNT = 300;
-        for(int i = 0; i < COUNT; i++ ) {
+        // we start "some" jobs:
+        // first jobs without id
+        for(int i = 0; i < NUM_JOBS; i++ ) {
             final String subTopic = "sub" + (i % 10);
             jeh.handleEvent(getJobEvent(subTopic));
         }
+        // second jobs with id
+        for(int i = 0; i < NUM_JOBS; i++ ) {
+            final String subTopic = "sub" + (i % 10);
+            jeh.handleEvent(getJobEvent(subTopic, "id" + i));
+        }
         // start the queue
         q.resume();
-        while ( count.get() < COUNT ) {
+        while ( count.get() < 2 * NUM_JOBS ) {
+            assertEquals("Failed count", 0, q.getStatistics().getNumberOfFailedJobs());
+            assertEquals("Cancelled count", 0, q.getStatistics().getNumberOfCancelledJobs());
             try {
                 Thread.sleep(500);
             } catch (InterruptedException ie) {
                 // ignore
             }
         }
-        assertEquals("Finished count", COUNT, count.get());
+        assertEquals("Finished count", 2 * NUM_JOBS, count.get());
         // we started one event before the test, so add one
-        assertEquals("Finished count", COUNT + 1, this.jobManager.getStatistics().getNumberOfFinishedJobs());
-        assertEquals("Finished count", COUNT + 1, q.getStatistics().getNumberOfFinishedJobs());
+        assertEquals("Finished count", 2 * NUM_JOBS + 1, this.jobManager.getStatistics().getNumberOfFinishedJobs());
+        assertEquals("Finished count", 2 * NUM_JOBS + 1, q.getStatistics().getNumberOfFinishedJobs());
+        assertEquals("Failed count", 0, q.getStatistics().getNumberOfFailedJobs());
+        assertEquals("Cancelled count", 0, q.getStatistics().getNumberOfCancelledJobs());
     }
 }