You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2010/10/15 11:42:27 UTC
svn commit: r1022870 - in /sling/branches/eventing-3.0/src:
main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java
test/java/org/apache/sling/event/impl/jobs/RoundRobinQueueTest.java
Author: cziegeler
Date: Fri Oct 15 09:42:26 2010
New Revision: 1022870
URL: http://svn.apache.org/viewvc?rev=1022870&view=rev
Log:
Fix some NPEs on shutdown and enhance test
Modified:
sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java
sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/jobs/RoundRobinQueueTest.java
Modified: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java?rev=1022870&r1=1022869&r2=1022870&view=diff
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java (original)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java Fri Oct 15 09:42:26 2010
@@ -887,6 +887,9 @@ public class PersistenceHandler implemen
public boolean lock(final JobEvent info) {
final String path = this.getNodePath(info.uniqueId);
synchronized ( this.backgroundLock ) {
+ if ( !this.running ) {
+ return false;
+ }
try {
// check if the node still exists
if ( this.backgroundSession.itemExists(path)
@@ -918,6 +921,9 @@ public class PersistenceHandler implemen
public void unlock(final JobEvent info) {
final String path = this.getNodePath(info.uniqueId);
synchronized ( this.backgroundLock ) {
+ if ( !this.running ) {
+ return;
+ }
try {
this.backgroundSession.getWorkspace().getLockManager().unlock(path);
} catch (RepositoryException re) {
@@ -934,6 +940,9 @@ public class PersistenceHandler implemen
final String jobId = (String)info.event.getProperty(JobUtil.PROPERTY_JOB_NAME);
final String path = this.getNodePath(info.uniqueId);
synchronized ( this.backgroundLock ) {
+ if ( !this.running ) {
+ return;
+ }
try {
if ( this.backgroundSession.itemExists(path) ) {
((DefaultJobManager)this.jobManager).notifyRemoveJob(info.uniqueId);
Modified: sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/jobs/RoundRobinQueueTest.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/jobs/RoundRobinQueueTest.java?rev=1022870&r1=1022869&r2=1022870&view=diff
==============================================================================
--- sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/jobs/RoundRobinQueueTest.java (original)
+++ sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/jobs/RoundRobinQueueTest.java Fri Oct 15 09:42:26 2010
@@ -50,6 +50,8 @@ public class RoundRobinQueueTest extends
private static final String QUEUE_NAME = "roundrobintest";
private static final String TOPIC = "sling/test";
+ private static int MAX_PAR = 5;
+ private static int NUM_JOBS = 300;
protected Mockery context;
@@ -77,7 +79,7 @@ public class RoundRobinQueueTest extends
queueProps.put(ConfigurationConstants.PROP_TOPICS, TOPIC + "/*");
queueProps.put(ConfigurationConstants.PROP_NAME, QUEUE_NAME);
queueProps.put(ConfigurationConstants.PROP_TYPE, QueueConfiguration.Type.TOPIC_ROUND_ROBIN);
- queueProps.put(ConfigurationConstants.PROP_MAX_PARALLEL, 5);
+ queueProps.put(ConfigurationConstants.PROP_MAX_PARALLEL, MAX_PAR);
final InternalQueueConfiguration mainConfiguration = InternalQueueConfiguration.fromConfiguration(queueProps);
return new QueueConfigurationManager() {
@@ -92,12 +94,22 @@ public class RoundRobinQueueTest extends
/**
* Helper method to create a job event.
*/
- private Event getJobEvent(final String subTopic) {
+ private Event getJobEvent(final String subTopic, final String id) {
final Dictionary<String, Object> props = new Hashtable<String, Object>();
props.put(JobUtil.PROPERTY_JOB_TOPIC, TOPIC + '/' + subTopic);
+ if ( id != null ) {
+ props.put(JobUtil.PROPERTY_JOB_NAME, id);
+ }
return new Event(JobUtil.TOPIC_JOB, props);
}
+ /**
+ * Helper method to create a job event.
+ */
+ private Event getJobEvent(final String subTopic) {
+ return this.getJobEvent(subTopic, null);
+ }
+
@org.junit.Test public void testRoundRobinQueue() throws Exception {
final PersistenceHandler jeh = this.handler;
@@ -125,6 +137,7 @@ public class RoundRobinQueueTest extends
q.suspend();
// set new event admin
final AtomicInteger count = new AtomicInteger(0);
+ final AtomicInteger parallelCount = new AtomicInteger(0);
setEventAdmin(new SimpleEventAdmin(new String[] {TOPIC + '*',
JobUtil.TOPIC_JOB_FINISHED},
new EventHandler[] {
@@ -133,11 +146,15 @@ public class RoundRobinQueueTest extends
JobUtil.processJob(event, new JobProcessor() {
public boolean process(Event job) {
+ if ( parallelCount.incrementAndGet() > MAX_PAR ) {
+ return false;
+ }
try {
Thread.sleep(30);
} catch (InterruptedException ie) {
// ignore
}
+ parallelCount.decrementAndGet();
return true;
}
});
@@ -148,24 +165,33 @@ public class RoundRobinQueueTest extends
count.incrementAndGet();
}
}}));
- // we start "some" jobs
- final int COUNT = 300;
- for(int i = 0; i < COUNT; i++ ) {
+ // we start "some" jobs:
+ // first jobs without id
+ for(int i = 0; i < NUM_JOBS; i++ ) {
final String subTopic = "sub" + (i % 10);
jeh.handleEvent(getJobEvent(subTopic));
}
+ // second jobs with id
+ for(int i = 0; i < NUM_JOBS; i++ ) {
+ final String subTopic = "sub" + (i % 10);
+ jeh.handleEvent(getJobEvent(subTopic, "id" + i));
+ }
// start the queue
q.resume();
- while ( count.get() < COUNT ) {
+ while ( count.get() < 2 * NUM_JOBS ) {
+ assertEquals("Failed count", 0, q.getStatistics().getNumberOfFailedJobs());
+ assertEquals("Cancelled count", 0, q.getStatistics().getNumberOfCancelledJobs());
try {
Thread.sleep(500);
} catch (InterruptedException ie) {
// ignore
}
}
- assertEquals("Finished count", COUNT, count.get());
+ assertEquals("Finished count", 2 * NUM_JOBS, count.get());
// we started one event before the test, so add one
- assertEquals("Finished count", COUNT + 1, this.jobManager.getStatistics().getNumberOfFinishedJobs());
- assertEquals("Finished count", COUNT + 1, q.getStatistics().getNumberOfFinishedJobs());
+ assertEquals("Finished count", 2 * NUM_JOBS + 1, this.jobManager.getStatistics().getNumberOfFinishedJobs());
+ assertEquals("Finished count", 2 * NUM_JOBS + 1, q.getStatistics().getNumberOfFinishedJobs());
+ assertEquals("Failed count", 0, q.getStatistics().getNumberOfFailedJobs());
+ assertEquals("Cancelled count", 0, q.getStatistics().getNumberOfCancelledJobs());
}
}