You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2013/04/22 13:42:55 UTC
svn commit: r1470462 [7/7] - in /sling/trunk/bundles/extensions/event: ./
src/main/java/org/apache/sling/event/
src/main/java/org/apache/sling/event/impl/
src/main/java/org/apache/sling/event/impl/dea/
src/main/java/org/apache/sling/event/impl/jobs/ sr...
Added: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java?rev=1470462&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java (added)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java Mon Apr 22 11:42:53 2013
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.it;
+
+
+import static org.ops4j.pax.exam.CoreOptions.junitBundles;
+import static org.ops4j.pax.exam.CoreOptions.mavenBundle;
+import static org.ops4j.pax.exam.CoreOptions.options;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Dictionary;
+import java.util.Hashtable;
+
+import javax.inject.Inject;
+
+import org.apache.sling.event.impl.jobs.JobManagerConfiguration;
+import org.apache.sling.event.jobs.JobConsumer;
+import org.apache.sling.event.jobs.JobManager;
+import org.apache.sling.launchpad.api.StartupHandler;
+import org.apache.sling.launchpad.api.StartupMode;
+import org.ops4j.pax.exam.CoreOptions;
+import org.ops4j.pax.exam.Option;
+import org.ops4j.pax.exam.junit.Configuration;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceReference;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.cm.ConfigurationAdmin;
+import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
+import org.osgi.service.event.EventHandler;
+
+public abstract class AbstractJobHandlingTest {
+
+ private static final String BUNDLE_JAR_SYS_PROP = "project.bundle.file";
+
+ @Inject
+ protected EventAdmin eventAdmin;
+
+ @Inject
+ protected ConfigurationAdmin configAdmin;
+
+ @Inject
+ protected BundleContext bc;
+
+    /**
+     * Builds the Pax Exam options for the test container: the fixed list of
+     * Maven bundles below, the bundle under test (read from the
+     * {@code project.bundle.file} system property) and the JUnit bundles.
+     *
+     * @return the options describing every bundle to provision
+     * @throws IllegalArgumentException if the system property is not set or
+     *         the file it names cannot be read
+     */
+    @Configuration
+    public Option[] config() {
+        final String bundleFileName = System.getProperty( BUNDLE_JAR_SYS_PROP );
+        // Without this check a missing property surfaces as a bare
+        // NullPointerException from new File(null).
+        if ( bundleFileName == null ) {
+            throw new IllegalArgumentException( "The " + BUNDLE_JAR_SYS_PROP
+                + " system property is not set" );
+        }
+        final File bundleFile = new File( bundleFileName );
+        if ( !bundleFile.canRead() ) {
+            throw new IllegalArgumentException( "Cannot read from bundle file " + bundleFileName + " specified in the "
+                + BUNDLE_JAR_SYS_PROP + " system property" );
+        }
+
+        return options(
+            mavenBundle("org.apache.sling", "org.apache.sling.fragment.xml", "1.0.2"),
+            mavenBundle("org.apache.sling", "org.apache.sling.fragment.transaction", "1.0.0"),
+            mavenBundle("org.apache.sling", "org.apache.sling.fragment.activation", "1.0.2"),
+            mavenBundle("org.apache.sling", "org.apache.sling.fragment.ws", "1.0.2"),
+
+            mavenBundle("org.apache.sling", "org.apache.sling.commons.log", "3.0.0"),
+            mavenBundle("org.apache.sling", "org.apache.sling.commons.logservice", "1.0.2"),
+
+            mavenBundle("org.slf4j", "slf4j-api", "1.6.4"),
+            mavenBundle("org.slf4j", "jcl-over-slf4j", "1.6.4"),
+            mavenBundle("org.slf4j", "log4j-over-slf4j", "1.6.4"),
+
+            mavenBundle("commons-io", "commons-io", "1.4"),
+            mavenBundle("commons-fileupload", "commons-fileupload", "1.2.2"),
+            mavenBundle("commons-collections", "commons-collections", "3.2.1"),
+            mavenBundle("commons-codec", "commons-codec", "1.6"),
+            mavenBundle("commons-lang", "commons-lang", "2.5"),
+
+            mavenBundle("org.apache.geronimo.bundles", "commons-httpclient", "3.1_1"),
+            mavenBundle("org.apache.tika", "tika-core", "1.0"),
+            mavenBundle("org.apache.tika", "tika-bundle", "1.0"),
+
+            mavenBundle("org.apache.felix", "org.apache.felix.http.jetty", "2.2.0"),
+            mavenBundle("org.apache.felix", "org.apache.felix.eventadmin", "1.2.14"),
+            mavenBundle("org.apache.felix", "org.apache.felix.scr", "1.6.2"),
+            mavenBundle("org.apache.felix", "org.apache.felix.configadmin", "1.6.0"),
+//            mavenBundle("org.apache.felix", "org.apache.felix.metatype", "1.0.6"),
+
+            mavenBundle("org.apache.sling", "org.apache.sling.commons.osgi", "2.2.0"),
+            mavenBundle("org.apache.sling", "org.apache.sling.commons.json", "2.0.6"),
+            mavenBundle("org.apache.sling", "org.apache.sling.commons.mime", "2.1.4"),
+            mavenBundle("org.apache.sling", "org.apache.sling.commons.classloader", "1.3.0"),
+            mavenBundle("org.apache.sling", "org.apache.sling.commons.scheduler", "2.3.4"),
+            mavenBundle("org.apache.sling", "org.apache.sling.commons.threads", "3.1.0"),
+
+            mavenBundle("org.apache.sling", "org.apache.sling.launchpad.api", "1.1.0"),
+            mavenBundle("org.apache.sling", "org.apache.sling.auth.core", "1.1.0"),
+            mavenBundle("org.apache.sling", "org.apache.sling.discovery.api", "0.1.0-SNAPSHOT"),
+
+            mavenBundle("org.apache.sling", "org.apache.sling.api", "2.4.0"),
+            mavenBundle("org.apache.sling", "org.apache.sling.settings", "1.2.2"),
+            mavenBundle("org.apache.sling", "org.apache.sling.resourceresolver", "1.0.6"),
+            mavenBundle("org.apache.sling", "org.apache.sling.adapter", "2.1.0"),
+            mavenBundle("org.apache.sling", "org.apache.sling.jcr.resource", "2.2.6"),
+            mavenBundle("org.apache.sling", "org.apache.sling.jcr.classloader", "3.1.12"),
+            mavenBundle("org.apache.sling", "org.apache.sling.jcr.contentloader", "2.1.2"),
+            mavenBundle("org.apache.sling", "org.apache.sling.engine", "2.2.6"),
+
+            mavenBundle("org.apache.sling", "org.apache.sling.jcr.jcr-wrapper", "2.0.0"),
+            mavenBundle("org.apache.sling", "org.apache.sling.jcr.api", "2.1.0"),
+            mavenBundle("org.apache.sling", "org.apache.sling.jcr.base", "2.1.2"),
+            mavenBundle("org.apache.jackrabbit", "jackrabbit-api", "2.4.2"),
+            mavenBundle("org.apache.jackrabbit", "jackrabbit-jcr-commons", "2.4.2"),
+            mavenBundle("org.apache.jackrabbit", "jackrabbit-spi", "2.4.2"),
+            mavenBundle("org.apache.jackrabbit", "jackrabbit-spi-commons", "2.4.2"),
+            mavenBundle("org.apache.jackrabbit", "jackrabbit-jcr-rmi", "2.4.2"),
+            mavenBundle("org.apache.derby", "derby", "10.5.3.0_1"),
+            mavenBundle("org.apache.sling", "org.apache.sling.jcr.jackrabbit.server", "2.1.1-SNAPSHOT"),
+
+            CoreOptions.bundle( bundleFile.toURI().toString() ),
+
+            junitBundles()
+        );
+    }
+
+ protected JobManager getJobManager() {
+ final ServiceReference sr = this.bc.getServiceReference(JobManager.class.getName());
+ return (JobManager)this.bc.getService(sr);
+ }
+
+    /**
+     * Pauses the calling thread for the given time without propagating
+     * {@link InterruptedException}.
+     *
+     * @param time the time to sleep, passed to {@link Thread#sleep(long)}
+     */
+    protected void sleep(final long time) {
+        try {
+            Thread.sleep(time);
+        } catch (InterruptedException ignored) {
+            // Return early, but keep the interrupt visible to the caller
+            // instead of silently discarding it.
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Common preparation for the job handling tests: updates the persistence
+     * handler configuration, registers a stub {@link StartupHandler} service
+     * and updates the configuration of the no-cluster discovery service.
+     *
+     * @throws IOException if a configuration cannot be obtained or updated
+     */
+    public void setup() throws IOException {
+        // set load delay to 3 sec
+        final Dictionary<String, Object> persistenceProps = new Hashtable<String, Object>();
+        persistenceProps.put(JobManagerConfiguration.CONFIG_PROPERTY_BACKGROUND_LOAD_DELAY, 3L);
+        final org.osgi.service.cm.Configuration persistenceConfig =
+            this.configAdmin.getConfiguration("org.apache.sling.event.impl.jobs.jcr.PersistenceHandler", null);
+        persistenceConfig.update(persistenceProps);
+
+        // stub startup handler: reports a finished startup in INSTALL mode
+        final StartupHandler startupStub = new StartupHandler() {
+
+            @Override
+            public StartupMode getMode() {
+                return StartupMode.INSTALL;
+            }
+
+            @Override
+            public boolean isFinished() {
+                return true;
+            }
+
+            @Override
+            public void waitWithStartup(boolean flag) {
+                // nothing to do
+            }
+        };
+        this.bc.registerService(StartupHandler.class.getName(), startupStub, null);
+
+        // cluster discovery
+        // NOTE(review): the a=b entry looks like a placeholder so that the
+        // configuration exists - confirm
+        final Dictionary<String, Object> discoveryProps = new Hashtable<String, Object>();
+        discoveryProps.put("a", "b");
+        final org.osgi.service.cm.Configuration discoveryConfig =
+            this.configAdmin.getConfiguration("org.apache.sling.discovery.impl.NoClusterDiscoveryService", null);
+        discoveryConfig.update(discoveryProps);
+    }
+
+ /**
+ * Helper method to register an event handler
+ */
+ protected ServiceRegistration registerEventHandler(final String topic,
+ final EventHandler handler) {
+ final Dictionary<String, Object> props = new Hashtable<String, Object>();
+ props.put(EventConstants.EVENT_TOPIC, topic);
+ final ServiceRegistration reg = this.bc.registerService(EventHandler.class.getName(),
+ handler, props);
+ return reg;
+ }
+
+ /**
+ * Helper method to register a job consumer
+ */
+ protected ServiceRegistration registerJobConsumer(final String topic,
+ final JobConsumer handler) {
+ final Dictionary<String, Object> props = new Hashtable<String, Object>();
+ props.put(JobConsumer.PROPERTY_TOPICS, topic);
+ final ServiceRegistration reg = this.bc.registerService(JobConsumer.class.getName(),
+ handler, props);
+ return reg;
+ }
+}
\ No newline at end of file
Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ClassloadingTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ClassloadingTest.java?rev=1470462&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ClassloadingTest.java (added)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ClassloadingTest.java Mon Apr 22 11:42:53 2013
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.it;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Dictionary;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.sling.event.EventPropertiesMap;
+import org.apache.sling.event.impl.jobs.config.ConfigurationConstants;
+import org.apache.sling.event.jobs.Job;
+import org.apache.sling.event.jobs.JobConsumer;
+import org.apache.sling.event.jobs.JobManager;
+import org.apache.sling.event.jobs.JobUtil;
+import org.apache.sling.event.jobs.QueueConfiguration;
+import org.junit.Before;
+import org.junit.runner.RunWith;
+import org.ops4j.pax.exam.junit.ExamReactorStrategy;
+import org.ops4j.pax.exam.junit.JUnit4TestRunner;
+import org.ops4j.pax.exam.spi.reactors.AllConfinedStagedReactorFactory;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventHandler;
+
+@RunWith(JUnit4TestRunner.class)
+@ExamReactorStrategy(AllConfinedStagedReactorFactory.class)
+public class ClassloadingTest extends AbstractJobHandlingTest {
+
+ private static final String QUEUE_NAME = "cltest";
+ private static final String TOPIC = "sling/cltest";
+
+ private String queueConfPid;
+
+    /**
+     * Runs the base setup and then creates the queue configuration for this
+     * test: a queue of type UNORDERED named {@code QUEUE_NAME} for {@code TOPIC}.
+     * The pid of the new configuration is kept in {@code queueConfPid}.
+     */
+    @Override
+    @Before
+    public void setup() throws IOException {
+        super.setup();
+
+        // create the class loading test queue
+        final Dictionary<String, Object> queueProps = new Hashtable<String, Object>();
+        queueProps.put(ConfigurationConstants.PROP_NAME, QUEUE_NAME);
+        queueProps.put(ConfigurationConstants.PROP_TYPE, QueueConfiguration.Type.UNORDERED.name());
+        queueProps.put(ConfigurationConstants.PROP_TOPICS, TOPIC);
+
+        final org.osgi.service.cm.Configuration queueConfig =
+            this.configAdmin.createFactoryConfiguration("org.apache.sling.event.jobs.QueueConfiguration", null);
+        queueConfig.update(queueProps);
+
+        this.queueConfPid = queueConfig.getPid();
+
+        // presumably gives the new configuration time to be applied
+        this.sleep(1000L);
+    }
+
+
+    /**
+     * Adds a single job carrying string, int, long, list and map properties,
+     * waits for the finished notification and checks that the job was
+     * processed exactly once and that the notification exposes the same
+     * property values.
+     */
+    @org.junit.Test public void testSimpleClassloading() throws Exception {
+        final AtomicInteger count = new AtomicInteger(0);
+        final List<Event> finishedEvents = Collections.synchronizedList(new ArrayList<Event>());
+        final ServiceRegistration jcReg = this.registerJobConsumer(TOPIC,
+                new JobConsumer() {
+
+                    @Override
+                    public boolean process(Job job) {
+                        count.incrementAndGet();
+                        return true;
+                    }
+                });
+        final ServiceRegistration ehReg = this.registerEventHandler(JobUtil.TOPIC_JOB_FINISHED,
+                new EventHandler() {
+
+                    @Override
+                    public void handleEvent(Event event) {
+                        finishedEvents.add(event);
+                    }
+                });
+        try {
+            final JobManager jobManager = this.getJobManager();
+
+            final List<String> list = new ArrayList<String>();
+            list.add("1");
+            list.add("2");
+
+            final EventPropertiesMap map = new EventPropertiesMap();
+            map.put("a", "a1");
+            map.put("b", "b2");
+
+            // we start a single job
+            final Map<String, Object> props = new HashMap<String, Object>();
+            props.put("string", "Hello");
+            props.put("int", Integer.valueOf(5));
+            props.put("long", Long.valueOf(7L));
+            props.put("list", list);
+            props.put("map", map);
+
+            jobManager.addJob(TOPIC, null, props);
+
+            // NOTE(review): this wait has no upper bound; the test hangs if
+            // the finished event never arrives
+            while ( finishedEvents.size() < 1 ) {
+                // we wait a little bit
+                Thread.sleep(100);
+            }
+
+            // no jobs queued, exactly one processed and none available
+            assertEquals(0, jobManager.getStatistics().getNumberOfQueuedJobs());
+            assertEquals(1, count.get());
+            assertEquals(0, jobManager.findJobs(JobManager.QueryType.ALL, TOPIC, -1).size());
+
+            final String jobTopic = (String)finishedEvents.get(0).getProperty(JobUtil.NOTIFICATION_PROPERTY_JOB_TOPIC);
+            assertNotNull(jobTopic);
+            assertEquals("Hello", finishedEvents.get(0).getProperty("string"));
+            assertEquals(Integer.valueOf(5), Integer.valueOf(finishedEvents.get(0).getProperty("int").toString()));
+            assertEquals(Long.valueOf(7L), Long.valueOf(finishedEvents.get(0).getProperty("long").toString()));
+            assertEquals(list, finishedEvents.get(0).getProperty("list"));
+            assertEquals(map, finishedEvents.get(0).getProperty("map"));
+        } finally {
+            // unregister the event handler even if the first unregister throws
+            try {
+                jcReg.unregister();
+            } finally {
+                ehReg.unregister();
+            }
+        }
+    }
+
+    /**
+     * Adds a job whose only property is an instance of the private
+     * {@code DataObject} class and checks, after a fixed wait, that the job
+     * was neither processed nor finished, is still found by the job manager
+     * and is not counted as queued or active.
+     */
+    @org.junit.Test public void testFailedClassloading() throws Exception {
+        final AtomicInteger count = new AtomicInteger(0);
+        final List<Event> finishedEvents = Collections.synchronizedList(new ArrayList<Event>());
+        final ServiceRegistration jcReg = this.registerJobConsumer(TOPIC + "/failed",
+                new JobConsumer() {
+
+                    @Override
+                    public boolean process(Job job) {
+                        count.incrementAndGet();
+                        return true;
+                    }
+                });
+        final ServiceRegistration ehReg = this.registerEventHandler(JobUtil.TOPIC_JOB_FINISHED,
+                new EventHandler() {
+
+                    @Override
+                    public void handleEvent(Event event) {
+                        finishedEvents.add(event);
+                    }
+                });
+        try {
+            final JobManager jobManager = this.getJobManager();
+
+            // dao is an invisible class for the dynamic class loader as it is not public
+            // therefore scheduling this job should fail!
+            final DataObject dao = new DataObject();
+            dao.message = "Hello World";
+
+            // we start a single job
+            final Map<String, Object> props = new HashMap<String, Object>();
+            props.put("dao", dao);
+
+            jobManager.addJob(TOPIC + "/failed", null, props);
+
+            // we simply wait a little bit
+            sleep(2000);
+
+            // not processed, not finished, still stored, neither queued nor active
+            assertEquals(0, count.get());
+            assertEquals(0, finishedEvents.size());
+            assertEquals(1, jobManager.findJobs(JobManager.QueryType.ALL, TOPIC + "/failed", -1).size());
+            assertEquals(0, jobManager.getStatistics().getNumberOfQueuedJobs());
+            assertEquals(0, jobManager.getStatistics().getNumberOfActiveJobs());
+
+        } finally {
+            // unregister the event handler even if the first unregister throws
+            try {
+                jcReg.unregister();
+            } finally {
+                ehReg.unregister();
+            }
+        }
+    }
+
+ /**
+  * Serializable payload used by testFailedClassloading. The class is private
+  * on purpose: the test relies on it not being visible to the dynamic class
+  * loader.
+  */
+ private static final class DataObject implements Serializable {
+ // arbitrary content set by the test
+ public String message;
+ }
+}
Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ClassloadingTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ClassloadingTest.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ClassloadingTest.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/DropQueueTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/DropQueueTest.java?rev=1470462&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/DropQueueTest.java (added)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/DropQueueTest.java Mon Apr 22 11:42:53 2013
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.it;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.Dictionary;
+import java.util.Hashtable;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.sling.event.impl.jobs.config.ConfigurationConstants;
+import org.apache.sling.event.jobs.Job;
+import org.apache.sling.event.jobs.JobConsumer;
+import org.apache.sling.event.jobs.JobManager;
+import org.apache.sling.event.jobs.JobUtil;
+import org.apache.sling.event.jobs.QueueConfiguration;
+import org.junit.Before;
+import org.junit.runner.RunWith;
+import org.ops4j.pax.exam.junit.ExamReactorStrategy;
+import org.ops4j.pax.exam.junit.JUnit4TestRunner;
+import org.ops4j.pax.exam.spi.reactors.AllConfinedStagedReactorFactory;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventHandler;
+
+@RunWith(JUnit4TestRunner.class)
+@ExamReactorStrategy(AllConfinedStagedReactorFactory.class)
+public class DropQueueTest extends AbstractJobHandlingTest {
+
+ private static final String QUEUE_NAME = "droptest";
+ private static final String TOPIC = "sling/droptest";
+ private static int NUM_JOBS = 10;
+
+ private String queueConfPid;
+
+    /**
+     * Runs the base setup and then creates the queue configuration for this
+     * test: a queue of type DROP named {@code QUEUE_NAME} for {@code TOPIC}.
+     * The pid of the new configuration is kept in {@code queueConfPid}.
+     */
+    @Override
+    @Before
+    public void setup() throws IOException {
+        super.setup();
+
+        // create the drop test queue
+        final Dictionary<String, Object> queueProps = new Hashtable<String, Object>();
+        queueProps.put(ConfigurationConstants.PROP_NAME, QUEUE_NAME);
+        queueProps.put(ConfigurationConstants.PROP_TYPE, QueueConfiguration.Type.DROP.name());
+        queueProps.put(ConfigurationConstants.PROP_TOPICS, TOPIC);
+
+        final org.osgi.service.cm.Configuration queueConfig =
+            this.configAdmin.createFactoryConfiguration("org.apache.sling.event.jobs.QueueConfiguration", null);
+        queueConfig.update(queueProps);
+
+        this.queueConfPid = queueConfig.getPid();
+
+        // presumably gives the new configuration time to be applied
+        this.sleep(1000L);
+    }
+
+
+    /**
+     * Adds NUM_JOBS jobs, waits until a cancelled notification has been
+     * received for each of them and checks that none was processed or is
+     * still stored. The same checks are repeated after the queue type has
+     * been changed to UNORDERED and the job manager restarted.
+     */
+    @org.junit.Test public void testDroppingQueue() throws Exception {
+        final AtomicInteger count = new AtomicInteger(0);
+        final AtomicInteger dropCount = new AtomicInteger(0);
+        final ServiceRegistration jcReg = this.registerJobConsumer(TOPIC,
+                new JobConsumer() {
+
+                    @Override
+                    public boolean process(Job job) {
+                        count.incrementAndGet();
+                        return true;
+                    }
+                });
+        final ServiceRegistration ehReg = this.registerEventHandler(JobUtil.TOPIC_JOB_CANCELLED,
+                new EventHandler() {
+
+                    @Override
+                    public void handleEvent(Event event) {
+                        dropCount.incrementAndGet();
+                    }
+                });
+        try {
+            final JobManager jobManager = this.getJobManager();
+
+            // we start "some" jobs:
+            for(int i = 0; i < NUM_JOBS; i++ ) {
+                jobManager.addJob(TOPIC, null, null);
+            }
+            // NOTE(review): this wait has no upper bound; the test hangs if
+            // fewer than NUM_JOBS cancelled events arrive
+            while ( dropCount.get() < NUM_JOBS ) {
+                // we wait a little bit
+                Thread.sleep(100);
+            }
+
+            // no jobs queued, none processed and none available
+            assertEquals(0, jobManager.getStatistics().getNumberOfQueuedJobs());
+            assertEquals(0, count.get());
+            assertEquals(0, jobManager.findJobs(JobManager.QueryType.ALL, TOPIC, -1).size());
+
+            // let's see if restarting helps with a new queue config
+            final org.osgi.service.cm.Configuration cf = this.configAdmin.getConfiguration(this.queueConfPid, null);
+            final Dictionary<String, Object> orderedProps = cf.getProperties();
+
+            orderedProps.put(ConfigurationConstants.PROP_TYPE, QueueConfiguration.Type.UNORDERED.name());
+            cf.update(orderedProps);
+
+            jobManager.restart();
+            // we wait a little bit
+            Thread.sleep(1200); // TODO - we have to wait until reload is done
+
+            // still no jobs queued, none processed and none available
+            assertEquals(0, jobManager.getStatistics().getNumberOfQueuedJobs());
+            assertEquals(0, count.get());
+            assertEquals(0, jobManager.findJobs(JobManager.QueryType.ALL, TOPIC, -1).size());
+        } finally {
+            // unregister the event handler even if the first unregister throws
+            try {
+                jcReg.unregister();
+            } finally {
+                ehReg.unregister();
+            }
+        }
+    }
+}
Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/DropQueueTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/DropQueueTest.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/DropQueueTest.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/IgnoreQueueTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/IgnoreQueueTest.java?rev=1470462&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/IgnoreQueueTest.java (added)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/IgnoreQueueTest.java Mon Apr 22 11:42:53 2013
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.it;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.Dictionary;
+import java.util.Hashtable;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.sling.event.impl.jobs.config.ConfigurationConstants;
+import org.apache.sling.event.jobs.Job;
+import org.apache.sling.event.jobs.JobConsumer;
+import org.apache.sling.event.jobs.JobManager;
+import org.apache.sling.event.jobs.QueueConfiguration;
+import org.junit.Before;
+import org.junit.runner.RunWith;
+import org.ops4j.pax.exam.junit.ExamReactorStrategy;
+import org.ops4j.pax.exam.junit.JUnit4TestRunner;
+import org.ops4j.pax.exam.spi.reactors.AllConfinedStagedReactorFactory;
+import org.osgi.framework.ServiceRegistration;
+
+@RunWith(JUnit4TestRunner.class)
+@ExamReactorStrategy(AllConfinedStagedReactorFactory.class)
+public class IgnoreQueueTest extends AbstractJobHandlingTest {
+
+ private static final String QUEUE_NAME = "ignoretest";
+ private static final String TOPIC = "sling/ignoretest";
+ private static int NUM_JOBS = 10;
+
+ private String queueConfPid;
+
+    /**
+     * Runs the base setup and then creates the queue configuration for this
+     * test: a queue of type IGNORE named {@code QUEUE_NAME} for {@code TOPIC}.
+     * The pid of the new configuration is kept in {@code queueConfPid}.
+     */
+    @Override
+    @Before
+    public void setup() throws IOException {
+        super.setup();
+
+        // create the ignore test queue
+        final Dictionary<String, Object> queueProps = new Hashtable<String, Object>();
+        queueProps.put(ConfigurationConstants.PROP_NAME, QUEUE_NAME);
+        queueProps.put(ConfigurationConstants.PROP_TYPE, QueueConfiguration.Type.IGNORE.name());
+        queueProps.put(ConfigurationConstants.PROP_TOPICS, TOPIC);
+
+        final org.osgi.service.cm.Configuration queueConfig =
+            this.configAdmin.createFactoryConfiguration("org.apache.sling.event.jobs.QueueConfiguration", null);
+        queueConfig.update(queueProps);
+
+        this.queueConfPid = queueConfig.getPid();
+
+        // presumably gives the new configuration time to be applied
+        this.sleep(1000L);
+    }
+
+ @org.junit.Test public void testIgnoreQueue() throws Exception {
+ final AtomicInteger count = new AtomicInteger(0);
+ final ServiceRegistration jcReg = this.registerJobConsumer(TOPIC,
+ new JobConsumer() {
+
+ @Override
+ public boolean process(Job job) {
+ count.incrementAndGet();
+ return true;
+ }
+ });
+
+ try {
+ final JobManager jobManager = this.getJobManager();
+
+ // we start "some" jobs:
+ for(int i = 0; i < NUM_JOBS; i++ ) {
+ jobManager.addJob(TOPIC, null, null);
+ }
+ sleep(200);
+
+ // we wait until NUM_JOBS have been processed by the JobManager
+ while ( jobManager.findJobs(JobManager.QueryType.ALL, TOPIC, -1).size() < NUM_JOBS ) {
+ sleep(200);
+ }
+
+ // no jobs queued, none processed but available
+ assertEquals(0, jobManager.getStatistics().getNumberOfQueuedJobs());
+ assertEquals(0, jobManager.getStatistics().getNumberOfProcessedJobs());
+ assertEquals(0, count.get());
+ assertEquals(NUM_JOBS, jobManager.findJobs(JobManager.QueryType.ALL, TOPIC, -1).size());
+
+ // let'see if restarting helps with a new queue config
+ final org.osgi.service.cm.Configuration cf = this.configAdmin.getConfiguration(this.queueConfPid, null);
+ final Dictionary<String, Object> orderedProps = cf.getProperties();
+
+ orderedProps.put(ConfigurationConstants.PROP_TYPE, QueueConfiguration.Type.UNORDERED.name());
+ cf.update(orderedProps);
+
+ // we wait until all jobs are processed
+ while ( count.get() < NUM_JOBS ) {
+ this.sleep(100);
+ }
+
+ // no jobs queued, but processed and not available
+ assertEquals(0, jobManager.getStatistics().getNumberOfQueuedJobs());
+ assertEquals(NUM_JOBS, jobManager.getStatistics().getNumberOfProcessedJobs());
+ assertEquals(NUM_JOBS, count.get());
+ assertEquals(0, jobManager.findJobs(JobManager.QueryType.ALL, TOPIC, -1).size());
+ } finally {
+ jcReg.unregister();
+ }
+ }
+}
Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/IgnoreQueueTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/IgnoreQueueTest.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/IgnoreQueueTest.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/JobHandlingTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/JobHandlingTest.java?rev=1470462&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/JobHandlingTest.java (added)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/JobHandlingTest.java Mon Apr 22 11:42:53 2013
@@ -0,0 +1,532 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.it;
+
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Dictionary;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.inject.Inject;
+
+import org.apache.sling.event.impl.Barrier;
+import org.apache.sling.event.impl.jobs.config.ConfigurationConstants;
+import org.apache.sling.event.jobs.Job;
+import org.apache.sling.event.jobs.JobConsumer;
+import org.apache.sling.event.jobs.JobManager;
+import org.apache.sling.event.jobs.JobProcessor;
+import org.apache.sling.event.jobs.JobUtil;
+import org.apache.sling.event.jobs.QueueConfiguration;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.ops4j.pax.exam.junit.ExamReactorStrategy;
+import org.ops4j.pax.exam.junit.JUnit4TestRunner;
+import org.ops4j.pax.exam.spi.reactors.AllConfinedStagedReactorFactory;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventHandler;
+
+@RunWith(JUnit4TestRunner.class)
+@ExamReactorStrategy(AllConfinedStagedReactorFactory.class)
+public class JobHandlingTest extends AbstractJobHandlingTest {
+
+ /** Job topic used by the tests in this class; {@link #setup()} assigns it to the "test" queue. */
+ public static final String TOPIC = "sling/test";
+
+ /** Event admin (provided through {@code @Inject}); the tests use it to send and post job events. */
+ @Inject
+ protected EventAdmin eventAdmin;
+
+ /**
+  * Runs the base class setup and afterwards creates a factory configuration
+  * for an unordered queue named "test" which handles {@link #TOPIC} and
+  * {@code TOPIC + "2"}, with two retries and a retry delay of 2000.
+  * Finally the helper {@code sleep(1000L)} is called, presumably to give the
+  * configuration time to become active.
+  */
+ @Override
+ @Before
+ public void setup() throws IOException {
+     super.setup();
+
+     // factory configuration backing the test queue
+     final org.osgi.service.cm.Configuration queueConfig =
+         this.configAdmin.createFactoryConfiguration("org.apache.sling.event.jobs.QueueConfiguration", null);
+
+     final Dictionary<String, Object> queueProps = new Hashtable<String, Object>();
+     queueProps.put(ConfigurationConstants.PROP_NAME, "test");
+     queueProps.put(ConfigurationConstants.PROP_TYPE, QueueConfiguration.Type.UNORDERED.name());
+     queueProps.put(ConfigurationConstants.PROP_TOPICS, new String[] {TOPIC, TOPIC + "2"});
+     queueProps.put(ConfigurationConstants.PROP_RETRIES, 2);
+     queueProps.put(ConfigurationConstants.PROP_RETRY_DELAY, 2000L);
+     queueConfig.update(queueProps);
+
+     this.sleep(1000L);
+ }
+
+ /**
+  * Helper method to create a job event for {@link #TOPIC}.
+  *
+  * @param id optional job name, stored as {@code JobUtil.PROPERTY_JOB_NAME};
+  *           the property is left out when {@code null}
+  * @return a new event with topic {@code JobUtil.TOPIC_JOB} carrying the job properties
+  */
+ private Event getJobEvent(String id) {
+     final Dictionary<String, Object> props = new Hashtable<String, Object>();
+     // use the shared constant so the job topic cannot drift from the topic configured in setup()
+     props.put(JobUtil.PROPERTY_JOB_TOPIC, TOPIC);
+     if ( id != null ) {
+         props.put(JobUtil.PROPERTY_JOB_NAME, id);
+     }
+
+     return new Event(JobUtil.TOPIC_JOB, props);
+ }
+
+ /**
+  * Simple job execution through the event admin.
+  * The handler acknowledges and finishes the job, so exactly one delivery
+  * is expected and no second one.
+  */
+ @Test public void testSimpleJobExecution() throws Exception {
+     final Barrier barrier = new Barrier(2);
+
+     final EventHandler finishingHandler = new EventHandler() {
+         @Override
+         public void handleEvent(Event event) {
+             JobUtil.acknowledgeJob(event);
+             JobUtil.finishedJob(event);
+             barrier.block();
+         }
+     };
+     final ServiceRegistration handlerReg = this.registerEventHandler("sling/test", finishingHandler);
+
+     try {
+         this.eventAdmin.sendEvent(getJobEvent(null));
+         // the job has to reach the handler once ...
+         assertTrue("No event received in the given time.", barrier.block(5));
+         barrier.reset();
+         // ... and must not be delivered a second time
+         assertFalse("Unexpected event received in the given time.", barrier.block(5));
+     } finally {
+         handlerReg.unregister();
+     }
+ }
+
+ @Test public void testManyJobs() throws Exception {
+ final ServiceRegistration reg1 = this.registerEventHandler("sling/test",
+ new EventHandler() {
+ @Override
+ public void handleEvent(Event event) {
+ JobUtil.processJob(event, new JobProcessor() {
+
+ @Override
+ public boolean process(Event job) {
+ try {
+ Thread.sleep(200);
+ } catch (InterruptedException ie) {
+ // ignore
+ }
+ return true;
+ }
+ });
+ }
+ });
+ final AtomicInteger count = new AtomicInteger(0);
+ final ServiceRegistration reg2 = this.registerEventHandler(JobUtil.TOPIC_JOB_FINISHED,
+ new EventHandler() {
+ @Override
+ public void handleEvent(Event event) {
+ count.incrementAndGet();
+ }
+ });
+
+ try {
+ // we start "some" jobs
+ final int COUNT = 300;
+ for(int i = 0; i < COUNT; i++ ) {
+// final String queueName = "queue" + (i % 20);
+ this.eventAdmin.sendEvent(this.getJobEvent(null));
+ }
+ while ( count.get() < COUNT ) {
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException ie) {
+ // ignore
+ }
+ }
+ assertEquals("Finished count", COUNT, count.get());
+ assertEquals("Finished count", COUNT, this.getJobManager().getStatistics().getNumberOfFinishedJobs());
+ } finally {
+ reg1.unregister();
+ reg2.unregister();
+ }
+ }
+
+ /**
+  * Simple execution of a job which is added with a name ("myid1").
+  * The consumer reports success, so the job is expected to be processed
+  * once and not a second time.
+  */
+ @org.junit.Test public void testSimpleJobWithIdExecution() throws Exception {
+     final Barrier barrier = new Barrier(2);
+     final JobConsumer consumer = new JobConsumer() {
+
+         @Override
+         public boolean process(Job job) {
+             barrier.block();
+             return true;
+         }
+     };
+     final ServiceRegistration consumerReg = this.registerJobConsumer(TOPIC, consumer);
+     try {
+         final JobManager manager = this.getJobManager();
+         manager.addJob(TOPIC, "myid1", null);
+         // the job has to reach the consumer once ...
+         assertTrue("No event received in the given time.", barrier.block(5));
+         barrier.reset();
+         // ... and must not show up again
+         assertFalse("Unexpected event received in the given time.", barrier.block(5));
+     } finally {
+         consumerReg.unregister();
+     }
+ }
+
+ /**
+  * Cancelling a job.
+  * The consumer always reports failure. Removal is attempted once while the
+  * consumer is still processing the job (expected to be refused) and once
+  * after the consumer has returned (expected to succeed).
+  */
+ @org.junit.Test public void testCancelJob() throws Exception {
+     final Barrier processingStarted = new Barrier(2);
+     final Barrier mayContinue = new Barrier(2);
+     final JobConsumer failingConsumer = new JobConsumer() {
+
+         @Override
+         public boolean process(Job job) {
+             processingStarted.block();
+             mayContinue.block();
+             return false;
+         }
+     };
+     final ServiceRegistration consumerReg = this.registerJobConsumer(TOPIC, failingConsumer);
+     try {
+         final JobManager manager = this.getJobManager();
+         manager.addJob(TOPIC, "myid2", null);
+         processingStarted.block();
+
+         assertEquals(1, manager.findJobs(JobManager.QueryType.ALL, "sling/test", -1).size());
+         // the consumer is still held inside process(), therefore cancel fails
+         final Event processingJob = manager.findJob("sling/test", Collections.singletonMap(JobUtil.PROPERTY_JOB_NAME, (Object)"myid2"));
+         assertNotNull(processingJob);
+         final String processingJobId = (String)processingJob.getProperty(JobUtil.JOB_ID);
+         assertFalse(manager.removeJob(processingJobId));
+         mayContinue.block(); // release the consumer
+
+         sleep(200);
+
+         // the failed job is back in the queue and can be removed now
+         final Event requeuedJob = manager.findJob("sling/test", Collections.singletonMap(JobUtil.PROPERTY_JOB_NAME, (Object)"myid2"));
+         assertNotNull(requeuedJob);
+         final String requeuedJobId = (String)requeuedJob.getProperty(JobUtil.JOB_ID);
+         assertTrue(manager.removeJob(requeuedJobId));
+         assertEquals(0, manager.findJobs(JobManager.QueryType.ALL, "sling/test", -1).size());
+     } finally {
+         consumerReg.unregister();
+     }
+ }
+
+ /**
+  * Force cancelling a job.
+  * The consumer always reports failure after sleeping; the job is force
+  * removed while the consumer is still busy and must be gone afterwards.
+  */
+ @org.junit.Test public void testForceCancelJob() throws Exception {
+     final Barrier processingStarted = new Barrier(2);
+     final JobConsumer slowFailingConsumer = new JobConsumer() {
+
+         @Override
+         public boolean process(Job job) {
+             processingStarted.block();
+             sleep(1000);
+             return false;
+         }
+     };
+     final ServiceRegistration consumerReg = this.registerJobConsumer(TOPIC, slowFailingConsumer);
+     try {
+         final JobManager manager = this.getJobManager();
+         manager.addJob(TOPIC, "myid3", null);
+         processingStarted.block();
+
+         assertEquals(1, manager.findJobs(JobManager.QueryType.ALL, "sling/test", -1).size());
+         // job is currently sleeping, but force cancel always waits!
+         final Event sleepingJob = manager.findJob("sling/test", Collections.singletonMap(JobUtil.PROPERTY_JOB_NAME, (Object)"myid3"));
+         assertNotNull(sleepingJob);
+         final String sleepingJobId = (String)sleepingJob.getProperty(JobUtil.JOB_ID);
+         manager.forceRemoveJob(sleepingJobId);
+         // nothing may be left for the topic
+         assertEquals(0, manager.findJobs(JobManager.QueryType.ALL, "sling/test", -1).size());
+     } finally {
+         consumerReg.unregister();
+     }
+ }
+
+ /**
+  * Reschedule test.
+  * The consumer always reports failure. With the two retries configured in
+  * {@link #setup()} the job is expected to reach the consumer three times
+  * (retry counts 0, 1 and 2) and not a fourth time.
+  */
+ @org.junit.Test public void testStartJobAndReschedule() throws Exception {
+     // written by the job processing thread and read by the test thread,
+     // therefore a synchronized list is used (as in testNotifications)
+     final List<Integer> retryCountList = Collections.synchronizedList(new ArrayList<Integer>());
+     final Barrier cb = new Barrier(2);
+     final ServiceRegistration jcReg = this.registerJobConsumer(TOPIC,
+             new JobConsumer() {
+                 // number of invocations seen so far
+                 int retryCount;
+
+                 @Override
+                 public boolean process(Job job) {
+                     // a missing retry count property is treated as the first attempt
+                     int retry = 0;
+                     if ( job.getProperty(Job.PROPERTY_JOB_RETRY_COUNT) != null ) {
+                         retry = (Integer)job.getProperty(Job.PROPERTY_JOB_RETRY_COUNT);
+                     }
+                     // only record attempts whose retry count matches the invocation number
+                     if ( retry == retryCount ) {
+                         retryCountList.add(retry);
+                     }
+                     retryCount++;
+                     cb.block();
+                     return false;
+                 }
+             });
+     try {
+         final JobManager jobManager = this.getJobManager();
+         jobManager.addJob(TOPIC, null, null);
+
+         assertTrue("No event received in the given time.", cb.block(5));
+         cb.reset();
+         // the job is retried after the configured retry delay, so we wait again
+         assertTrue("No event received in the given time.", cb.block(5));
+         cb.reset();
+         // the job is retried after the configured retry delay, so we wait again
+         assertTrue("No event received in the given time.", cb.block(5));
+         // we have reached the retry limit so we expect to not get an event
+         cb.reset();
+         assertFalse("Unexpected event received in the given time.", cb.block(5));
+         assertEquals("Unexpected number of retries", 3, retryCountList.size());
+     } finally {
+         jcReg.unregister();
+     }
+ }
+
+ /**
+ * Notifications.
+ * We send several jobs which are treated different and then see
+ * how many invocations have been sent.
+ */
+ @org.junit.Test public void testNotifications() throws Exception {
+ final List<String> cancelled = Collections.synchronizedList(new ArrayList<String>());
+ final List<String> failed = Collections.synchronizedList(new ArrayList<String>());
+ final List<String> finished = Collections.synchronizedList(new ArrayList<String>());
+ final List<String> started = Collections.synchronizedList(new ArrayList<String>());
+ final ServiceRegistration jcReg = this.registerJobConsumer(TOPIC,
+ new JobConsumer() {
+
+ @Override
+ public boolean process(Job job) {
+ // events 1 and 4 finish the first time
+ final String id = job.getName();
+ if ( "1".equals(id) || "4".equals(id) ) {
+ return true;
+
+ // 5 fails always
+ } else if ( "5".equals(id) ) {
+ return false;
+ } else {
+ int retry = 0;
+ if ( job.getProperty(Job.PROPERTY_JOB_RETRY_COUNT) != null ) {
+ retry = (Integer)job.getProperty(Job.PROPERTY_JOB_RETRY_COUNT);
+ }
+ // 2 fails the first time
+ if ( "2".equals(id) ) {
+ if ( retry == 0 ) {
+ return false;
+ } else {
+ return true;
+ }
+ }
+ // 3 fails the first and second time
+ if ( "3".equals(id) ) {
+ if ( retry == 0 || retry == 1 ) {
+ return false;
+ } else {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+ });
+ final ServiceRegistration eh1Reg = this.registerEventHandler(JobUtil.TOPIC_JOB_CANCELLED,
+ new EventHandler() {
+
+ @Override
+ public void handleEvent(Event event) {
+ final Event job = (Event) event.getProperty(JobUtil.PROPERTY_NOTIFICATION_JOB);
+ final String id = (String)job.getProperty(JobUtil.PROPERTY_JOB_NAME);
+ cancelled.add(id);
+ }
+ });
+ final ServiceRegistration eh2Reg = this.registerEventHandler(JobUtil.TOPIC_JOB_FAILED,
+ new EventHandler() {
+
+ @Override
+ public void handleEvent(Event event) {
+ final Event job = (Event) event.getProperty(JobUtil.PROPERTY_NOTIFICATION_JOB);
+ final String id = (String)job.getProperty(JobUtil.PROPERTY_JOB_NAME);
+ failed.add(id);
+ }
+ });
+ final ServiceRegistration eh3Reg = this.registerEventHandler(JobUtil.TOPIC_JOB_FINISHED,
+ new EventHandler() {
+
+ @Override
+ public void handleEvent(Event event) {
+ final Event job = (Event) event.getProperty(JobUtil.PROPERTY_NOTIFICATION_JOB);
+ final String id = (String)job.getProperty(JobUtil.PROPERTY_JOB_NAME);
+ finished.add(id);
+ }
+ });
+ final ServiceRegistration eh4Reg = this.registerEventHandler(JobUtil.TOPIC_JOB_STARTED,
+ new EventHandler() {
+
+ @Override
+ public void handleEvent(Event event) {
+ final Event job = (Event) event.getProperty(JobUtil.PROPERTY_NOTIFICATION_JOB);
+ final String id = (String)job.getProperty(JobUtil.PROPERTY_JOB_NAME);
+ started.add(id);
+ }
+ });
+
+ try {
+ final JobManager jobManager = this.getJobManager();
+ jobManager.addJob(TOPIC, "1", null);
+ jobManager.addJob(TOPIC, "2", null);
+ jobManager.addJob(TOPIC, "3", null);
+ jobManager.addJob(TOPIC, "4", null);
+ jobManager.addJob(TOPIC, "5", null);
+
+ int count = 0;
+ final long startTime = System.currentTimeMillis();
+ do {
+ count = finished.size() + cancelled.size();
+ // after 25 seconds we cancel the test
+ if ( System.currentTimeMillis() - startTime > 25000 ) {
+ throw new Exception("Timeout during notification test.");
+ }
+ } while ( count < 5 || started.size() < 10 );
+ assertEquals("Finished count", 4, finished.size());
+ assertEquals("Cancelled count", 1, cancelled.size());
+ assertEquals("Started count", 10, started.size());
+ assertEquals("Failed count", 5, failed.size());
+ } finally {
+ jcReg.unregister();
+ eh1Reg.unregister();
+ eh2Reg.unregister();
+ eh3Reg.unregister();
+ eh4Reg.unregister();
+ }
+ }
+
+ /**
+  * Sends jobs with and without a processor.
+  * Twenty job events are posted, alternating between {@link #TOPIC} (whose
+  * handler processes the job) and {@code TOPIC + "2"} (whose handler only
+  * counts the delivery). After waiting, the job manager is run as a
+  * {@code Runnable} (presumably its clean up task) and the unprocessed jobs
+  * are expected to be delivered a second time.
+  */
+ @org.junit.Test(timeout=1000*60*5) public void testNoJobProcessor() throws Exception {
+     final AtomicInteger finishedCount = new AtomicInteger(0);
+     final AtomicInteger unprocessedCount = new AtomicInteger(0);
+
+     // handler which processes the jobs of the first topic
+     final EventHandler processingHandler = new EventHandler() {
+
+         @Override
+         public void handleEvent(Event event) {
+             JobUtil.processJob(event, new JobProcessor() {
+
+                 @Override
+                 public boolean process(Event job) {
+                     try {
+                         Thread.sleep(200);
+                     } catch (InterruptedException ie) {
+                         // ignore
+                     }
+                     return true;
+                 }
+             });
+         }
+     };
+     // handler which only counts the deliveries of the second topic
+     final EventHandler countingHandler = new EventHandler() {
+
+         @Override
+         public void handleEvent(Event event) {
+             unprocessedCount.incrementAndGet();
+         }
+     };
+     // handler which counts the finished notifications
+     final EventHandler finishedHandler = new EventHandler() {
+
+         @Override
+         public void handleEvent(Event event) {
+             finishedCount.incrementAndGet();
+         }
+     };
+     final ServiceRegistration processingReg = this.registerEventHandler(TOPIC, processingHandler);
+     final ServiceRegistration countingReg = this.registerEventHandler("sling/test2", countingHandler);
+     final ServiceRegistration finishedReg = this.registerEventHandler(JobUtil.TOPIC_JOB_FINISHED, finishedHandler);
+     try {
+         final JobManager manager = this.getJobManager();
+
+         // we start 20 jobs, every second job has no processor
+         final int COUNT = 20;
+         final int HALF = COUNT / 2;
+         for(int index = 0; index < COUNT; index++ ) {
+             final Dictionary<String, Object> jobProps = new Hashtable<String, Object>();
+             if ( index % 2 == 0 ) {
+                 jobProps.put(JobUtil.PROPERTY_JOB_TOPIC, TOPIC);
+             } else {
+                 jobProps.put(JobUtil.PROPERTY_JOB_TOPIC, TOPIC + "2");
+             }
+
+             this.eventAdmin.postEvent(new Event(JobUtil.TOPIC_JOB, jobProps));
+         }
+         final long startTime = System.currentTimeMillis();
+         // first all processed jobs have to finish ...
+         while ( finishedCount.get() < HALF ) {
+             try {
+                 Thread.sleep(500);
+             } catch (InterruptedException ie) {
+                 // ignore
+             }
+         }
+         // ... and all other jobs have to be delivered once
+         while ( unprocessedCount.get() < HALF ) {
+             try {
+                 Thread.sleep(500);
+             } catch (InterruptedException ie) {
+                 // ignore
+             }
+         }
+         // clean up waits for one minute, so we wait until 72 seconds have passed since posting
+         while ( System.currentTimeMillis() - startTime < 72000 ) {
+             try {
+                 Thread.sleep(100);
+             } catch (InterruptedException ie) {
+                 // ignore
+             }
+         }
+         ((Runnable)manager).run();
+         // the unprocessed jobs are expected a second time
+         while ( unprocessedCount.get() < COUNT ) {
+             try {
+                 Thread.sleep(500);
+             } catch (InterruptedException ie) {
+                 // ignore
+             }
+         }
+         assertEquals("Finished count", HALF, finishedCount.get());
+         assertEquals("Unprocessed count",COUNT, unprocessedCount.get());
+         assertEquals("Finished count", HALF, manager.getStatistics().getNumberOfFinishedJobs());
+     } finally {
+         processingReg.unregister();
+         countingReg.unregister();
+         finishedReg.unregister();
+     }
+ }
+}
\ No newline at end of file
Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/JobHandlingTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/JobHandlingTest.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/JobHandlingTest.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/OrderedQueueTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/OrderedQueueTest.java?rev=1470462&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/OrderedQueueTest.java (added)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/OrderedQueueTest.java Mon Apr 22 11:42:53 2013
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.it;
+
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Dictionary;
+import java.util.Hashtable;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.sling.event.impl.Barrier;
+import org.apache.sling.event.impl.jobs.config.ConfigurationConstants;
+import org.apache.sling.event.jobs.Job;
+import org.apache.sling.event.jobs.JobConsumer;
+import org.apache.sling.event.jobs.JobManager;
+import org.apache.sling.event.jobs.JobUtil;
+import org.apache.sling.event.jobs.Queue;
+import org.apache.sling.event.jobs.QueueConfiguration;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.ops4j.pax.exam.junit.ExamReactorStrategy;
+import org.ops4j.pax.exam.junit.JUnit4TestRunner;
+import org.ops4j.pax.exam.spi.reactors.AllConfinedStagedReactorFactory;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventHandler;
+
+@RunWith(JUnit4TestRunner.class)
+@ExamReactorStrategy(AllConfinedStagedReactorFactory.class)
+public class OrderedQueueTest extends AbstractJobHandlingTest {
+
+ /**
+  * Runs the base class setup and afterwards creates a factory configuration
+  * for an ordered queue named "orderedtest" which handles the topics
+  * "sling/orderedtest/*", with two retries and a retry delay of 2000.
+  * Finally the helper {@code sleep(1000L)} is called, presumably to give the
+  * configuration time to become active.
+  */
+ @Override
+ @Before
+ public void setup() throws IOException {
+     super.setup();
+
+     // factory configuration backing the ordered test queue
+     final org.osgi.service.cm.Configuration queueConfig =
+         this.configAdmin.createFactoryConfiguration("org.apache.sling.event.jobs.QueueConfiguration", null);
+
+     final Dictionary<String, Object> queueProps = new Hashtable<String, Object>();
+     queueProps.put(ConfigurationConstants.PROP_NAME, "orderedtest");
+     queueProps.put(ConfigurationConstants.PROP_TYPE, QueueConfiguration.Type.ORDERED.name());
+     queueProps.put(ConfigurationConstants.PROP_TOPICS, "sling/orderedtest/*");
+     queueProps.put(ConfigurationConstants.PROP_RETRIES, 2);
+     queueProps.put(ConfigurationConstants.PROP_RETRY_DELAY, 2000L);
+     queueConfig.update(queueProps);
+
+     this.sleep(1000L);
+ }
+
+ /**
+ * Ordered Queue Test
+ */
+ @Test public void testOrderedQueue() throws Exception {
+ final JobManager jobManager = this.getJobManager();
+
+ final Barrier cb = new Barrier(2);
+
+ final ServiceRegistration jc1Reg = this.registerJobConsumer("sling/orderedtest/start",
+ new JobConsumer() {
+
+ @Override
+ public boolean process(final Job job) {
+ cb.block();
+ return true;
+ }
+ });
+
+ // register new consumer and event handle
+ final AtomicInteger count = new AtomicInteger(0);
+ final AtomicInteger parallelCount = new AtomicInteger(0);
+ final ServiceRegistration jcReg = this.registerJobConsumer("sling/orderedtest/*",
+ new JobConsumer() {
+
+ @Override
+ public boolean process(final Job job) {
+ if ( parallelCount.incrementAndGet() > 1 ) {
+ parallelCount.decrementAndGet();
+ return false;
+ }
+ final String topic = job.getTopic();
+ if ( topic.endsWith("sub1") ) {
+ final int i = (Integer)job.getProperty(Job.PROPERTY_JOB_RETRY_COUNT);
+ if ( i == 0 ) {
+ parallelCount.decrementAndGet();
+ return false;
+ }
+ }
+ try {
+ Thread.sleep(30);
+ } catch (InterruptedException ie) {
+ // ignore
+ }
+ parallelCount.decrementAndGet();
+ return true;
+ }
+ });
+ final ServiceRegistration ehReg = this.registerEventHandler(JobUtil.TOPIC_JOB_FINISHED,
+ new EventHandler() {
+
+ @Override
+ public void handleEvent(final Event event) {
+ count.incrementAndGet();
+ }
+ });
+
+ try {
+ // we first sent one event to get the queue started
+ jobManager.addJob("sling/orderedtest/start", null, null);
+ assertTrue("No event received in the given time.", cb.block(5));
+ cb.reset();
+
+ // get the queue
+ final Queue q = jobManager.getQueue("orderedtest");
+ assertNotNull("Queue 'orderedtest' should exist!", q);
+
+ // suspend it
+ q.suspend();
+
+ final int NUM_JOBS = 30;
+
+ // we start "some" jobs:
+ for(int i = 0; i < NUM_JOBS; i++ ) {
+ final String subTopic = "sling/orderedtest/sub" + (i % 10);
+ jobManager.addJob(subTopic, null, null);
+ }
+ // start the queue
+ q.resume();
+ while ( count.get() < NUM_JOBS +1 ) {
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException ie) {
+ // ignore
+ }
+ }
+ // we started one event before the test, so add one
+ assertEquals("Finished count", NUM_JOBS + 1, count.get());
+ assertEquals("Finished count", NUM_JOBS + 1, jobManager.getStatistics().getNumberOfFinishedJobs());
+ assertEquals("Finished count", NUM_JOBS + 1, q.getStatistics().getNumberOfFinishedJobs());
+ assertEquals("Failed count", NUM_JOBS / 10, q.getStatistics().getNumberOfFailedJobs());
+ assertEquals("Cancelled count", 0, q.getStatistics().getNumberOfCancelledJobs());
+ } finally {
+ jc1Reg.unregister();
+ jcReg.unregister();
+ ehReg.unregister();
+ }
+ }
+}
\ No newline at end of file
Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/OrderedQueueTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/OrderedQueueTest.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/OrderedQueueTest.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/RoundRobinQueueTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/RoundRobinQueueTest.java?rev=1470462&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/RoundRobinQueueTest.java (added)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/RoundRobinQueueTest.java Mon Apr 22 11:42:53 2013
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.it;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Dictionary;
+import java.util.Hashtable;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.sling.event.impl.Barrier;
+import org.apache.sling.event.impl.jobs.config.ConfigurationConstants;
+import org.apache.sling.event.jobs.Job;
+import org.apache.sling.event.jobs.JobConsumer;
+import org.apache.sling.event.jobs.JobManager;
+import org.apache.sling.event.jobs.JobUtil;
+import org.apache.sling.event.jobs.Queue;
+import org.apache.sling.event.jobs.QueueConfiguration;
+import org.junit.Before;
+import org.junit.runner.RunWith;
+import org.ops4j.pax.exam.junit.ExamReactorStrategy;
+import org.ops4j.pax.exam.junit.JUnit4TestRunner;
+import org.ops4j.pax.exam.spi.reactors.AllConfinedStagedReactorFactory;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventHandler;
+
+@RunWith(JUnit4TestRunner.class)
+@ExamReactorStrategy(AllConfinedStagedReactorFactory.class)
+public class RoundRobinQueueTest extends AbstractJobHandlingTest {
+
+ /** Name of the round robin queue created in {@link #setup()}. */
+ private static final String QUEUE_NAME = "roundrobintest";
+ /** Base topic; the queue is configured for all topics matching {@code TOPIC + "/*"}. */
+ private static final String TOPIC = "sling/roundrobintest";
+ /** Value configured as {@code PROP_MAX_PARALLEL}; the consumer fails a job if more jobs run at once. */
+ private static final int MAX_PAR = 5;
+ /** Number of jobs added per batch (one batch without and one with job names). */
+ private static final int NUM_JOBS = 300;
+
+ /**
+  * Runs the base class setup and afterwards creates a factory configuration
+  * for a topic round robin queue named {@link #QUEUE_NAME} which handles
+  * {@code TOPIC + "/*"}, with two retries, a retry delay of 2000 and
+  * {@link #MAX_PAR} as the maximum parallel value.
+  * Finally the helper {@code sleep(1000L)} is called, presumably to give the
+  * configuration time to become active.
+  */
+ @Override
+ @Before
+ public void setup() throws IOException {
+     super.setup();
+
+     // factory configuration backing the round robin test queue
+     final org.osgi.service.cm.Configuration queueConfig =
+         this.configAdmin.createFactoryConfiguration("org.apache.sling.event.jobs.QueueConfiguration", null);
+
+     final Dictionary<String, Object> queueProps = new Hashtable<String, Object>();
+     queueProps.put(ConfigurationConstants.PROP_NAME, QUEUE_NAME);
+     queueProps.put(ConfigurationConstants.PROP_TYPE, QueueConfiguration.Type.TOPIC_ROUND_ROBIN.name());
+     queueProps.put(ConfigurationConstants.PROP_TOPICS, TOPIC + "/*");
+     queueProps.put(ConfigurationConstants.PROP_RETRIES, 2);
+     queueProps.put(ConfigurationConstants.PROP_RETRY_DELAY, 2000L);
+     queueProps.put(ConfigurationConstants.PROP_MAX_PARALLEL, MAX_PAR);
+     queueConfig.update(queueProps);
+
+     this.sleep(1000L);
+ }
+
+ /**
+  * Round robin queue test.
+  * One job is sent first to get the queue started, then the queue is
+  * suspended, two batches of {@code NUM_JOBS} jobs (without and with job
+  * names) are added for the sub topics "sub0" to "sub9" and the queue is
+  * resumed. The consumer fails a job if more than {@code MAX_PAR} jobs are
+  * processed at the same time; no failed or cancelled job is expected.
+  */
+ @org.junit.Test public void testRoundRobinQueue() throws Exception {
+     final JobManager manager = this.getJobManager();
+
+     final Barrier startBarrier = new Barrier(2);
+
+     // consumer for the single job which starts the queue
+     final JobConsumer startConsumer = new JobConsumer() {
+
+         @Override
+         public boolean process(final Job job) {
+             startBarrier.block();
+             return true;
+         }
+     };
+     final ServiceRegistration startConsumerReg = this.registerJobConsumer(TOPIC + "/start", startConsumer);
+
+     // consumer for all other jobs, guarding the parallel limit
+     final AtomicInteger finishedCount = new AtomicInteger(0);
+     final AtomicInteger running = new AtomicInteger(0);
+     final JobConsumer limitedConsumer = new JobConsumer() {
+
+         @Override
+         public boolean process(final Job job) {
+             final boolean withinLimit = running.incrementAndGet() <= MAX_PAR;
+             if ( withinLimit ) {
+                 sleep(30);
+             }
+             running.decrementAndGet();
+             return withinLimit;
+         }
+     };
+     final ServiceRegistration limitedConsumerReg = this.registerJobConsumer(TOPIC + "/*", limitedConsumer);
+
+     // handler counting the finished notifications
+     final EventHandler finishedHandler = new EventHandler() {
+
+         @Override
+         public void handleEvent(final Event event) {
+             finishedCount.incrementAndGet();
+         }
+     };
+     final ServiceRegistration finishedHandlerReg = this.registerEventHandler(JobUtil.TOPIC_JOB_FINISHED, finishedHandler);
+
+     try {
+         // one job is sent up front to get the queue started
+         manager.addJob(TOPIC + "/start", null, null);
+         assertTrue("No event received in the given time.", startBarrier.block(5));
+         startBarrier.reset();
+
+         // look up the queue ...
+         final Queue queue = manager.getQueue(QUEUE_NAME);
+         assertNotNull("Queue '" + QUEUE_NAME + "' should exist!", queue);
+
+         // ... and hold it while the jobs are added
+         queue.suspend();
+
+         // first batch: jobs without a name
+         int added = 0;
+         while ( added < NUM_JOBS ) {
+             manager.addJob(TOPIC + "/sub" + (added % 10), null, null);
+             added++;
+         }
+         // second batch: jobs with a name
+         added = 0;
+         while ( added < NUM_JOBS ) {
+             manager.addJob(TOPIC + "/sub" + (added % 10), "id" + added, null);
+             added++;
+         }
+         // let the queue run
+         queue.resume();
+
+         // the start job is counted as well, so add one
+         final int expectedFinished = 2 * NUM_JOBS + 1;
+         while ( finishedCount.get() < expectedFinished ) {
+             assertEquals("Failed count", 0, queue.getStatistics().getNumberOfFailedJobs());
+             assertEquals("Cancelled count", 0, queue.getStatistics().getNumberOfCancelledJobs());
+             sleep(500);
+         }
+         assertEquals("Finished count", expectedFinished, finishedCount.get());
+         assertEquals("Finished count", expectedFinished, manager.getStatistics().getNumberOfFinishedJobs());
+         assertEquals("Finished count", expectedFinished, queue.getStatistics().getNumberOfFinishedJobs());
+         assertEquals("Failed count", 0, queue.getStatistics().getNumberOfFailedJobs());
+         assertEquals("Cancelled count", 0, queue.getStatistics().getNumberOfCancelledJobs());
+     } finally {
+         startConsumerReg.unregister();
+         limitedConsumerReg.unregister();
+         finishedHandlerReg.unregister();
+     }
+ }
+}
Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/RoundRobinQueueTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/RoundRobinQueueTest.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/RoundRobinQueueTest.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/TimedJobsTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/TimedJobsTest.java?rev=1470462&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/TimedJobsTest.java (added)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/TimedJobsTest.java Mon Apr 22 11:42:53 2013
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.it;
+
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.Date;
+import java.util.Dictionary;
+import java.util.Hashtable;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.inject.Inject;
+
+import org.apache.sling.event.EventUtil;
+import org.junit.Before;
+import org.junit.runner.RunWith;
+import org.ops4j.pax.exam.junit.ExamReactorStrategy;
+import org.ops4j.pax.exam.junit.JUnit4TestRunner;
+import org.ops4j.pax.exam.spi.reactors.AllConfinedStagedReactorFactory;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventHandler;
+
+/**
+ * Integration tests for timed events: an event with topic
+ * {@link EventUtil#TOPIC_TIMED_EVENT} is sent through the {@link EventAdmin}
+ * and the tests then count how often an event with the configured target
+ * topic ({@link #TOPIC}) is delivered to a registered {@link EventHandler}.
+ *
+ * All waits are bounded by {@link #MAX_WAIT_MILLIS}, so a timed event that is
+ * never delivered fails the test instead of blocking the build forever.
+ */
+@RunWith(JUnit4TestRunner.class)
+@ExamReactorStrategy(AllConfinedStagedReactorFactory.class)
+public class TimedJobsTest extends AbstractJobHandlingTest {
+
+    /** Target topic the timed events are delivered on. */
+    private static final String TOPIC = "timed/test/topic";
+
+    /** Upper bound, in milliseconds, for waiting on the expected events. */
+    private static final long MAX_WAIT_MILLIS = 60000L;
+
+    /** Interval, in milliseconds, between two checks of the event counter. */
+    private static final long POLL_INTERVAL_MILLIS = 1000L;
+
+    @Inject
+    private EventAdmin eventAdmin;
+
+    /**
+     * Runs the inherited setup and then pauses for one second before the
+     * test starts.
+     * NOTE(review): the pause presumably gives the event services time to
+     * start up - confirm against AbstractJobHandlingTest.
+     *
+     * @throws IOException if the inherited setup fails
+     */
+    @Override
+    @Before
+    public void setup() throws IOException {
+        super.setup();
+
+        this.sleep(1000L);
+    }
+
+    /**
+     * Schedules a single timed event two seconds in the future and waits
+     * until it has been delivered on {@link #TOPIC}.
+     */
+    @org.junit.Test public void testTimedJob() throws Exception {
+        final AtomicInteger counter = new AtomicInteger();
+
+        final ServiceRegistration ehReg = this.registerEventHandler(TOPIC, new EventHandler() {
+
+            @Override
+            public void handleEvent(final Event event) {
+                if ( TOPIC.equals(event.getTopic()) ) {
+                    counter.incrementAndGet();
+                }
+            }
+        });
+        try {
+            final Date d = new Date();
+            d.setTime(System.currentTimeMillis() + 2000); // run in 2 seconds
+            // send timed event
+            final Dictionary<String, Object> props = new Hashtable<String, Object>();
+            props.put(EventUtil.PROPERTY_TIMED_EVENT_TOPIC, TOPIC);
+            props.put(EventUtil.PROPERTY_TIMED_EVENT_DATE, d);
+            this.eventAdmin.sendEvent(new Event(EventUtil.TOPIC_TIMED_EVENT, props));
+
+            this.awaitCount(counter, 1, "the timed event");
+        } finally {
+            ehReg.unregister();
+        }
+    }
+
+    /**
+     * Schedules a periodic timed event, waits for five deliveries, then sends
+     * a second timed event carrying only the topic and verifies that at most
+     * one further event arrives within the following two seconds.
+     * NOTE(review): the second event presumably cancels the periodic
+     * schedule, and the period of 1 is presumably in seconds - confirm
+     * against the EventUtil documentation.
+     */
+    @org.junit.Test public void testPeriodicTimedJob() throws Exception {
+        final AtomicInteger counter = new AtomicInteger();
+
+        final ServiceRegistration ehReg = this.registerEventHandler(TOPIC, new EventHandler() {
+
+            @Override
+            public void handleEvent(final Event event) {
+                if ( TOPIC.equals(event.getTopic()) ) {
+                    counter.incrementAndGet();
+                }
+            }
+        });
+        try {
+            // send timed event
+            final Dictionary<String, Object> props = new Hashtable<String, Object>();
+            props.put(EventUtil.PROPERTY_TIMED_EVENT_TOPIC, TOPIC);
+            props.put(EventUtil.PROPERTY_TIMED_EVENT_PERIOD, 1L);
+            this.eventAdmin.sendEvent(new Event(EventUtil.TOPIC_TIMED_EVENT, props));
+
+            this.awaitCount(counter, 5, "the periodic timed events");
+
+            final Dictionary<String, Object> props2 = new Hashtable<String, Object>();
+            props2.put(EventUtil.PROPERTY_TIMED_EVENT_TOPIC, TOPIC);
+
+            this.eventAdmin.sendEvent(new Event(EventUtil.TOPIC_TIMED_EVENT, props2));
+            final int before = counter.get();
+            this.sleep(2000);
+            // Read the counter exactly once: the handler may still be running
+            // concurrently, and two separate reads could see different values.
+            final int after = counter.get();
+            // One more event is tolerated, as it may already have been in flight.
+            if ( after != before && after != before + 1 ) {
+                fail("Events are still sent");
+            }
+        } finally {
+            ehReg.unregister();
+        }
+    }
+
+    /**
+     * Polls the counter until it reaches the expected value and fails the
+     * test if that does not happen within {@link #MAX_WAIT_MILLIS}.
+     *
+     * @param counter  counter incremented by the event handler
+     * @param expected minimum number of events to wait for
+     * @param what     description of the awaited events, used in the failure message
+     */
+    private void awaitCount(final AtomicInteger counter, final int expected, final String what) {
+        final long deadline = System.currentTimeMillis() + MAX_WAIT_MILLIS;
+        while ( counter.get() < expected ) {
+            if ( System.currentTimeMillis() > deadline ) {
+                fail("Timed out after " + MAX_WAIT_MILLIS + "ms waiting for " + what
+                        + " (received " + counter.get() + " of " + expected + " events)");
+            }
+            this.sleep(POLL_INTERVAL_MILLIS);
+        }
+    }
+}
Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/TimedJobsTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/TimedJobsTest.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/TimedJobsTest.java
------------------------------------------------------------------------------
svn:mime-type = text/plain