You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2013/04/22 13:42:55 UTC

svn commit: r1470462 [7/7] - in /sling/trunk/bundles/extensions/event: ./ src/main/java/org/apache/sling/event/ src/main/java/org/apache/sling/event/impl/ src/main/java/org/apache/sling/event/impl/dea/ src/main/java/org/apache/sling/event/impl/jobs/ sr...

Added: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java?rev=1470462&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java (added)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java Mon Apr 22 11:42:53 2013
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.it;
+
+
+import static org.ops4j.pax.exam.CoreOptions.junitBundles;
+import static org.ops4j.pax.exam.CoreOptions.mavenBundle;
+import static org.ops4j.pax.exam.CoreOptions.options;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Dictionary;
+import java.util.Hashtable;
+
+import javax.inject.Inject;
+
+import org.apache.sling.event.impl.jobs.JobManagerConfiguration;
+import org.apache.sling.event.jobs.JobConsumer;
+import org.apache.sling.event.jobs.JobManager;
+import org.apache.sling.launchpad.api.StartupHandler;
+import org.apache.sling.launchpad.api.StartupMode;
+import org.ops4j.pax.exam.CoreOptions;
+import org.ops4j.pax.exam.Option;
+import org.ops4j.pax.exam.junit.Configuration;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceReference;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.cm.ConfigurationAdmin;
+import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
+import org.osgi.service.event.EventHandler;
+
+public abstract class AbstractJobHandlingTest {
+
+    private static final String BUNDLE_JAR_SYS_PROP = "project.bundle.file";
+
+    @Inject
+    protected EventAdmin eventAdmin;
+
+    @Inject
+    protected ConfigurationAdmin configAdmin;
+
+    @Inject
+    protected BundleContext bc;
+
+    @Configuration
+    public Option[] config() {
+        final String bundleFileName = System.getProperty( BUNDLE_JAR_SYS_PROP );
+        final File bundleFile = new File( bundleFileName );
+        if ( !bundleFile.canRead() ) {
+            throw new IllegalArgumentException( "Cannot read from bundle file " + bundleFileName + " specified in the "
+                + BUNDLE_JAR_SYS_PROP + " system property" );
+        }
+
+        return options(
+                mavenBundle("org.apache.sling", "org.apache.sling.fragment.xml", "1.0.2"),
+                mavenBundle("org.apache.sling", "org.apache.sling.fragment.transaction", "1.0.0"),
+                mavenBundle("org.apache.sling", "org.apache.sling.fragment.activation", "1.0.2"),
+                mavenBundle("org.apache.sling", "org.apache.sling.fragment.ws", "1.0.2"),
+
+                mavenBundle("org.apache.sling", "org.apache.sling.commons.log", "3.0.0"),
+                mavenBundle("org.apache.sling", "org.apache.sling.commons.logservice", "1.0.2"),
+
+                mavenBundle("org.slf4j", "slf4j-api", "1.6.4"),
+                mavenBundle("org.slf4j", "jcl-over-slf4j", "1.6.4"),
+                mavenBundle("org.slf4j", "log4j-over-slf4j", "1.6.4"),
+
+                mavenBundle("commons-io", "commons-io", "1.4"),
+                mavenBundle("commons-fileupload", "commons-fileupload", "1.2.2"),
+                mavenBundle("commons-collections", "commons-collections", "3.2.1"),
+                mavenBundle("commons-codec", "commons-codec", "1.6"),
+                mavenBundle("commons-lang", "commons-lang", "2.5"),
+
+                mavenBundle("org.apache.geronimo.bundles", "commons-httpclient", "3.1_1"),
+                mavenBundle("org.apache.tika", "tika-core", "1.0"),
+                mavenBundle("org.apache.tika", "tika-bundle", "1.0"),
+
+                mavenBundle("org.apache.felix", "org.apache.felix.http.jetty", "2.2.0"),
+                mavenBundle("org.apache.felix", "org.apache.felix.eventadmin", "1.2.14"),
+                mavenBundle("org.apache.felix", "org.apache.felix.scr", "1.6.2"),
+                mavenBundle("org.apache.felix", "org.apache.felix.configadmin", "1.6.0"),
+//                mavenBundle("org.apache.felix", "org.apache.felix.metatype", "1.0.6"),
+
+                mavenBundle("org.apache.sling", "org.apache.sling.commons.osgi", "2.2.0"),
+                mavenBundle("org.apache.sling", "org.apache.sling.commons.json", "2.0.6"),
+                mavenBundle("org.apache.sling", "org.apache.sling.commons.mime", "2.1.4"),
+                mavenBundle("org.apache.sling", "org.apache.sling.commons.classloader", "1.3.0"),
+                mavenBundle("org.apache.sling", "org.apache.sling.commons.scheduler", "2.3.4"),
+                mavenBundle("org.apache.sling", "org.apache.sling.commons.threads", "3.1.0"),
+
+                mavenBundle("org.apache.sling", "org.apache.sling.launchpad.api", "1.1.0"),
+                mavenBundle("org.apache.sling", "org.apache.sling.auth.core", "1.1.0"),
+                mavenBundle("org.apache.sling", "org.apache.sling.discovery.api", "0.1.0-SNAPSHOT"),
+
+                mavenBundle("org.apache.sling", "org.apache.sling.api", "2.4.0"),
+                mavenBundle("org.apache.sling", "org.apache.sling.settings", "1.2.2"),
+                mavenBundle("org.apache.sling", "org.apache.sling.resourceresolver", "1.0.6"),
+                mavenBundle("org.apache.sling", "org.apache.sling.adapter", "2.1.0"),
+                mavenBundle("org.apache.sling", "org.apache.sling.jcr.resource", "2.2.6"),
+                mavenBundle("org.apache.sling", "org.apache.sling.jcr.classloader", "3.1.12"),
+                mavenBundle("org.apache.sling", "org.apache.sling.jcr.contentloader", "2.1.2"),
+                mavenBundle("org.apache.sling", "org.apache.sling.engine", "2.2.6"),
+
+                mavenBundle("org.apache.sling", "org.apache.sling.jcr.jcr-wrapper", "2.0.0"),
+                mavenBundle("org.apache.sling", "org.apache.sling.jcr.api", "2.1.0"),
+                mavenBundle("org.apache.sling", "org.apache.sling.jcr.base", "2.1.2"),
+                mavenBundle("org.apache.jackrabbit", "jackrabbit-api", "2.4.2"),
+                mavenBundle("org.apache.jackrabbit", "jackrabbit-jcr-commons", "2.4.2"),
+                mavenBundle("org.apache.jackrabbit", "jackrabbit-spi", "2.4.2"),
+                mavenBundle("org.apache.jackrabbit", "jackrabbit-spi-commons", "2.4.2"),
+                mavenBundle("org.apache.jackrabbit", "jackrabbit-jcr-rmi", "2.4.2"),
+                mavenBundle("org.apache.derby", "derby", "10.5.3.0_1"),
+                mavenBundle("org.apache.sling", "org.apache.sling.jcr.jackrabbit.server", "2.1.1-SNAPSHOT"),
+
+                CoreOptions.bundle( bundleFile.toURI().toString() ),
+
+                junitBundles()
+           );
+    }
+
+    protected JobManager getJobManager() {
+        final ServiceReference sr = this.bc.getServiceReference(JobManager.class.getName());
+        return (JobManager)this.bc.getService(sr);
+    }
+
+    protected void sleep(final long time) {
+        try {
+            Thread.sleep(time);
+        } catch (InterruptedException e) {
+            // ignore
+        }
+    }
+
+    public void setup() throws IOException {
+        // set load delay to 3 sec
+        final org.osgi.service.cm.Configuration c2 = this.configAdmin.getConfiguration("org.apache.sling.event.impl.jobs.jcr.PersistenceHandler", null);
+        Dictionary<String, Object> p2 = new Hashtable<String, Object>();
+        p2.put(JobManagerConfiguration.CONFIG_PROPERTY_BACKGROUND_LOAD_DELAY, 3L);
+        c2.update(p2);
+
+        final StartupHandler handler = new StartupHandler() {
+
+            @Override
+            public void waitWithStartup(boolean flag) {
+            }
+
+            @Override
+            public boolean isFinished() {
+                return true;
+            }
+
+            @Override
+            public StartupMode getMode() {
+                return StartupMode.INSTALL;
+            }
+        };
+        this.bc.registerService(StartupHandler.class.getName(), handler, null);
+
+        // cluster discovery
+        final org.osgi.service.cm.Configuration c = this.configAdmin.getConfiguration("org.apache.sling.discovery.impl.NoClusterDiscoveryService",  null);
+        Dictionary<String, Object> p = new Hashtable<String, Object>();
+        p.put("a", "b");
+        c.update(p);
+    }
+
+    /**
+     * Helper method to register an event handler
+     */
+    protected ServiceRegistration registerEventHandler(final String topic,
+            final EventHandler handler) {
+        final Dictionary<String, Object> props = new Hashtable<String, Object>();
+        props.put(EventConstants.EVENT_TOPIC, topic);
+        final ServiceRegistration reg = this.bc.registerService(EventHandler.class.getName(),
+                handler, props);
+        return reg;
+    }
+
+    /**
+     * Helper method to register a job consumer
+     */
+    protected ServiceRegistration registerJobConsumer(final String topic,
+            final JobConsumer handler) {
+        final Dictionary<String, Object> props = new Hashtable<String, Object>();
+        props.put(JobConsumer.PROPERTY_TOPICS, topic);
+        final ServiceRegistration reg = this.bc.registerService(JobConsumer.class.getName(),
+                handler, props);
+        return reg;
+    }
+}
\ No newline at end of file

Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ClassloadingTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ClassloadingTest.java?rev=1470462&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ClassloadingTest.java (added)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ClassloadingTest.java Mon Apr 22 11:42:53 2013
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.it;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Dictionary;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.sling.event.EventPropertiesMap;
+import org.apache.sling.event.impl.jobs.config.ConfigurationConstants;
+import org.apache.sling.event.jobs.Job;
+import org.apache.sling.event.jobs.JobConsumer;
+import org.apache.sling.event.jobs.JobManager;
+import org.apache.sling.event.jobs.JobUtil;
+import org.apache.sling.event.jobs.QueueConfiguration;
+import org.junit.Before;
+import org.junit.runner.RunWith;
+import org.ops4j.pax.exam.junit.ExamReactorStrategy;
+import org.ops4j.pax.exam.junit.JUnit4TestRunner;
+import org.ops4j.pax.exam.spi.reactors.AllConfinedStagedReactorFactory;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventHandler;
+
+@RunWith(JUnit4TestRunner.class)
+@ExamReactorStrategy(AllConfinedStagedReactorFactory.class)
+public class ClassloadingTest extends AbstractJobHandlingTest {
+
+    private static final String QUEUE_NAME = "cltest";
+    private static final String TOPIC = "sling/cltest";
+
+    private String queueConfPid;
+
+    @Override
+    @Before
+    public void setup() throws IOException {
+        super.setup();
+
+        // create ignore test queue
+        final org.osgi.service.cm.Configuration orderedConfig = this.configAdmin.createFactoryConfiguration("org.apache.sling.event.jobs.QueueConfiguration", null);
+        final Dictionary<String, Object> orderedProps = new Hashtable<String, Object>();
+        orderedProps.put(ConfigurationConstants.PROP_NAME, QUEUE_NAME);
+        orderedProps.put(ConfigurationConstants.PROP_TYPE, QueueConfiguration.Type.UNORDERED.name());
+        orderedProps.put(ConfigurationConstants.PROP_TOPICS, TOPIC);
+        orderedConfig.update(orderedProps);
+
+        this.queueConfPid = orderedConfig.getPid();
+
+        this.sleep(1000L);
+    }
+
+
+    @org.junit.Test public void testSimpleClassloading() throws Exception {
+        final AtomicInteger count = new AtomicInteger(0);
+        final List<Event> finishedEvents = Collections.synchronizedList(new ArrayList<Event>());
+        final ServiceRegistration jcReg = this.registerJobConsumer(TOPIC,
+                new JobConsumer() {
+
+                    @Override
+                    public boolean process(Job job) {
+                        count.incrementAndGet();
+                        return true;
+                    }
+                });
+        final ServiceRegistration ehReg = this.registerEventHandler(JobUtil.TOPIC_JOB_FINISHED,
+                new EventHandler() {
+
+                    @Override
+                    public void handleEvent(Event event) {
+                        finishedEvents.add(event);
+                    }
+                });
+        try {
+            final JobManager jobManager = this.getJobManager();
+
+            final List<String> list = new ArrayList<String>();
+            list.add("1");
+            list.add("2");
+
+            final EventPropertiesMap map = new EventPropertiesMap();
+            map.put("a", "a1");
+            map.put("b", "b2");
+
+            // we start a single job
+            final Map<String, Object> props = new HashMap<String, Object>();
+            props.put("string", "Hello");
+            props.put("int", new Integer(5));
+            props.put("long", new Long(7));
+            props.put("list", list);
+            props.put("map", map);
+
+            jobManager.addJob(TOPIC, null, props);
+
+            while ( finishedEvents.size() < 1 ) {
+                // we wait a little bit
+                Thread.sleep(100);
+            }
+
+            // no jobs queued, none processed and no available
+            assertEquals(0, jobManager.getStatistics().getNumberOfQueuedJobs());
+            assertEquals(1, count.get());
+            assertEquals(0, jobManager.findJobs(JobManager.QueryType.ALL, TOPIC, -1).size());
+
+            final String jobTopic = (String)finishedEvents.get(0).getProperty(JobUtil.NOTIFICATION_PROPERTY_JOB_TOPIC);
+            assertNotNull(jobTopic);
+            assertEquals("Hello", finishedEvents.get(0).getProperty("string"));
+            assertEquals(new Integer(5), Integer.valueOf(finishedEvents.get(0).getProperty("int").toString()));
+            assertEquals(new Long(7), Long.valueOf(finishedEvents.get(0).getProperty("long").toString()));
+            assertEquals(list, finishedEvents.get(0).getProperty("list"));
+            assertEquals(map, finishedEvents.get(0).getProperty("map"));
+        } finally {
+            jcReg.unregister();
+            ehReg.unregister();
+        }
+    }
+
+    @org.junit.Test public void testFailedClassloading() throws Exception {
+        final AtomicInteger count = new AtomicInteger(0);
+        final List<Event> finishedEvents = Collections.synchronizedList(new ArrayList<Event>());
+        final ServiceRegistration jcReg = this.registerJobConsumer(TOPIC + "/failed",
+                new JobConsumer() {
+
+                    @Override
+                    public boolean process(Job job) {
+                        count.incrementAndGet();
+                        return true;
+                    }
+                });
+        final ServiceRegistration ehReg = this.registerEventHandler(JobUtil.TOPIC_JOB_FINISHED,
+                new EventHandler() {
+
+                    @Override
+                    public void handleEvent(Event event) {
+                        finishedEvents.add(event);
+                    }
+                });
+        try {
+            final JobManager jobManager = this.getJobManager();
+
+            // dao is an invisible class for the dynamic class loader as it is not public
+            // therefore scheduling this job should fail!
+            final DataObject dao = new DataObject();
+            dao.message = "Hello World";
+
+            // we start a single job
+            final Map<String, Object> props = new HashMap<String, Object>();
+            props.put("dao", dao);
+
+            jobManager.addJob(TOPIC + "/failed", null, props);
+
+            // we simply wait a little bit
+            sleep(2000);
+
+            assertEquals(0, count.get());
+            assertEquals(0, finishedEvents.size());
+            assertEquals(1, jobManager.findJobs(JobManager.QueryType.ALL, TOPIC + "/failed", -1).size());
+            assertEquals(0, jobManager.getStatistics().getNumberOfQueuedJobs());
+            assertEquals(0, jobManager.getStatistics().getNumberOfActiveJobs());
+
+        } finally {
+            jcReg.unregister();
+            ehReg.unregister();
+        }
+    }
+
+    private static final class DataObject implements Serializable {
+        public String message;
+    }
+}

Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ClassloadingTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ClassloadingTest.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ClassloadingTest.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/DropQueueTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/DropQueueTest.java?rev=1470462&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/DropQueueTest.java (added)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/DropQueueTest.java Mon Apr 22 11:42:53 2013
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.it;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.Dictionary;
+import java.util.Hashtable;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.sling.event.impl.jobs.config.ConfigurationConstants;
+import org.apache.sling.event.jobs.Job;
+import org.apache.sling.event.jobs.JobConsumer;
+import org.apache.sling.event.jobs.JobManager;
+import org.apache.sling.event.jobs.JobUtil;
+import org.apache.sling.event.jobs.QueueConfiguration;
+import org.junit.Before;
+import org.junit.runner.RunWith;
+import org.ops4j.pax.exam.junit.ExamReactorStrategy;
+import org.ops4j.pax.exam.junit.JUnit4TestRunner;
+import org.ops4j.pax.exam.spi.reactors.AllConfinedStagedReactorFactory;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventHandler;
+
+@RunWith(JUnit4TestRunner.class)
+@ExamReactorStrategy(AllConfinedStagedReactorFactory.class)
+public class DropQueueTest extends AbstractJobHandlingTest {
+
+    private static final String QUEUE_NAME = "droptest";
+    private static final String TOPIC = "sling/droptest";
+    private static int NUM_JOBS = 10;
+
+    private String queueConfPid;
+
+    @Override
+    @Before
+    public void setup() throws IOException {
+        super.setup();
+
+        // create ignore test queue
+        final org.osgi.service.cm.Configuration orderedConfig = this.configAdmin.createFactoryConfiguration("org.apache.sling.event.jobs.QueueConfiguration", null);
+        final Dictionary<String, Object> orderedProps = new Hashtable<String, Object>();
+        orderedProps.put(ConfigurationConstants.PROP_NAME, QUEUE_NAME);
+        orderedProps.put(ConfigurationConstants.PROP_TYPE, QueueConfiguration.Type.DROP.name());
+        orderedProps.put(ConfigurationConstants.PROP_TOPICS, TOPIC);
+        orderedConfig.update(orderedProps);
+
+        this.queueConfPid = orderedConfig.getPid();
+
+        this.sleep(1000L);
+    }
+
+
+    @org.junit.Test public void testDroppingQueue() throws Exception {
+        final AtomicInteger count = new AtomicInteger(0);
+        final AtomicInteger dropCount = new AtomicInteger(0);
+        final ServiceRegistration jcReg = this.registerJobConsumer(TOPIC,
+                new JobConsumer() {
+
+                    @Override
+                    public boolean process(Job job) {
+                        count.incrementAndGet();
+                        return true;
+                    }
+                });
+        final ServiceRegistration ehReg = this.registerEventHandler(JobUtil.TOPIC_JOB_CANCELLED,
+                new EventHandler() {
+
+                    @Override
+                    public void handleEvent(Event event) {
+                        dropCount.incrementAndGet();
+                    }
+                });
+        try {
+            final JobManager jobManager = this.getJobManager();
+
+            // we start "some" jobs:
+            for(int i = 0; i < NUM_JOBS; i++ ) {
+                jobManager.addJob(TOPIC, null, null);
+            }
+            while ( dropCount.get() < NUM_JOBS ) {
+                // we wait a little bit
+                Thread.sleep(100);
+            }
+
+            // no jobs queued, none processed and no available
+            assertEquals(0, jobManager.getStatistics().getNumberOfQueuedJobs());
+            assertEquals(0, count.get());
+            assertEquals(0, jobManager.findJobs(JobManager.QueryType.ALL, TOPIC, -1).size());
+
+            // let'see if restarting helps with a new queue config
+            final org.osgi.service.cm.Configuration cf = this.configAdmin.getConfiguration(this.queueConfPid, null);
+            final Dictionary<String, Object> orderedProps = cf.getProperties();
+
+            orderedProps.put(ConfigurationConstants.PROP_TYPE, QueueConfiguration.Type.UNORDERED.name());
+            cf.update(orderedProps);
+
+            jobManager.restart();
+            // we wait a little bit
+            Thread.sleep(1200); // TODO - we have to wait until reload is done
+
+             // no jobs queued, none processed and no available
+            assertEquals(0, jobManager.getStatistics().getNumberOfQueuedJobs());
+            assertEquals(0, count.get());
+            assertEquals(0, jobManager.findJobs(JobManager.QueryType.ALL, TOPIC, -1).size());
+        } finally {
+            jcReg.unregister();
+            ehReg.unregister();
+        }
+    }
+}

Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/DropQueueTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/DropQueueTest.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/DropQueueTest.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/IgnoreQueueTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/IgnoreQueueTest.java?rev=1470462&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/IgnoreQueueTest.java (added)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/IgnoreQueueTest.java Mon Apr 22 11:42:53 2013
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.it;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.Dictionary;
+import java.util.Hashtable;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.sling.event.impl.jobs.config.ConfigurationConstants;
+import org.apache.sling.event.jobs.Job;
+import org.apache.sling.event.jobs.JobConsumer;
+import org.apache.sling.event.jobs.JobManager;
+import org.apache.sling.event.jobs.QueueConfiguration;
+import org.junit.Before;
+import org.junit.runner.RunWith;
+import org.ops4j.pax.exam.junit.ExamReactorStrategy;
+import org.ops4j.pax.exam.junit.JUnit4TestRunner;
+import org.ops4j.pax.exam.spi.reactors.AllConfinedStagedReactorFactory;
+import org.osgi.framework.ServiceRegistration;
+
+@RunWith(JUnit4TestRunner.class)
+@ExamReactorStrategy(AllConfinedStagedReactorFactory.class)
+public class IgnoreQueueTest extends AbstractJobHandlingTest {
+
+    private static final String QUEUE_NAME = "ignoretest";
+    private static final String TOPIC = "sling/ignoretest";
+    private static int NUM_JOBS = 10;
+
+    private String queueConfPid;
+
+    @Override
+    @Before
+    public void setup() throws IOException {
+        super.setup();
+
+        // create ignore test queue
+        final org.osgi.service.cm.Configuration orderedConfig = this.configAdmin.createFactoryConfiguration("org.apache.sling.event.jobs.QueueConfiguration", null);
+        final Dictionary<String, Object> orderedProps = new Hashtable<String, Object>();
+        orderedProps.put(ConfigurationConstants.PROP_NAME, QUEUE_NAME);
+        orderedProps.put(ConfigurationConstants.PROP_TYPE, QueueConfiguration.Type.IGNORE.name());
+        orderedProps.put(ConfigurationConstants.PROP_TOPICS, TOPIC);
+        orderedConfig.update(orderedProps);
+
+        this.queueConfPid = orderedConfig.getPid();
+
+        this.sleep(1000L);
+    }
+
+    @org.junit.Test public void testIgnoreQueue() throws Exception {
+        final AtomicInteger count = new AtomicInteger(0);
+        final ServiceRegistration jcReg = this.registerJobConsumer(TOPIC,
+                new JobConsumer() {
+
+                    @Override
+                    public boolean process(Job job) {
+                        count.incrementAndGet();
+                        return true;
+                    }
+                });
+
+        try {
+            final JobManager jobManager = this.getJobManager();
+
+            // we start "some" jobs:
+            for(int i = 0; i < NUM_JOBS; i++ ) {
+                jobManager.addJob(TOPIC, null, null);
+            }
+            sleep(200);
+
+            // we wait until NUM_JOBS have been processed by the JobManager
+            while ( jobManager.findJobs(JobManager.QueryType.ALL, TOPIC, -1).size() < NUM_JOBS ) {
+                sleep(200);
+             }
+
+            // no jobs queued, none processed but available
+            assertEquals(0, jobManager.getStatistics().getNumberOfQueuedJobs());
+            assertEquals(0, jobManager.getStatistics().getNumberOfProcessedJobs());
+            assertEquals(0, count.get());
+            assertEquals(NUM_JOBS, jobManager.findJobs(JobManager.QueryType.ALL, TOPIC, -1).size());
+
+            // let'see if restarting helps with a new queue config
+            final org.osgi.service.cm.Configuration cf = this.configAdmin.getConfiguration(this.queueConfPid, null);
+            final Dictionary<String, Object> orderedProps = cf.getProperties();
+
+            orderedProps.put(ConfigurationConstants.PROP_TYPE, QueueConfiguration.Type.UNORDERED.name());
+            cf.update(orderedProps);
+
+            // we wait until all jobs are processed
+            while ( count.get() < NUM_JOBS ) {
+                this.sleep(100);
+            }
+
+            // no jobs queued, but processed and not available
+            assertEquals(0, jobManager.getStatistics().getNumberOfQueuedJobs());
+            assertEquals(NUM_JOBS, jobManager.getStatistics().getNumberOfProcessedJobs());
+            assertEquals(NUM_JOBS, count.get());
+            assertEquals(0, jobManager.findJobs(JobManager.QueryType.ALL, TOPIC, -1).size());
+        } finally {
+            jcReg.unregister();
+        }
+    }
+}

Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/IgnoreQueueTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/IgnoreQueueTest.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/IgnoreQueueTest.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/JobHandlingTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/JobHandlingTest.java?rev=1470462&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/JobHandlingTest.java (added)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/JobHandlingTest.java Mon Apr 22 11:42:53 2013
@@ -0,0 +1,532 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.it;
+
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Dictionary;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.inject.Inject;
+
+import org.apache.sling.event.impl.Barrier;
+import org.apache.sling.event.impl.jobs.config.ConfigurationConstants;
+import org.apache.sling.event.jobs.Job;
+import org.apache.sling.event.jobs.JobConsumer;
+import org.apache.sling.event.jobs.JobManager;
+import org.apache.sling.event.jobs.JobProcessor;
+import org.apache.sling.event.jobs.JobUtil;
+import org.apache.sling.event.jobs.QueueConfiguration;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.ops4j.pax.exam.junit.ExamReactorStrategy;
+import org.ops4j.pax.exam.junit.JUnit4TestRunner;
+import org.ops4j.pax.exam.spi.reactors.AllConfinedStagedReactorFactory;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventHandler;
+
+@RunWith(JUnit4TestRunner.class)
+@ExamReactorStrategy(AllConfinedStagedReactorFactory.class)
+public class JobHandlingTest extends AbstractJobHandlingTest {
+
+    public static final String TOPIC = "sling/test";
+
+    @Inject
+    protected EventAdmin eventAdmin;
+
+    @Override
+    @Before
+    public void setup() throws IOException {
+        super.setup();
+
+        // create test queue
+        final org.osgi.service.cm.Configuration config = this.configAdmin.createFactoryConfiguration("org.apache.sling.event.jobs.QueueConfiguration", null);
+        Dictionary<String, Object> props = new Hashtable<String, Object>();
+        props.put(ConfigurationConstants.PROP_NAME, "test");
+        props.put(ConfigurationConstants.PROP_TYPE, QueueConfiguration.Type.UNORDERED.name());
+        props.put(ConfigurationConstants.PROP_TOPICS, new String[] {TOPIC, TOPIC + "2"});
+        props.put(ConfigurationConstants.PROP_RETRIES, 2);
+        props.put(ConfigurationConstants.PROP_RETRY_DELAY, 2000L);
+        config.update(props);
+
+        this.sleep(1000L);
+    }
+
+    /**
+     * Helper method to create a job event.
+     */
+    private Event getJobEvent(String id) {
+        final Dictionary<String, Object> props = new Hashtable<String, Object>();
+        props.put(JobUtil.PROPERTY_JOB_TOPIC, "sling/test");
+        if ( id != null ) {
+            props.put(JobUtil.PROPERTY_JOB_NAME, id);
+        }
+
+        return new Event(JobUtil.TOPIC_JOB, props);
+    }
+
+    /**
+     * Test simple job execution.
+     * The job is executed once and finished successfully.
+     */
+    @Test public void testSimpleJobExecution() throws Exception {
+        final Barrier cb = new Barrier(2);
+
+        final ServiceRegistration reg = this.registerEventHandler("sling/test",
+                new EventHandler() {
+                    @Override
+                    public void handleEvent(Event event) {
+                        JobUtil.acknowledgeJob(event);
+                        JobUtil.finishedJob(event);
+                        cb.block();
+                    }
+
+                 });
+
+        try {
+            this.eventAdmin.sendEvent(getJobEvent(null));
+            assertTrue("No event received in the given time.", cb.block(5));
+            cb.reset();
+            assertFalse("Unexpected event received in the given time.", cb.block(5));
+        } finally {
+            reg.unregister();
+        }
+    }
+
+    @Test public void testManyJobs() throws Exception {
+        final ServiceRegistration reg1 = this.registerEventHandler("sling/test",
+                new EventHandler() {
+                    @Override
+                    public void handleEvent(Event event) {
+                        JobUtil.processJob(event, new JobProcessor() {
+
+                            @Override
+                            public boolean process(Event job) {
+                                try {
+                                    Thread.sleep(200);
+                                } catch (InterruptedException ie) {
+                                    // ignore
+                                }
+                                return true;
+                            }
+                        });
+                    }
+                 });
+        final AtomicInteger count = new AtomicInteger(0);
+        final ServiceRegistration reg2 = this.registerEventHandler(JobUtil.TOPIC_JOB_FINISHED,
+                new EventHandler() {
+                    @Override
+                    public void handleEvent(Event event) {
+                        count.incrementAndGet();
+                    }
+                 });
+
+        try {
+            // we start "some" jobs
+            final int COUNT = 300;
+            for(int i = 0; i < COUNT; i++ ) {
+//                final String queueName = "queue" + (i % 20);
+                this.eventAdmin.sendEvent(this.getJobEvent(null));
+            }
+            while ( count.get() < COUNT ) {
+                try {
+                    Thread.sleep(500);
+                } catch (InterruptedException ie) {
+                    // ignore
+                }
+            }
+            assertEquals("Finished count", COUNT, count.get());
+            assertEquals("Finished count", COUNT, this.getJobManager().getStatistics().getNumberOfFinishedJobs());
+        } finally {
+            reg1.unregister();
+            reg2.unregister();
+        }
+    }
+
+    /**
+     * Test simple job execution with job id.
+     * The job is executed once and finished successfully.
+     */
+    @org.junit.Test public void testSimpleJobWithIdExecution() throws Exception {
+        final Barrier cb = new Barrier(2);
+        final ServiceRegistration jcReg = this.registerJobConsumer(TOPIC,
+                new JobConsumer() {
+
+                    @Override
+                    public boolean process(Job job) {
+                        cb.block();
+                        return true;
+                    }
+                });
+        try {
+            final JobManager jobManager = this.getJobManager();
+            jobManager.addJob(TOPIC, "myid1", null);
+            assertTrue("No event received in the given time.", cb.block(5));
+            cb.reset();
+            assertFalse("Unexpected event received in the given time.", cb.block(5));
+        } finally {
+            jcReg.unregister();
+        }
+    }
+
+    /**
+     * Test canceling a job
+     * The job execution always fails
+     */
+    @org.junit.Test public void testCancelJob() throws Exception {
+        final Barrier cb = new Barrier(2);
+        final Barrier cb2 = new Barrier(2);
+        final ServiceRegistration jcReg = this.registerJobConsumer(TOPIC,
+                new JobConsumer() {
+
+                    @Override
+                    public boolean process(Job job) {
+                        cb.block();
+                        cb2.block();
+                        return false;
+                    }
+                });
+        try {
+            final JobManager jobManager = this.getJobManager();
+            jobManager.addJob(TOPIC, "myid2", null);
+            cb.block();
+
+            assertEquals(1, jobManager.findJobs(JobManager.QueryType.ALL, "sling/test", -1).size());
+            // job is currently waiting, therefore cancel fails
+            final Event e1 = jobManager.findJob("sling/test", Collections.singletonMap(JobUtil.PROPERTY_JOB_NAME, (Object)"myid2"));
+            assertNotNull(e1);
+            assertFalse(jobManager.removeJob((String)e1.getProperty(JobUtil.JOB_ID)));
+            cb2.block(); // and continue job
+
+            sleep(200);
+
+            // the job is now in the queue again
+            final Event e2 = jobManager.findJob("sling/test", Collections.singletonMap(JobUtil.PROPERTY_JOB_NAME, (Object)"myid2"));
+            assertNotNull(e2);
+            assertTrue(jobManager.removeJob((String)e2.getProperty(JobUtil.JOB_ID)));
+            assertEquals(0, jobManager.findJobs(JobManager.QueryType.ALL, "sling/test", -1).size());
+        } finally {
+            jcReg.unregister();
+        }
+   }
+
+    /**
+     * Test force canceling a job
+     * The job execution always fails
+     */
+    @org.junit.Test public void testForceCancelJob() throws Exception {
+        final Barrier cb = new Barrier(2);
+        final ServiceRegistration jcReg = this.registerJobConsumer(TOPIC,
+                new JobConsumer() {
+
+                    @Override
+                    public boolean process(Job job) {
+                        cb.block();
+                        sleep(1000);
+                        return false;
+                    }
+                });
+        try {
+            final JobManager jobManager = this.getJobManager();
+            jobManager.addJob(TOPIC, "myid3", null);
+            cb.block();
+
+            assertEquals(1, jobManager.findJobs(JobManager.QueryType.ALL, "sling/test", -1).size());
+            // job is currently sleeping, but force cancel always waits!
+            final Event e = jobManager.findJob("sling/test", Collections.singletonMap(JobUtil.PROPERTY_JOB_NAME, (Object)"myid3"));
+            assertNotNull(e);
+            jobManager.forceRemoveJob((String)e.getProperty(JobUtil.JOB_ID));
+            // the job is now removed
+            assertEquals(0, jobManager.findJobs(JobManager.QueryType.ALL, "sling/test", -1).size());
+        } finally {
+            jcReg.unregister();
+        }
+    }
+
+    /**
+     * Reschedule test.
+     * The job is rescheduled two times before it fails.
+     */
+    @org.junit.Test public void testStartJobAndReschedule() throws Exception {
+        final List<Integer> retryCountList = new ArrayList<Integer>();
+        final Barrier cb = new Barrier(2);
+        final ServiceRegistration jcReg = this.registerJobConsumer(TOPIC,
+                new JobConsumer() {
+                    int retryCount;
+
+                    @Override
+                    public boolean process(Job job) {
+                        int retry = 0;
+                        if ( job.getProperty(Job.PROPERTY_JOB_RETRY_COUNT) != null ) {
+                            retry = (Integer)job.getProperty(Job.PROPERTY_JOB_RETRY_COUNT);
+                        }
+                        if ( retry == retryCount ) {
+                            retryCountList.add(retry);
+                        }
+                        retryCount++;
+                        cb.block();
+                        return false;
+                    }
+                });
+        try {
+            final JobManager jobManager = this.getJobManager();
+            jobManager.addJob(TOPIC, null, null);
+
+            assertTrue("No event received in the given time.", cb.block(5));
+            cb.reset();
+            // the job is retried after two seconds, so we wait again
+            assertTrue("No event received in the given time.", cb.block(5));
+            cb.reset();
+            // the job is retried after two seconds, so we wait again
+            assertTrue("No event received in the given time.", cb.block(5));
+            // we have reached the retry so we expect to not get an event
+            cb.reset();
+            assertFalse("Unexpected event received in the given time.", cb.block(5));
+            assertEquals("Unexpected number of retries", 3, retryCountList.size());
+        } finally {
+            jcReg.unregister();
+        }
+    }
+
+    /**
+     * Notifications.
+     * We send several jobs which are treated different and then see
+     * how many invocations have been sent.
+     */
+    @org.junit.Test public void testNotifications() throws Exception {
+        final List<String> cancelled = Collections.synchronizedList(new ArrayList<String>());
+        final List<String> failed = Collections.synchronizedList(new ArrayList<String>());
+        final List<String> finished = Collections.synchronizedList(new ArrayList<String>());
+        final List<String> started = Collections.synchronizedList(new ArrayList<String>());
+        final ServiceRegistration jcReg = this.registerJobConsumer(TOPIC,
+                new JobConsumer() {
+
+                    @Override
+                    public boolean process(Job job) {
+                        // events 1 and 4 finish the first time
+                        final String id = job.getName();
+                        if ( "1".equals(id) || "4".equals(id) ) {
+                            return true;
+
+                        // 5 fails always
+                        } else if ( "5".equals(id) ) {
+                            return false;
+                        } else {
+                            int retry = 0;
+                            if ( job.getProperty(Job.PROPERTY_JOB_RETRY_COUNT) != null ) {
+                                retry = (Integer)job.getProperty(Job.PROPERTY_JOB_RETRY_COUNT);
+                            }
+                            // 2 fails the first time
+                            if ( "2".equals(id) ) {
+                                if ( retry == 0 ) {
+                                    return false;
+                                } else {
+                                    return true;
+                                }
+                            }
+                            // 3 fails the first and second time
+                            if ( "3".equals(id) ) {
+                                if ( retry == 0 || retry == 1 ) {
+                                    return false;
+                                } else {
+                                    return true;
+                                }
+                            }
+                        }
+                        return false;
+                    }
+                });
+        final ServiceRegistration eh1Reg = this.registerEventHandler(JobUtil.TOPIC_JOB_CANCELLED,
+                new EventHandler() {
+
+                    @Override
+                    public void handleEvent(Event event) {
+                        final Event job = (Event) event.getProperty(JobUtil.PROPERTY_NOTIFICATION_JOB);
+                        final String id = (String)job.getProperty(JobUtil.PROPERTY_JOB_NAME);
+                        cancelled.add(id);
+                    }
+                });
+        final ServiceRegistration eh2Reg = this.registerEventHandler(JobUtil.TOPIC_JOB_FAILED,
+                new EventHandler() {
+
+                    @Override
+                    public void handleEvent(Event event) {
+                        final Event job = (Event) event.getProperty(JobUtil.PROPERTY_NOTIFICATION_JOB);
+                        final String id = (String)job.getProperty(JobUtil.PROPERTY_JOB_NAME);
+                        failed.add(id);
+                    }
+                });
+        final ServiceRegistration eh3Reg = this.registerEventHandler(JobUtil.TOPIC_JOB_FINISHED,
+                new EventHandler() {
+
+                    @Override
+                    public void handleEvent(Event event) {
+                        final Event job = (Event) event.getProperty(JobUtil.PROPERTY_NOTIFICATION_JOB);
+                        final String id = (String)job.getProperty(JobUtil.PROPERTY_JOB_NAME);
+                        finished.add(id);
+                    }
+                });
+        final ServiceRegistration eh4Reg = this.registerEventHandler(JobUtil.TOPIC_JOB_STARTED,
+                new EventHandler() {
+
+                    @Override
+                    public void handleEvent(Event event) {
+                        final Event job = (Event) event.getProperty(JobUtil.PROPERTY_NOTIFICATION_JOB);
+                        final String id = (String)job.getProperty(JobUtil.PROPERTY_JOB_NAME);
+                        started.add(id);
+                    }
+                });
+
+        try {
+            final JobManager jobManager = this.getJobManager();
+            jobManager.addJob(TOPIC, "1", null);
+            jobManager.addJob(TOPIC, "2", null);
+            jobManager.addJob(TOPIC, "3", null);
+            jobManager.addJob(TOPIC, "4", null);
+            jobManager.addJob(TOPIC, "5", null);
+
+            int count = 0;
+            final long startTime = System.currentTimeMillis();
+            do {
+                count = finished.size() + cancelled.size();
+                // after 25 seconds we cancel the test
+                if ( System.currentTimeMillis() - startTime > 25000 ) {
+                    throw new Exception("Timeout during notification test.");
+                }
+            } while ( count < 5 || started.size() < 10 );
+            assertEquals("Finished count", 4, finished.size());
+            assertEquals("Cancelled count", 1, cancelled.size());
+            assertEquals("Started count", 10, started.size());
+            assertEquals("Failed count", 5, failed.size());
+        } finally {
+            jcReg.unregister();
+            eh1Reg.unregister();
+            eh2Reg.unregister();
+            eh3Reg.unregister();
+            eh4Reg.unregister();
+        }
+    }
+
+    /**
+     * Test sending of jobs with and without a processor
+     */
+    @org.junit.Test(timeout=1000*60*5) public void testNoJobProcessor() throws Exception {
+        final AtomicInteger count = new AtomicInteger(0);
+        final AtomicInteger unprocessedCount = new AtomicInteger(0);
+
+        final ServiceRegistration eh1 = this.registerEventHandler(TOPIC,
+                new EventHandler() {
+
+                    @Override
+                    public void handleEvent(Event event) {
+                        JobUtil.processJob(event, new JobProcessor() {
+
+                            @Override
+                            public boolean process(Event job) {
+                                try {
+                                    Thread.sleep(200);
+                                } catch (InterruptedException ie) {
+                                    // ignore
+                                }
+                                return true;
+                            }
+                        });
+                    }
+                });
+        final ServiceRegistration eh2 = this.registerEventHandler("sling/test2",
+                new EventHandler() {
+
+                    @Override
+                    public void handleEvent(Event event) {
+                        unprocessedCount.incrementAndGet();
+                    }
+                });
+        final ServiceRegistration eh3 = this.registerEventHandler(JobUtil.TOPIC_JOB_FINISHED,
+                new EventHandler() {
+
+                    @Override
+                    public void handleEvent(Event event) {
+                        count.incrementAndGet();
+                    }
+                });
+        try {
+            final JobManager jobManager = this.getJobManager();
+
+            // we start 20 jobs, every second job has no processor
+            final int COUNT = 20;
+            for(int i = 0; i < COUNT; i++ ) {
+                final String jobTopic = (i % 2 == 0 ? TOPIC : TOPIC + "2");
+                final Dictionary<String, Object> props = new Hashtable<String, Object>();
+                props.put(JobUtil.PROPERTY_JOB_TOPIC, jobTopic);
+
+                this.eventAdmin.postEvent(new Event(JobUtil.TOPIC_JOB, props));
+            }
+            final long startTime = System.currentTimeMillis();
+            while ( count.get() < COUNT / 2) {
+                try {
+                    Thread.sleep(500);
+                } catch (InterruptedException ie) {
+                    // ignore
+                }
+            }
+            while ( unprocessedCount.get() < COUNT / 2) {
+                try {
+                    Thread.sleep(500);
+                } catch (InterruptedException ie) {
+                    // ignore
+                }
+            }
+            // clean up waits for one minute, so we should do the same
+            while ( System.currentTimeMillis() - startTime < 72000 ) {
+                try {
+                    Thread.sleep(100);
+                } catch (InterruptedException ie) {
+                    // ignore
+                }
+            }
+            ((Runnable)jobManager).run();
+            while ( unprocessedCount.get() < COUNT ) {
+                try {
+                    Thread.sleep(500);
+                } catch (InterruptedException ie) {
+                    // ignore
+                }
+            }
+            assertEquals("Finished count", COUNT / 2, count.get());
+            assertEquals("Unprocessed count",COUNT, unprocessedCount.get());
+            assertEquals("Finished count", COUNT / 2, jobManager.getStatistics().getNumberOfFinishedJobs());
+        } finally {
+            eh1.unregister();
+            eh2.unregister();
+            eh3.unregister();
+        }
+    }
+}
\ No newline at end of file

Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/JobHandlingTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/JobHandlingTest.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/JobHandlingTest.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/OrderedQueueTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/OrderedQueueTest.java?rev=1470462&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/OrderedQueueTest.java (added)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/OrderedQueueTest.java Mon Apr 22 11:42:53 2013
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.it;
+
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Dictionary;
+import java.util.Hashtable;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.sling.event.impl.Barrier;
+import org.apache.sling.event.impl.jobs.config.ConfigurationConstants;
+import org.apache.sling.event.jobs.Job;
+import org.apache.sling.event.jobs.JobConsumer;
+import org.apache.sling.event.jobs.JobManager;
+import org.apache.sling.event.jobs.JobUtil;
+import org.apache.sling.event.jobs.Queue;
+import org.apache.sling.event.jobs.QueueConfiguration;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.ops4j.pax.exam.junit.ExamReactorStrategy;
+import org.ops4j.pax.exam.junit.JUnit4TestRunner;
+import org.ops4j.pax.exam.spi.reactors.AllConfinedStagedReactorFactory;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventHandler;
+
+@RunWith(JUnit4TestRunner.class)
+@ExamReactorStrategy(AllConfinedStagedReactorFactory.class)
+public class OrderedQueueTest extends AbstractJobHandlingTest {
+
+    @Override
+    @Before
+    public void setup() throws IOException {
+        super.setup();
+
+        // create ordered test queue
+        final org.osgi.service.cm.Configuration orderedConfig = this.configAdmin.createFactoryConfiguration("org.apache.sling.event.jobs.QueueConfiguration", null);
+        final Dictionary<String, Object> orderedProps = new Hashtable<String, Object>();
+        orderedProps.put(ConfigurationConstants.PROP_NAME, "orderedtest");
+        orderedProps.put(ConfigurationConstants.PROP_TYPE, QueueConfiguration.Type.ORDERED.name());
+        orderedProps.put(ConfigurationConstants.PROP_TOPICS, "sling/orderedtest/*");
+        orderedProps.put(ConfigurationConstants.PROP_RETRIES, 2);
+        orderedProps.put(ConfigurationConstants.PROP_RETRY_DELAY, 2000L);
+        orderedConfig.update(orderedProps);
+
+        this.sleep(1000L);
+    }
+
+    /**
+     * Ordered Queue Test
+     */
+    @Test public void testOrderedQueue() throws Exception {
+        final JobManager jobManager = this.getJobManager();
+
+        final Barrier cb = new Barrier(2);
+
+        final ServiceRegistration jc1Reg = this.registerJobConsumer("sling/orderedtest/start",
+                new JobConsumer() {
+
+                    @Override
+                    public boolean process(final Job job) {
+                        cb.block();
+                        return true;
+                    }
+                });
+
+        // register new consumer and event handle
+        final AtomicInteger count = new AtomicInteger(0);
+        final AtomicInteger parallelCount = new AtomicInteger(0);
+        final ServiceRegistration jcReg = this.registerJobConsumer("sling/orderedtest/*",
+                new JobConsumer() {
+
+                    @Override
+                    public boolean process(final Job job) {
+                        if ( parallelCount.incrementAndGet() > 1 ) {
+                            parallelCount.decrementAndGet();
+                            return false;
+                        }
+                        final String topic = job.getTopic();
+                        if ( topic.endsWith("sub1") ) {
+                            final int i = (Integer)job.getProperty(Job.PROPERTY_JOB_RETRY_COUNT);
+                            if ( i == 0 ) {
+                                parallelCount.decrementAndGet();
+                                return false;
+                            }
+                        }
+                        try {
+                            Thread.sleep(30);
+                        } catch (InterruptedException ie) {
+                            // ignore
+                        }
+                        parallelCount.decrementAndGet();
+                        return true;
+                    }
+                });
+        final ServiceRegistration ehReg = this.registerEventHandler(JobUtil.TOPIC_JOB_FINISHED,
+                new EventHandler() {
+
+                    @Override
+                    public void handleEvent(final Event event) {
+                        count.incrementAndGet();
+                    }
+                });
+
+        try {
+            // we first sent one event to get the queue started
+            jobManager.addJob("sling/orderedtest/start", null, null);
+            assertTrue("No event received in the given time.", cb.block(5));
+            cb.reset();
+
+            // get the queue
+            final Queue q = jobManager.getQueue("orderedtest");
+            assertNotNull("Queue 'orderedtest' should exist!", q);
+
+            // suspend it
+            q.suspend();
+
+            final int NUM_JOBS = 30;
+
+            // we start "some" jobs:
+            for(int i = 0; i < NUM_JOBS; i++ ) {
+                final String subTopic = "sling/orderedtest/sub" + (i % 10);
+                jobManager.addJob(subTopic, null, null);
+            }
+            // start the queue
+            q.resume();
+            while ( count.get() < NUM_JOBS +1 ) {
+                try {
+                    Thread.sleep(500);
+                } catch (InterruptedException ie) {
+                    // ignore
+                }
+            }
+            // we started one event before the test, so add one
+            assertEquals("Finished count", NUM_JOBS + 1, count.get());
+            assertEquals("Finished count", NUM_JOBS + 1, jobManager.getStatistics().getNumberOfFinishedJobs());
+            assertEquals("Finished count", NUM_JOBS + 1, q.getStatistics().getNumberOfFinishedJobs());
+            assertEquals("Failed count", NUM_JOBS / 10, q.getStatistics().getNumberOfFailedJobs());
+            assertEquals("Cancelled count", 0, q.getStatistics().getNumberOfCancelledJobs());
+        } finally {
+            jc1Reg.unregister();
+            jcReg.unregister();
+            ehReg.unregister();
+        }
+    }
+}
\ No newline at end of file

Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/OrderedQueueTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/OrderedQueueTest.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/OrderedQueueTest.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/RoundRobinQueueTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/RoundRobinQueueTest.java?rev=1470462&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/RoundRobinQueueTest.java (added)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/RoundRobinQueueTest.java Mon Apr 22 11:42:53 2013
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.it;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Dictionary;
+import java.util.Hashtable;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.sling.event.impl.Barrier;
+import org.apache.sling.event.impl.jobs.config.ConfigurationConstants;
+import org.apache.sling.event.jobs.Job;
+import org.apache.sling.event.jobs.JobConsumer;
+import org.apache.sling.event.jobs.JobManager;
+import org.apache.sling.event.jobs.JobUtil;
+import org.apache.sling.event.jobs.Queue;
+import org.apache.sling.event.jobs.QueueConfiguration;
+import org.junit.Before;
+import org.junit.runner.RunWith;
+import org.ops4j.pax.exam.junit.ExamReactorStrategy;
+import org.ops4j.pax.exam.junit.JUnit4TestRunner;
+import org.ops4j.pax.exam.spi.reactors.AllConfinedStagedReactorFactory;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventHandler;
+
+@RunWith(JUnit4TestRunner.class)
+@ExamReactorStrategy(AllConfinedStagedReactorFactory.class)
+public class RoundRobinQueueTest extends AbstractJobHandlingTest {
+
+    private static final String QUEUE_NAME = "roundrobintest";
+    private static final String TOPIC = "sling/roundrobintest";
+    private static int MAX_PAR = 5;
+    private static int NUM_JOBS = 300;
+
+    @Override
+    @Before
+    public void setup() throws IOException {
+        super.setup();
+
+        // create ordered test queue
+        final org.osgi.service.cm.Configuration orderedConfig = this.configAdmin.createFactoryConfiguration("org.apache.sling.event.jobs.QueueConfiguration", null);
+        final Dictionary<String, Object> orderedProps = new Hashtable<String, Object>();
+        orderedProps.put(ConfigurationConstants.PROP_NAME, QUEUE_NAME);
+        orderedProps.put(ConfigurationConstants.PROP_TYPE, QueueConfiguration.Type.TOPIC_ROUND_ROBIN.name());
+        orderedProps.put(ConfigurationConstants.PROP_TOPICS, TOPIC + "/*");
+        orderedProps.put(ConfigurationConstants.PROP_RETRIES, 2);
+        orderedProps.put(ConfigurationConstants.PROP_RETRY_DELAY, 2000L);
+        orderedProps.put(ConfigurationConstants.PROP_MAX_PARALLEL, MAX_PAR);
+        orderedConfig.update(orderedProps);
+
+        this.sleep(1000L);
+    }
+
+    @org.junit.Test public void testRoundRobinQueue() throws Exception {
+        final JobManager jobManager = this.getJobManager();
+
+        final Barrier cb = new Barrier(2);
+
+        final ServiceRegistration jc1Reg = this.registerJobConsumer(TOPIC + "/start",
+                new JobConsumer() {
+
+                    @Override
+                    public boolean process(final Job job) {
+                        cb.block();
+                        return true;
+                    }
+                });
+
+        // register new consumer and event handle
+        final AtomicInteger count = new AtomicInteger(0);
+        final AtomicInteger parallelCount = new AtomicInteger(0);
+        final ServiceRegistration jcReg = this.registerJobConsumer(TOPIC + "/*",
+                new JobConsumer() {
+
+                    @Override
+                    public boolean process(final Job job) {
+                        if ( parallelCount.incrementAndGet() > MAX_PAR ) {
+                            parallelCount.decrementAndGet();
+                            return false;
+                        }
+                        sleep(30);
+                        parallelCount.decrementAndGet();
+                        return true;
+                    }
+                });
+        final ServiceRegistration ehReg = this.registerEventHandler(JobUtil.TOPIC_JOB_FINISHED,
+                new EventHandler() {
+
+                    @Override
+                    public void handleEvent(final Event event) {
+                        count.incrementAndGet();
+                    }
+                });
+
+        try {
+            // we first sent one event to get the queue started
+            jobManager.addJob(TOPIC + "/start", null, null);
+            assertTrue("No event received in the given time.", cb.block(5));
+            cb.reset();
+
+            // get the queue
+            final Queue q = jobManager.getQueue(QUEUE_NAME);
+            assertNotNull("Queue '" + QUEUE_NAME + "' should exist!", q);
+
+            // suspend it
+            q.suspend();
+
+            // we start "some" jobs:
+            // first jobs without id
+            for(int i = 0; i < NUM_JOBS; i++ ) {
+                final String subTopic = TOPIC + "/sub" + (i % 10);
+                jobManager.addJob(subTopic, null, null);
+            }
+            // second jobs with id
+            for(int i = 0; i < NUM_JOBS; i++ ) {
+                final String subTopic = TOPIC + "/sub" + (i % 10);
+                jobManager.addJob(subTopic, "id" + i, null);
+            }
+            // start the queue
+            q.resume();
+            while ( count.get() < 2 * NUM_JOBS  + 1 ) {
+                assertEquals("Failed count", 0, q.getStatistics().getNumberOfFailedJobs());
+                assertEquals("Cancelled count", 0, q.getStatistics().getNumberOfCancelledJobs());
+                sleep(500);
+            }
+            // we started one event before the test, so add one
+            assertEquals("Finished count", 2 * NUM_JOBS + 1, count.get());
+            assertEquals("Finished count", 2 * NUM_JOBS + 1, jobManager.getStatistics().getNumberOfFinishedJobs());
+            assertEquals("Finished count", 2 * NUM_JOBS + 1, q.getStatistics().getNumberOfFinishedJobs());
+            assertEquals("Failed count", 0, q.getStatistics().getNumberOfFailedJobs());
+            assertEquals("Cancelled count", 0, q.getStatistics().getNumberOfCancelledJobs());
+        } finally {
+            jc1Reg.unregister();
+            jcReg.unregister();
+            ehReg.unregister();
+        }
+    }
+}

Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/RoundRobinQueueTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/RoundRobinQueueTest.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/RoundRobinQueueTest.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/TimedJobsTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/TimedJobsTest.java?rev=1470462&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/TimedJobsTest.java (added)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/TimedJobsTest.java Mon Apr 22 11:42:53 2013
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.it;
+
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.Date;
+import java.util.Dictionary;
+import java.util.Hashtable;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.inject.Inject;
+
+import org.apache.sling.event.EventUtil;
+import org.junit.Before;
+import org.junit.runner.RunWith;
+import org.ops4j.pax.exam.junit.ExamReactorStrategy;
+import org.ops4j.pax.exam.junit.JUnit4TestRunner;
+import org.ops4j.pax.exam.spi.reactors.AllConfinedStagedReactorFactory;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventHandler;
+
+@RunWith(JUnit4TestRunner.class)
+@ExamReactorStrategy(AllConfinedStagedReactorFactory.class)
+public class TimedJobsTest extends AbstractJobHandlingTest {
+
+    private static final String TOPIC = "timed/test/topic";
+
+    @Inject
+    private EventAdmin eventAdmin;
+
+    @Override
+    @Before
+    public void setup() throws IOException {
+        super.setup();
+
+        this.sleep(1000L);
+    }
+
+    @org.junit.Test public void testTimedJob() throws Exception {
+        final AtomicInteger counter = new AtomicInteger();
+
+        final ServiceRegistration ehReg = this.registerEventHandler(TOPIC, new EventHandler() {
+
+            @Override
+            public void handleEvent(final Event event) {
+                if ( TOPIC.equals(event.getTopic()) ) {
+                    counter.incrementAndGet();
+                }
+            }
+        });
+        try {
+            final Date d = new Date();
+            d.setTime(System.currentTimeMillis() + 2000); // run in 2 seconds
+            // send timed event
+            final Dictionary<String, Object> props = new Hashtable<String, Object>();
+            props.put(EventUtil.PROPERTY_TIMED_EVENT_TOPIC, TOPIC);
+            props.put(EventUtil.PROPERTY_TIMED_EVENT_DATE, d);
+            this.eventAdmin.sendEvent(new Event(EventUtil.TOPIC_TIMED_EVENT, props));
+
+            while ( counter.get() == 0 ) {
+                this.sleep(1000);
+            }
+        } finally {
+            ehReg.unregister();
+        }
+    }
+
+    @org.junit.Test public void testPeriodicTimedJob() throws Exception {
+        final AtomicInteger counter = new AtomicInteger();
+
+        final ServiceRegistration ehReg = this.registerEventHandler(TOPIC, new EventHandler() {
+
+            @Override
+            public void handleEvent(final Event event) {
+                if ( TOPIC.equals(event.getTopic()) ) {
+                    counter.incrementAndGet();
+                }
+            }
+        });
+        try {
+            // send timed event
+            final Dictionary<String, Object> props = new Hashtable<String, Object>();
+            props.put(EventUtil.PROPERTY_TIMED_EVENT_TOPIC, TOPIC);
+            props.put(EventUtil.PROPERTY_TIMED_EVENT_PERIOD, 1L);
+            this.eventAdmin.sendEvent(new Event(EventUtil.TOPIC_TIMED_EVENT, props));
+
+            while ( counter.get() < 5 ) {
+                this.sleep(1000);
+            }
+            final Dictionary<String, Object> props2 = new Hashtable<String, Object>();
+            props2.put(EventUtil.PROPERTY_TIMED_EVENT_TOPIC, TOPIC);
+
+            this.eventAdmin.sendEvent(new Event(EventUtil.TOPIC_TIMED_EVENT, props2));
+            int current = counter.get();
+            this.sleep(2000);
+            if ( counter.get() != current && counter.get() != current + 1 ) {
+                fail("Events are still sent");
+            }
+        } finally {
+            ehReg.unregister();
+        }
+    }
+}

Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/TimedJobsTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/TimedJobsTest.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/TimedJobsTest.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain