You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2010/02/19 20:11:26 UTC
svn commit: r911940 - in /sling/trunk/bundles/extensions/event: ./
src/main/java/org/apache/sling/event/
src/main/java/org/apache/sling/event/impl/
src/main/java/org/apache/sling/event/impl/job/
src/test/java/org/apache/sling/event/ src/test/java/org/a...
Author: cziegeler
Date: Fri Feb 19 19:11:26 2010
New Revision: 911940
URL: http://svn.apache.org/viewvc?rev=911940&view=rev
Log:
SLING-1397 : Add an acknowledge method to EventUtil
Added:
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/SimpleScheduler.java (with props)
Modified:
sling/trunk/bundles/extensions/event/pom.xml
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobBlockingQueue.java
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/EventUtilTest.java
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/JobEventHandlerTest.java
Modified: sling/trunk/bundles/extensions/event/pom.xml
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/pom.xml?rev=911940&r1=911939&r2=911940&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/pom.xml (original)
+++ sling/trunk/bundles/extensions/event/pom.xml Fri Feb 19 19:11:26 2010
@@ -60,7 +60,7 @@
javax.jcr.*;version=1.0,*
</Import-Package>
<Export-Package>
- org.apache.sling.event;version=2.2.0
+ org.apache.sling.event;version=2.3.0
</Export-Package>
<Private-Package>
org.apache.sling.event.impl,
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java?rev=911940&r1=911939&r2=911940&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java Fri Feb 19 19:11:26 2010
@@ -259,8 +259,30 @@
}
/**
+ * Send an acknowledge.
+ * This signals the job handler that someone is starting to process the job. This method
+ * should be invoked as a first command during job processing.
+ * If this method returns <code>false</code> this means someone else is already
+ * processing this job, and the caller should not process the event anymore.
+ * @return Returns <code>true</code> if the acknowledge could be sent
+ * @since 2.3
+ */
+ public static boolean acknowledgeJob(Event job) {
+ final JobStatusNotifier.NotifierContext ctx = getNotifierContext(job);
+ if ( ctx != null ) {
+ if ( !ctx.notifier.sendAcknowledge(job, ctx.eventNodePath) ) {
+ // if we don't get an ack, someone else is already processing this job.
+ // we process but do not notify the job event handler.
+ LoggerFactory.getLogger(EventUtil.class).info("Someone else is already processing job {}.", job);
+ return false;
+ }
+ return true;
+ }
+ return false;
+ }
+
+ /**
* Notify a finished job.
- * @throws IllegalArgumentException If the event is a job event but does not have a notifier context.
*/
public static void finishedJob(Event job) {
final JobStatusNotifier.NotifierContext ctx = getNotifierContext(job);
@@ -272,7 +294,6 @@
/**
* Notify a failed job.
* @return <code>true</code> if the job has been rescheduled, <code>false</code> otherwise.
- * @throws IllegalArgumentException If the event is a job event but does not have a notifier context.
*/
public static boolean rescheduleJob(Event job) {
final JobStatusNotifier.NotifierContext ctx = getNotifierContext(job);
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java?rev=911940&r1=911939&r2=911940&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java Fri Feb 19 19:11:26 2010
@@ -164,7 +164,7 @@
/** The scheduler for rescheduling jobs.
* @scr.reference */
- private Scheduler scheduler;
+ protected Scheduler scheduler;
/** Our component context. */
private ComponentContext componentContext;
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobBlockingQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobBlockingQueue.java?rev=911940&r1=911939&r2=911940&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobBlockingQueue.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobBlockingQueue.java Fri Feb 19 19:11:26 2010
@@ -229,10 +229,10 @@
final Date fireDate = new Date();
fireDate.setTime(System.currentTimeMillis() + delay);
- final String schedulerJobName = "Waiting:" + queueName;
+ final String jobName = "Waiting:" + queueName;
final Runnable t = new Runnable() {
public void run() {
- setSleeping(schedulerJobName);
+ setSleeping(jobName);
try {
put(info);
} catch (InterruptedException e) {
@@ -245,7 +245,7 @@
};
if ( scheduler != null ) {
try {
- scheduler.fireJobAt(schedulerJobName, t, null, fireDate);
+ scheduler.fireJobAt(jobName, t, null, fireDate);
} catch (Exception e) {
// we ignore the exception and just put back the job in the queue
ignoreException(e);
Modified: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/EventUtilTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/EventUtilTest.java?rev=911940&r1=911939&r2=911940&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/EventUtilTest.java (original)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/EventUtilTest.java Fri Feb 19 19:11:26 2010
@@ -24,6 +24,7 @@
import java.util.Calendar;
import java.util.Dictionary;
+import java.util.Hashtable;
import java.util.Properties;
import javax.jcr.PropertyType;
@@ -56,6 +57,15 @@
assertTrue(EventUtil.shouldDistribute(distributableEvent));
final Event nonDistributableEvent = new Event("another/topic", (Dictionary<String, Object>)null);
assertFalse(EventUtil.shouldDistribute(nonDistributableEvent));
+ final Dictionary<String, Object> props = new Hashtable<String, Object>();
+ props.put("a", "a");
+ props.put("b", "b");
+ final Event distributableEvent2 = EventUtil.createDistributableEvent("some/topic", props);
+ assertTrue(EventUtil.shouldDistribute(distributableEvent2));
+ // we should have four properties: 2 custom, one for the dist flag and the fourth for the topic
+ assertEquals(4, distributableEvent2.getPropertyNames().length);
+ assertEquals("a", distributableEvent2.getProperty("a"));
+ assertEquals("b", distributableEvent2.getProperty("b"));
}
@Test public void testLocalFlag() {
Modified: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/JobEventHandlerTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/JobEventHandlerTest.java?rev=911940&r1=911939&r2=911940&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/JobEventHandlerTest.java (original)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/JobEventHandlerTest.java Fri Feb 19 19:11:26 2010
@@ -23,8 +23,11 @@
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.Dictionary;
import java.util.Hashtable;
+import java.util.List;
import javax.jcr.RepositoryException;
import javax.jcr.observation.EventListenerIterator;
@@ -45,6 +48,7 @@
public JobEventHandlerTest() {
this.handler = new JobEventHandler();
this.context = new JUnit4Mockery();
+ ((JobEventHandler)this.handler).scheduler = new SimpleScheduler();
}
@Override
@@ -72,11 +76,20 @@
/**
* Helper method to create a job event.
*/
- private Event getJobEvent() {
+ private Event getJobEvent(String queueName, String id, String parallel) {
final Dictionary<String, Object> props = new Hashtable<String, Object>();
props.put(EventUtil.PROPERTY_JOB_TOPIC, "sling/test");
+ if ( id != null ) {
+ props.put(EventUtil.PROPERTY_JOB_ID, id);
+ }
props.put(EventUtil.PROPERTY_JOB_RETRY_DELAY, 2000L);
props.put(EventUtil.PROPERTY_JOB_RETRIES, 2);
+ if ( queueName != null ) {
+ props.put(EventUtil.PROPERTY_JOB_QUEUE_NAME, queueName);
+ }
+ if ( parallel != null ) {
+ props.put(EventUtil.PROPERTY_JOB_PARALLEL, parallel);
+ }
return new Event(EventUtil.TOPIC_JOB, props);
}
@@ -86,18 +99,43 @@
*/
@org.junit.Test public void testSimpleJobExecution() throws Exception {
final JobEventHandler jeh = (JobEventHandler)this.handler;
- jeh.handleEvent(getJobEvent());
final Barrier cb = new Barrier(2);
jeh.eventAdmin = new SimpleEventAdmin(new String[] {"sling/test"},
new EventHandler[] {
new EventHandler() {
public void handleEvent(Event event) {
+ EventUtil.acknowledgeJob(event);
+ EventUtil.finishedJob(event);
+ cb.block();
+ }
+
+ }
+ });
+ jeh.handleEvent(getJobEvent(null, null, null));
+ assertTrue("No event received in the given time.", cb.block(5));
+ cb.reset();
+ assertFalse("Unexpected event received in the given time.", cb.block(5));
+ }
+
+ /**
+ * Test simple job execution with job id.
+ * The job is executed once and finished successfully.
+ */
+ @org.junit.Test public void testSimpleJobWithIdExecution() throws Exception {
+ final JobEventHandler jeh = (JobEventHandler)this.handler;
+ final Barrier cb = new Barrier(2);
+ jeh.eventAdmin = new SimpleEventAdmin(new String[] {"sling/test"},
+ new EventHandler[] {
+ new EventHandler() {
+ public void handleEvent(Event event) {
+ EventUtil.acknowledgeJob(event);
EventUtil.finishedJob(event);
cb.block();
}
}
});
+ jeh.handleEvent(getJobEvent(null, "myid", null));
assertTrue("No event received in the given time.", cb.block(5));
cb.reset();
assertFalse("Unexpected event received in the given time.", cb.block(5));
@@ -108,18 +146,29 @@
* The job is rescheduled two times before it fails.
*/
@org.junit.Test public void testStartJobAndReschedule() throws Exception {
+ final List<Integer> retryCountList = new ArrayList<Integer>();
final JobEventHandler jeh = (JobEventHandler)this.handler;
- jeh.handleEvent(getJobEvent());
final Barrier cb = new Barrier(2);
jeh.eventAdmin = new SimpleEventAdmin(new String[] {"sling/test"},
new EventHandler[] {
new EventHandler() {
+ int retryCount;
public void handleEvent(Event event) {
+ EventUtil.acknowledgeJob(event);
+ int retry = 0;
+ if ( event.getProperty(EventUtil.PROPERTY_JOB_RETRY_COUNT) != null ) {
+ retry = (Integer)event.getProperty(EventUtil.PROPERTY_JOB_RETRY_COUNT);
+ }
+ if ( retry == retryCount ) {
+ retryCountList.add(retry);
+ }
+ retryCount++;
EventUtil.rescheduleJob(event);
cb.block();
}
}
});
+ jeh.handleEvent(getJobEvent(null, null, null));
assertTrue("No event received in the given time.", cb.block(5));
cb.reset();
// the job is retried after two seconds, so we wait again
@@ -130,6 +179,147 @@
// we have reached the retry so we expect to not get an event
cb.reset();
assertFalse("Unexpected event received in the given time.", cb.block(5));
+ assertEquals("Unexpected number of retries", 3, retryCountList.size());
}
+ /**
+ * Reschedule test.
+ * The job is rescheduled two times before it fails.
+ */
+ @org.junit.Test public void testStartJobAndRescheduleInJobQueue() throws Exception {
+ final List<Integer> retryCountList = new ArrayList<Integer>();
+ final Barrier cb = new Barrier(2);
+ final JobEventHandler jeh = (JobEventHandler)this.handler;
+ jeh.eventAdmin = new SimpleEventAdmin(new String[] {"sling/test"},
+ new EventHandler[] {
+ new EventHandler() {
+ int retryCount;
+ public void handleEvent(Event event) {
+ EventUtil.acknowledgeJob(event);
+ int retry = 0;
+ if ( event.getProperty(EventUtil.PROPERTY_JOB_RETRY_COUNT) != null ) {
+ retry = (Integer)event.getProperty(EventUtil.PROPERTY_JOB_RETRY_COUNT);
+ }
+ if ( retry == retryCount ) {
+ retryCountList.add(retry);
+ }
+ retryCount++;
+ EventUtil.rescheduleJob(event);
+ cb.block();
+ }
+ }
+ });
+ jeh.handleEvent(getJobEvent("testqueue", null, null));
+ assertTrue("No event received in the given time.", cb.block(5));
+ cb.reset();
+ // the job is retried after two seconds, so we wait again
+ assertTrue("No event received in the given time.", cb.block(5));
+ cb.reset();
+ // the job is retried after two seconds, so we wait again
+ assertTrue("No event received in the given time.", cb.block(5));
+ // we have reached the retry so we expect to not get an event
+ cb.reset();
+ assertFalse("Unexpected event received in the given time.", cb.block(5));
+ assertEquals("Unexpected number of retries", 3, retryCountList.size());
+ }
+
+ /**
+ * Notifications.
+ * We send several jobs which are treated different and then see
+ * how many invocations have been sent.
+ */
+ @org.junit.Test public void testNotifications() throws Exception {
+ final List<String> cancelled = Collections.synchronizedList(new ArrayList<String>());
+ final List<String> failed = Collections.synchronizedList(new ArrayList<String>());
+ final List<String> finished = Collections.synchronizedList(new ArrayList<String>());
+ final List<String> started = Collections.synchronizedList(new ArrayList<String>());
+ final JobEventHandler jeh = (JobEventHandler)this.handler;
+ jeh.eventAdmin = new SimpleEventAdmin(new String[] {"sling/test",
+ EventUtil.TOPIC_JOB_CANCELLED,
+ EventUtil.TOPIC_JOB_FAILED,
+ EventUtil.TOPIC_JOB_FINISHED,
+ EventUtil.TOPIC_JOB_STARTED},
+ new EventHandler[] {
+ new EventHandler() {
+ public void handleEvent(final Event event) {
+ EventUtil.acknowledgeJob(event);
+ // events 1 and 4 finish the first time
+ final String id = (String)event.getProperty(EventUtil.PROPERTY_JOB_ID);
+ if ( "1".equals(id) || "4".equals(id) ) {
+ EventUtil.finishedJob(event);
+ } else
+ // 5 fails always
+ if ( "5".equals(id) ) {
+ EventUtil.rescheduleJob(event);
+ }
+ int retry = 0;
+ if ( event.getProperty(EventUtil.PROPERTY_JOB_RETRY_COUNT) != null ) {
+ retry = (Integer)event.getProperty(EventUtil.PROPERTY_JOB_RETRY_COUNT);
+ }
+ // 2 fails the first time
+ if ( "2".equals(id) ) {
+ if ( retry == 0 ) {
+ EventUtil.rescheduleJob(event);
+ } else {
+ EventUtil.finishedJob(event);
+ }
+ }
+ // 3 fails the first and second time
+ if ( "3".equals(id) ) {
+ if ( retry == 0 || retry == 1 ) {
+ EventUtil.rescheduleJob(event);
+ } else {
+ EventUtil.finishedJob(event);
+ }
+ }
+ }
+ },
+ new EventHandler() {
+ public void handleEvent(final Event event) {
+ final Event job = (Event) event.getProperty(EventUtil.PROPERTY_NOTIFICATION_JOB);
+ final String id = (String)job.getProperty(EventUtil.PROPERTY_JOB_ID);
+ cancelled.add(id);
+ }
+ },
+ new EventHandler() {
+ public void handleEvent(final Event event) {
+ final Event job = (Event) event.getProperty(EventUtil.PROPERTY_NOTIFICATION_JOB);
+ final String id = (String)job.getProperty(EventUtil.PROPERTY_JOB_ID);
+ failed.add(id);
+ }
+ },
+ new EventHandler() {
+ public void handleEvent(final Event event) {
+ final Event job = (Event) event.getProperty(EventUtil.PROPERTY_NOTIFICATION_JOB);
+ final String id = (String)job.getProperty(EventUtil.PROPERTY_JOB_ID);
+ finished.add(id);
+ }
+ },
+ new EventHandler() {
+ public void handleEvent(final Event event) {
+ final Event job = (Event) event.getProperty(EventUtil.PROPERTY_NOTIFICATION_JOB);
+ final String id = (String)job.getProperty(EventUtil.PROPERTY_JOB_ID);
+ started.add(id);
+ }
+ }
+ });
+ jeh.handleEvent(getJobEvent(null, "1", "true"));
+ jeh.handleEvent(getJobEvent(null, "2", "true"));
+ jeh.handleEvent(getJobEvent(null, "3", "true"));
+ jeh.handleEvent(getJobEvent(null, "4", "true"));
+ jeh.handleEvent(getJobEvent(null, "5", "true"));
+ int count = 0;
+ final long startTime = System.currentTimeMillis();
+ do {
+ count = finished.size() + cancelled.size();
+ // after 25 seconds we cancel the test
+ if ( System.currentTimeMillis() - startTime > 25000 ) {
+ throw new Exception("Timeout during notification test.");
+ }
+ } while ( count < 5);
+ assertEquals("Finished count", 4, finished.size());
+ assertEquals("Cancelled count", 1, cancelled.size());
+ assertEquals("Started count", 10, started.size());
+ assertEquals("Failed count", 5, failed.size());
+ }
}
Added: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/SimpleScheduler.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/SimpleScheduler.java?rev=911940&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/SimpleScheduler.java (added)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/SimpleScheduler.java Fri Feb 19 19:11:26 2010
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl;
+
+import java.io.Serializable;
+import java.util.Date;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+import org.apache.sling.commons.scheduler.Scheduler;
+
+/**
+ * Simple scheduler implementation for testing.
+ */
+public class SimpleScheduler implements Scheduler {
+
+ public void addJob(String name, Object job,
+ Map<String, Serializable> config, String schedulingExpression,
+ boolean canRunConcurrently) throws Exception {
+ throw new IllegalArgumentException();
+ }
+
+ public void addPeriodicJob(String name, Object job,
+ Map<String, Serializable> config, long period,
+ boolean canRunConcurrently) throws Exception {
+ throw new IllegalAccessException();
+ }
+
+ public boolean fireJob(Object job, Map<String, Serializable> config,
+ int times, long period) {
+ throw new IllegalArgumentException();
+ }
+
+ public void fireJob(Object job, Map<String, Serializable> config)
+ throws Exception {
+ throw new IllegalAccessException();
+ }
+
+ public boolean fireJobAt(String name, Object job,
+ Map<String, Serializable> config, Date date, int times, long period) {
+ throw new IllegalArgumentException();
+ }
+
+ public void fireJobAt(String name, final Object job,
+ Map<String, Serializable> config, final Date date) throws Exception {
+ new Thread() {
+ public void run() {
+ final long sleepTime = date.getTime() - System.currentTimeMillis();
+ if ( sleepTime > 0 ) {
+ try {
+ Thread.sleep(sleepTime);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+ ((Runnable)job).run();
+ }
+ }.start();
+ }
+
+ public void removeJob(String name) throws NoSuchElementException {
+ // ignore this
+ }
+}
Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/SimpleScheduler.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/SimpleScheduler.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/SimpleScheduler.java
------------------------------------------------------------------------------
svn:mime-type = text/plain