You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2010/02/19 20:11:26 UTC

svn commit: r911940 - in /sling/trunk/bundles/extensions/event: ./ src/main/java/org/apache/sling/event/ src/main/java/org/apache/sling/event/impl/ src/main/java/org/apache/sling/event/impl/job/ src/test/java/org/apache/sling/event/ src/test/java/org/a...

Author: cziegeler
Date: Fri Feb 19 19:11:26 2010
New Revision: 911940

URL: http://svn.apache.org/viewvc?rev=911940&view=rev
Log:
SLING-1397 : Add an acknowledge method to EventUtil

Added:
    sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/SimpleScheduler.java   (with props)
Modified:
    sling/trunk/bundles/extensions/event/pom.xml
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobBlockingQueue.java
    sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/EventUtilTest.java
    sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/JobEventHandlerTest.java

Modified: sling/trunk/bundles/extensions/event/pom.xml
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/pom.xml?rev=911940&r1=911939&r2=911940&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/pom.xml (original)
+++ sling/trunk/bundles/extensions/event/pom.xml Fri Feb 19 19:11:26 2010
@@ -60,7 +60,7 @@
                             javax.jcr.*;version=1.0,*
                         </Import-Package>
                         <Export-Package>
-                            org.apache.sling.event;version=2.2.0
+                            org.apache.sling.event;version=2.3.0
                         </Export-Package>
                         <Private-Package>
                             org.apache.sling.event.impl,

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java?rev=911940&r1=911939&r2=911940&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java Fri Feb 19 19:11:26 2010
@@ -259,8 +259,30 @@
     }
 
     /**
+     * Send an acknowledge.
+     * This signals the job handler that someone is starting to process the job. This method
+     * should be invoked as a first command during job processing.
+     * If this method returns <code>false</code> this means someone else is already
+     * processing this job, and the caller should not process the event anymore.
+     * @return Returns <code>true</code> if the acknowledge could be sent
+     * @since 2.3
+     */
+    public static boolean acknowledgeJob(Event job) {
+        final JobStatusNotifier.NotifierContext ctx = getNotifierContext(job);
+        if ( ctx != null ) {
+            if ( !ctx.notifier.sendAcknowledge(job, ctx.eventNodePath) ) {
+                // if we don't get an ack, someone else is already processing this job.
+                // we process but do not notify the job event handler.
+                LoggerFactory.getLogger(EventUtil.class).info("Someone else is already processing job {}.", job);
+                return false;
+            }
+            return true;
+        }
+        return false;
+    }
+
+    /**
      * Notify a finished job.
-     * @throws IllegalArgumentException If the event is a job event but does not have a notifier context.
      */
     public static void finishedJob(Event job) {
         final JobStatusNotifier.NotifierContext ctx = getNotifierContext(job);
@@ -272,7 +294,6 @@
     /**
      * Notify a failed job.
      * @return <code>true</code> if the job has been rescheduled, <code>false</code> otherwise.
-     * @throws IllegalArgumentException If the event is a job event but does not have a notifier context.
      */
     public static boolean rescheduleJob(Event job) {
         final JobStatusNotifier.NotifierContext ctx = getNotifierContext(job);

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java?rev=911940&r1=911939&r2=911940&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java Fri Feb 19 19:11:26 2010
@@ -164,7 +164,7 @@
 
     /** The scheduler for rescheduling jobs.
      * @scr.reference */
-    private Scheduler scheduler;
+    protected Scheduler scheduler;
 
     /** Our component context. */
     private ComponentContext componentContext;

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobBlockingQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobBlockingQueue.java?rev=911940&r1=911939&r2=911940&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobBlockingQueue.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobBlockingQueue.java Fri Feb 19 19:11:26 2010
@@ -229,10 +229,10 @@
             final Date fireDate = new Date();
             fireDate.setTime(System.currentTimeMillis() + delay);
 
-            final String schedulerJobName = "Waiting:" + queueName;
+            final String jobName = "Waiting:" + queueName;
             final Runnable t = new Runnable() {
                 public void run() {
-                    setSleeping(schedulerJobName);
+                    setSleeping(jobName);
                     try {
                         put(info);
                     } catch (InterruptedException e) {
@@ -245,7 +245,7 @@
             };
             if ( scheduler != null ) {
                 try {
-                    scheduler.fireJobAt(schedulerJobName, t, null, fireDate);
+                    scheduler.fireJobAt(jobName, t, null, fireDate);
                 } catch (Exception e) {
                     // we ignore the exception and just put back the job in the queue
                     ignoreException(e);

Modified: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/EventUtilTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/EventUtilTest.java?rev=911940&r1=911939&r2=911940&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/EventUtilTest.java (original)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/EventUtilTest.java Fri Feb 19 19:11:26 2010
@@ -24,6 +24,7 @@
 
 import java.util.Calendar;
 import java.util.Dictionary;
+import java.util.Hashtable;
 import java.util.Properties;
 
 import javax.jcr.PropertyType;
@@ -56,6 +57,15 @@
         assertTrue(EventUtil.shouldDistribute(distributableEvent));
         final Event nonDistributableEvent = new Event("another/topic", (Dictionary<String, Object>)null);
         assertFalse(EventUtil.shouldDistribute(nonDistributableEvent));
+        final Dictionary<String, Object> props = new Hashtable<String, Object>();
+        props.put("a", "a");
+        props.put("b", "b");
+        final Event distributableEvent2 = EventUtil.createDistributableEvent("some/topic", props);
+        assertTrue(EventUtil.shouldDistribute(distributableEvent2));
+        // we should have four properties: 2 custom, one for the dist flag and the fourth for the topic
+        assertEquals(4, distributableEvent2.getPropertyNames().length);
+        assertEquals("a", distributableEvent2.getProperty("a"));
+        assertEquals("b", distributableEvent2.getProperty("b"));
     }
 
     @Test public void testLocalFlag() {

Modified: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/JobEventHandlerTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/JobEventHandlerTest.java?rev=911940&r1=911939&r2=911940&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/JobEventHandlerTest.java (original)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/JobEventHandlerTest.java Fri Feb 19 19:11:26 2010
@@ -23,8 +23,11 @@
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Dictionary;
 import java.util.Hashtable;
+import java.util.List;
 
 import javax.jcr.RepositoryException;
 import javax.jcr.observation.EventListenerIterator;
@@ -45,6 +48,7 @@
     public JobEventHandlerTest() {
         this.handler = new JobEventHandler();
         this.context = new JUnit4Mockery();
+        ((JobEventHandler)this.handler).scheduler = new SimpleScheduler();
     }
 
     @Override
@@ -72,11 +76,20 @@
     /**
      * Helper method to create a job event.
      */
-    private Event getJobEvent() {
+    private Event getJobEvent(String queueName, String id, String parallel) {
         final Dictionary<String, Object> props = new Hashtable<String, Object>();
         props.put(EventUtil.PROPERTY_JOB_TOPIC, "sling/test");
+        if ( id != null ) {
+            props.put(EventUtil.PROPERTY_JOB_ID, id);
+        }
         props.put(EventUtil.PROPERTY_JOB_RETRY_DELAY, 2000L);
         props.put(EventUtil.PROPERTY_JOB_RETRIES, 2);
+        if ( queueName != null ) {
+            props.put(EventUtil.PROPERTY_JOB_QUEUE_NAME, queueName);
+        }
+        if ( parallel != null ) {
+            props.put(EventUtil.PROPERTY_JOB_PARALLEL, parallel);
+        }
         return new Event(EventUtil.TOPIC_JOB, props);
     }
 
@@ -86,18 +99,43 @@
      */
     @org.junit.Test public void testSimpleJobExecution() throws Exception {
         final JobEventHandler jeh = (JobEventHandler)this.handler;
-        jeh.handleEvent(getJobEvent());
         final Barrier cb = new Barrier(2);
         jeh.eventAdmin = new SimpleEventAdmin(new String[] {"sling/test"},
                 new EventHandler[] {
                     new EventHandler() {
                         public void handleEvent(Event event) {
+                            EventUtil.acknowledgeJob(event);
+                            EventUtil.finishedJob(event);
+                            cb.block();
+                        }
+
+                    }
+                });
+        jeh.handleEvent(getJobEvent(null, null, null));
+        assertTrue("No event received in the given time.", cb.block(5));
+        cb.reset();
+        assertFalse("Unexpected event received in the given time.", cb.block(5));
+    }
+
+    /**
+     * Test simple job execution with job id.
+     * The job is executed once and finished successfully.
+     */
+    @org.junit.Test public void testSimpleJobWithIdExecution() throws Exception {
+        final JobEventHandler jeh = (JobEventHandler)this.handler;
+        final Barrier cb = new Barrier(2);
+        jeh.eventAdmin = new SimpleEventAdmin(new String[] {"sling/test"},
+                new EventHandler[] {
+                    new EventHandler() {
+                        public void handleEvent(Event event) {
+                            EventUtil.acknowledgeJob(event);
                             EventUtil.finishedJob(event);
                             cb.block();
                         }
 
                     }
                 });
+        jeh.handleEvent(getJobEvent(null, "myid", null));
         assertTrue("No event received in the given time.", cb.block(5));
         cb.reset();
         assertFalse("Unexpected event received in the given time.", cb.block(5));
@@ -108,18 +146,29 @@
      * The job is rescheduled two times before it fails.
      */
     @org.junit.Test public void testStartJobAndReschedule() throws Exception {
+        final List<Integer> retryCountList = new ArrayList<Integer>();
         final JobEventHandler jeh = (JobEventHandler)this.handler;
-        jeh.handleEvent(getJobEvent());
         final Barrier cb = new Barrier(2);
         jeh.eventAdmin = new SimpleEventAdmin(new String[] {"sling/test"},
                 new EventHandler[] {
                     new EventHandler() {
+                        int retryCount;
                         public void handleEvent(Event event) {
+                            EventUtil.acknowledgeJob(event);
+                            int retry = 0;
+                            if ( event.getProperty(EventUtil.PROPERTY_JOB_RETRY_COUNT) != null ) {
+                                retry = (Integer)event.getProperty(EventUtil.PROPERTY_JOB_RETRY_COUNT);
+                            }
+                            if ( retry == retryCount ) {
+                                retryCountList.add(retry);
+                            }
+                            retryCount++;
                             EventUtil.rescheduleJob(event);
                             cb.block();
                         }
                     }
                 });
+        jeh.handleEvent(getJobEvent(null, null, null));
         assertTrue("No event received in the given time.", cb.block(5));
         cb.reset();
         // the job is retried after two seconds, so we wait again
@@ -130,6 +179,147 @@
         // we have reached the retry so we expect to not get an event
         cb.reset();
         assertFalse("Unexpected event received in the given time.", cb.block(5));
+        assertEquals("Unexpected number of retries", 3, retryCountList.size());
     }
 
+    /**
+     * Reschedule test.
+     * The job is rescheduled two times before it fails.
+     */
+    @org.junit.Test public void testStartJobAndRescheduleInJobQueue() throws Exception {
+        final List<Integer> retryCountList = new ArrayList<Integer>();
+        final Barrier cb = new Barrier(2);
+        final JobEventHandler jeh = (JobEventHandler)this.handler;
+        jeh.eventAdmin = new SimpleEventAdmin(new String[] {"sling/test"},
+                new EventHandler[] {
+                    new EventHandler() {
+                        int retryCount;
+                        public void handleEvent(Event event) {
+                            EventUtil.acknowledgeJob(event);
+                            int retry = 0;
+                            if ( event.getProperty(EventUtil.PROPERTY_JOB_RETRY_COUNT) != null ) {
+                                retry = (Integer)event.getProperty(EventUtil.PROPERTY_JOB_RETRY_COUNT);
+                            }
+                            if ( retry == retryCount ) {
+                                retryCountList.add(retry);
+                            }
+                            retryCount++;
+                            EventUtil.rescheduleJob(event);
+                            cb.block();
+                        }
+                    }
+                });
+        jeh.handleEvent(getJobEvent("testqueue", null, null));
+        assertTrue("No event received in the given time.", cb.block(5));
+        cb.reset();
+        // the job is retried after two seconds, so we wait again
+        assertTrue("No event received in the given time.", cb.block(5));
+        cb.reset();
+        // the job is retried after two seconds, so we wait again
+        assertTrue("No event received in the given time.", cb.block(5));
+        // we have reached the retry so we expect to not get an event
+        cb.reset();
+        assertFalse("Unexpected event received in the given time.", cb.block(5));
+        assertEquals("Unexpected number of retries", 3, retryCountList.size());
+    }
+
+    /**
+     * Notifications.
+     * We send several jobs which are treated different and then see
+     * how many invocations have been sent.
+     */
+    @org.junit.Test public void testNotifications() throws Exception {
+        final List<String> cancelled = Collections.synchronizedList(new ArrayList<String>());
+        final List<String> failed = Collections.synchronizedList(new ArrayList<String>());
+        final List<String> finished = Collections.synchronizedList(new ArrayList<String>());
+        final List<String> started = Collections.synchronizedList(new ArrayList<String>());
+        final JobEventHandler jeh = (JobEventHandler)this.handler;
+        jeh.eventAdmin = new SimpleEventAdmin(new String[] {"sling/test",
+                EventUtil.TOPIC_JOB_CANCELLED,
+                EventUtil.TOPIC_JOB_FAILED,
+                EventUtil.TOPIC_JOB_FINISHED,
+                EventUtil.TOPIC_JOB_STARTED},
+                new EventHandler[] {
+                    new EventHandler() {
+                        public void handleEvent(final Event event) {
+                            EventUtil.acknowledgeJob(event);
+                            // events 1 and 4 finish the first time
+                            final String id = (String)event.getProperty(EventUtil.PROPERTY_JOB_ID);
+                            if ( "1".equals(id) || "4".equals(id) ) {
+                                EventUtil.finishedJob(event);
+                            } else
+                            // 5 fails always
+                            if ( "5".equals(id) ) {
+                                EventUtil.rescheduleJob(event);
+                            }
+                            int retry = 0;
+                            if ( event.getProperty(EventUtil.PROPERTY_JOB_RETRY_COUNT) != null ) {
+                                retry = (Integer)event.getProperty(EventUtil.PROPERTY_JOB_RETRY_COUNT);
+                            }
+                            // 2 fails the first time
+                            if ( "2".equals(id) ) {
+                                if ( retry == 0 ) {
+                                    EventUtil.rescheduleJob(event);
+                                } else {
+                                    EventUtil.finishedJob(event);
+                                }
+                            }
+                            // 3 fails the first and second time
+                            if ( "3".equals(id) ) {
+                                if ( retry == 0 || retry == 1 ) {
+                                    EventUtil.rescheduleJob(event);
+                                } else {
+                                    EventUtil.finishedJob(event);
+                                }
+                            }
+                        }
+                    },
+                    new EventHandler() {
+                        public void handleEvent(final Event event) {
+                            final Event job = (Event) event.getProperty(EventUtil.PROPERTY_NOTIFICATION_JOB);
+                            final String id = (String)job.getProperty(EventUtil.PROPERTY_JOB_ID);
+                            cancelled.add(id);
+                        }
+                    },
+                    new EventHandler() {
+                        public void handleEvent(final Event event) {
+                            final Event job = (Event) event.getProperty(EventUtil.PROPERTY_NOTIFICATION_JOB);
+                            final String id = (String)job.getProperty(EventUtil.PROPERTY_JOB_ID);
+                            failed.add(id);
+                        }
+                    },
+                    new EventHandler() {
+                        public void handleEvent(final Event event) {
+                            final Event job = (Event) event.getProperty(EventUtil.PROPERTY_NOTIFICATION_JOB);
+                            final String id = (String)job.getProperty(EventUtil.PROPERTY_JOB_ID);
+                            finished.add(id);
+                        }
+                    },
+                    new EventHandler() {
+                        public void handleEvent(final Event event) {
+                            final Event job = (Event) event.getProperty(EventUtil.PROPERTY_NOTIFICATION_JOB);
+                            final String id = (String)job.getProperty(EventUtil.PROPERTY_JOB_ID);
+                            started.add(id);
+                        }
+                    }
+                });
+        jeh.handleEvent(getJobEvent(null, "1", "true"));
+        jeh.handleEvent(getJobEvent(null, "2", "true"));
+        jeh.handleEvent(getJobEvent(null, "3", "true"));
+        jeh.handleEvent(getJobEvent(null, "4", "true"));
+        jeh.handleEvent(getJobEvent(null, "5", "true"));
+        int count = 0;
+        final long startTime = System.currentTimeMillis();
+        do {
+            count = finished.size() + cancelled.size();
+            // after 25 seconds we cancel the test
+            if ( System.currentTimeMillis() - startTime > 25000 ) {
+                throw new Exception("Timeout during notification test.");
+            }
+        } while ( count < 5);
+        assertEquals("Finished count", 4, finished.size());
+        assertEquals("Cancelled count", 1, cancelled.size());
+        assertEquals("Started count", 10, started.size());
+        assertEquals("Failed count", 5, failed.size());
+    }
 }

Added: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/SimpleScheduler.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/SimpleScheduler.java?rev=911940&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/SimpleScheduler.java (added)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/SimpleScheduler.java Fri Feb 19 19:11:26 2010
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl;
+
+import java.io.Serializable;
+import java.util.Date;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+import org.apache.sling.commons.scheduler.Scheduler;
+
+/**
+ * Simple scheduler implementation for testing.
+ */
+public class SimpleScheduler implements Scheduler {
+
+    public void addJob(String name, Object job,
+            Map<String, Serializable> config, String schedulingExpression,
+            boolean canRunConcurrently) throws Exception {
+        throw new IllegalArgumentException();
+    }
+
+    public void addPeriodicJob(String name, Object job,
+            Map<String, Serializable> config, long period,
+            boolean canRunConcurrently) throws Exception {
+        throw new IllegalAccessException();
+    }
+
+    public boolean fireJob(Object job, Map<String, Serializable> config,
+            int times, long period) {
+        throw new IllegalArgumentException();
+    }
+
+    public void fireJob(Object job, Map<String, Serializable> config)
+            throws Exception {
+        throw new IllegalAccessException();
+    }
+
+    public boolean fireJobAt(String name, Object job,
+            Map<String, Serializable> config, Date date, int times, long period) {
+        throw new IllegalArgumentException();
+    }
+
+    public void fireJobAt(String name, final Object job,
+            Map<String, Serializable> config, final Date date) throws Exception {
+        new Thread() {
+            public void run() {
+                final long sleepTime = date.getTime() - System.currentTimeMillis();
+                if ( sleepTime > 0 ) {
+                    try {
+                        Thread.sleep(sleepTime);
+                    } catch (InterruptedException e) {
+                        // ignore
+                    }
+                }
+                ((Runnable)job).run();
+            }
+        }.start();
+    }
+
+    public void removeJob(String name) throws NoSuchElementException {
+        // ignore this
+    }
+}

Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/SimpleScheduler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/SimpleScheduler.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/SimpleScheduler.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain