You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2010/12/14 10:12:56 UTC

svn commit: r1048989 - in /sling/trunk/bundles/extensions/event/src: main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java test/java/org/apache/sling/event/impl/jobs/JobEventHandlerTest.java

Author: cziegeler
Date: Tue Dec 14 09:12:55 2010
New Revision: 1048989

URL: http://svn.apache.org/viewvc?rev=1048989&view=rev
Log:
Fix potential NPE in rescheduling of unprocessed jobs and add test case

Modified:
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java
    sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/JobEventHandlerTest.java

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java?rev=1048989&r1=1048988&r2=1048989&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java Tue Dec 14 09:12:55 2010
@@ -1033,8 +1033,12 @@ public class PersistenceHandler implemen
             try {
                 if ( this.backgroundSession.itemExists(path) ) {
                     final Node eventNode = (Node)this.backgroundSession.getItem(path);
-                    eventNode.setProperty(JobUtil.PROPERTY_JOB_RETRIES, (Integer)info.event.getProperty(JobUtil.PROPERTY_JOB_RETRIES));
-                    eventNode.setProperty(JobUtil.PROPERTY_JOB_RETRY_COUNT, (Integer)info.event.getProperty(JobUtil.PROPERTY_JOB_RETRY_COUNT));
+                    if ( info.event.getProperty(JobUtil.PROPERTY_JOB_RETRIES) != null ) {
+                        eventNode.setProperty(JobUtil.PROPERTY_JOB_RETRIES, (Integer)info.event.getProperty(JobUtil.PROPERTY_JOB_RETRIES));
+                    }
+                    if ( info.event.getProperty(JobUtil.PROPERTY_JOB_RETRY_COUNT) != null ) {
+                        eventNode.setProperty(JobUtil.PROPERTY_JOB_RETRY_COUNT, (Integer)info.event.getProperty(JobUtil.PROPERTY_JOB_RETRY_COUNT));
+                    }
                     eventNode.setProperty(JCRHelper.NODE_PROPERTY_PROCESSOR, Environment.APPLICATION_ID);
                     this.backgroundSession.save();
 

Modified: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/JobEventHandlerTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/JobEventHandlerTest.java?rev=1048989&r1=1048988&r2=1048989&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/JobEventHandlerTest.java (original)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/JobEventHandlerTest.java Tue Dec 14 09:12:55 2010
@@ -635,4 +635,84 @@ public class JobEventHandlerTest extends
         assertEquals("Finished count", COUNT, count.get());
         assertEquals("Finished count", COUNT, this.jobManager.getStatistics().getNumberOfFinishedJobs());
     }
+
+    /**
+     * Test sending of jobs with and without a processor: 20 jobs alternate between "sling/test" and "sling/test2"; expects COUNT / 2 finished jobs and COUNT calls of the counting-only handler (presumably bound to "sling/test2" by position in SimpleEventAdmin - confirm).
+     */
+    @org.junit.Test public void testNoJobProcessor() throws Exception {
+        final PersistenceHandler jeh = this.handler;
+        final AtomicInteger count = new AtomicInteger(0);
+        final AtomicInteger unprocessedCount = new AtomicInteger(0);
+        setEventAdmin(new SimpleEventAdmin(new String[] {"sling/test",
+                "sling/test2",
+                JobUtil.TOPIC_JOB_FINISHED},
+                new EventHandler[] {
+                    new EventHandler() {
+                        public void handleEvent(final Event event) {
+                            JobUtil.processJob(event, new JobProcessor() {
+
+                                public boolean process(Event job) {
+                                    try {
+                                        Thread.sleep(200);
+                                    } catch (InterruptedException ie) {
+                                        // ignore - an interrupted sleep only shortens the simulated processing time
+                                    }
+                                    return true;
+                                }
+                            });
+                        }
+                    },
+                    new EventHandler() {
+                        public void handleEvent(final Event event) {
+                            unprocessedCount.incrementAndGet();
+                        }
+                    },
+                    new EventHandler() {
+                        public void handleEvent(final Event event) {
+                            count.incrementAndGet();
+                        }
+                    }}));
+        // we start 20 jobs, every second job (odd index, topic "sling/test2") has no processor
+        final long startTime = System.currentTimeMillis();
+        final int COUNT = 20;
+        for(int i = 0; i < COUNT; i++ ) {
+            final String jobTopic = (i % 2 == 0 ? "sling/test" : "sling/test2");
+            final Dictionary<String, Object> props = new Hashtable<String, Object>();
+            props.put(JobUtil.PROPERTY_JOB_TOPIC, jobTopic);
+            jeh.handleEvent(new Event(JobUtil.TOPIC_JOB, props));
+        }
+        while ( count.get() < COUNT / 2) {
+            try {
+                Thread.sleep(500);
+            } catch (InterruptedException ie) {
+                // ignore - the loop condition is re-checked after an interrupted sleep
+            }
+        }
+        while ( unprocessedCount.get() < COUNT / 2) {
+            try {
+                Thread.sleep(500);
+            } catch (InterruptedException ie) {
+                // ignore - the loop condition is re-checked after an interrupted sleep
+            }
+        }
+        // clean up waits for one minute, so we should do the same: wait until 61 seconds have passed since startTime
+        while ( System.currentTimeMillis() - startTime < 61000 ) {
+            try {
+                Thread.sleep(100);
+            } catch (InterruptedException ie) {
+                // ignore - the elapsed-time condition is re-checked after an interrupted sleep
+            }
+        }
+        this.jobManager.run();
+        while ( unprocessedCount.get() < COUNT ) {
+            try {
+                Thread.sleep(500);
+            } catch (InterruptedException ie) {
+                // ignore - the loop condition is re-checked after an interrupted sleep
+            }
+        }
+        assertEquals("Finished count", COUNT / 2, count.get());
+        assertEquals("Unprocessed count",COUNT, unprocessedCount.get());
+        assertEquals("Finished count", COUNT / 2, this.jobManager.getStatistics().getNumberOfFinishedJobs());
+    }
 }