You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2010/02/19 17:14:26 UTC
svn commit: r911855 - in /sling/trunk/bundles/extensions/event: ./
src/main/java/org/apache/sling/event/impl/
src/main/java/org/apache/sling/event/impl/job/
src/test/java/org/apache/sling/event/impl/
Author: cziegeler
Date: Fri Feb 19 16:14:26 2010
New Revision: 911855
URL: http://svn.apache.org/viewvc?rev=911855&view=rev
Log:
SLING-1389 : Background loading might not load all stored jobs
SLING-1390 : Deadlock in rare circumstances
SLING-1396 : Include only used classes from jcr commons
This is mostly a refactoring effort to reduce the complexity and make the code more understandable. With this I could fix the above-mentioned errors.
Started to add some basic test cases.
Added:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobUtil.java (with props)
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/Barrier.java (with props)
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/JobEventHandlerTest.java (with props)
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/RepositoryTestUtil.java (with props)
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/SimpleEventAdmin.java (with props)
Modified:
sling/trunk/bundles/extensions/event/pom.xml
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/AbstractRepositoryEventHandlerTest.java
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/DistributingEventHandlerTest.java
Modified: sling/trunk/bundles/extensions/event/pom.xml
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/pom.xml?rev=911855&r1=911854&r2=911855&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/pom.xml (original)
+++ sling/trunk/bundles/extensions/event/pom.xml Fri Feb 19 16:14:26 2010
@@ -54,6 +54,11 @@
<extensions>true</extensions>
<configuration>
<instructions>
+ <!-- As we're using JCR 2.0 for testing we explicitly have to
+ import version 1.0 here -->
+ <Import-Package>
+ javax.jcr.*;version=1.0,*
+ </Import-Package>
<Export-Package>
org.apache.sling.event;version=2.2.0
</Export-Package>
@@ -68,7 +73,7 @@
slingevent=http://sling.apache.org/jcr/event/1.0
</Sling-Namespaces>
<Embed-Dependency>
- jackrabbit-jcr-commons
+ jackrabbit-jcr-commons;inline="org/apache/jackrabbit/util/ISO9075.*|org/apache/jackrabbit/name/ISO8601.*|org/apache/jackrabbit/util/XMLChar.*"
</Embed-Dependency>
</instructions>
</configuration>
@@ -103,9 +108,12 @@
<groupId>org.osgi</groupId>
<artifactId>org.osgi.compendium</artifactId>
</dependency>
+ <!-- We use version 2.0 for testing! -->
<dependency>
<groupId>javax.jcr</groupId>
<artifactId>jcr</artifactId>
+ <version>2.0</version>
+ <scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.sling</groupId>
@@ -156,7 +164,7 @@
<dependency>
<groupId>org.apache.jackrabbit</groupId>
<artifactId>jackrabbit-jcr-commons</artifactId>
- <version>1.4.2</version>
+ <version>2.0.0</version>
<scope>provided</scope>
</dependency>
<!-- Testing -->
@@ -169,9 +177,13 @@
<artifactId>jmock-junit4</artifactId>
</dependency>
<dependency>
- <groupId>org.apache.sling</groupId>
- <artifactId>org.apache.sling.commons.testing</artifactId>
- <version>2.0.4-incubator</version>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-simple</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.jackrabbit</groupId>
+ <artifactId>jackrabbit-core</artifactId>
+ <version>2.0.0</version>
<scope>test</scope>
</dependency>
</dependencies>
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java?rev=911855&r1=911854&r2=911855&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java Fri Feb 19 16:14:26 2010
@@ -30,7 +30,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import javax.jcr.Item;
@@ -57,6 +56,7 @@
import org.apache.sling.event.EventUtil;
import org.apache.sling.event.JobStatusProvider;
import org.apache.sling.event.impl.job.JobBlockingQueue;
+import org.apache.sling.event.impl.job.JobUtil;
import org.apache.sling.event.impl.job.ParallelInfo;
import org.osgi.framework.Constants;
import org.osgi.service.component.ComponentConstants;
@@ -145,7 +145,7 @@
private long maximumParallelJobs;
/** Background session. */
- private Session backgroundSession;
+ protected Session backgroundSession;
/** Unloaded jobs. */
private Set<String>unloadedJobs = new HashSet<String>();
@@ -281,12 +281,14 @@
} catch (InterruptedException e) {
this.ignoreException(e);
}
- this.logger.debug("Stopped job queue {}", jbq.getName());
+ this.logger.info("Stopped job queue {}", jbq.getName());
}
}
if ( this.backgroundSession != null ) {
synchronized ( this.backgroundLock ) {
this.logger.debug("Shutting down background session.");
+ // notify possibly sleeping thread
+ this.backgroundLock.notify();
try {
this.backgroundSession.getWorkspace().getObservationManager().removeEventListener(this);
} catch (RepositoryException e) {
@@ -474,7 +476,7 @@
info.event = event;
final String jobId = (String)event.getProperty(EventUtil.PROPERTY_JOB_ID);
final String jobTopic = (String)event.getProperty(EventUtil.PROPERTY_JOB_TOPIC);
- final String nodePath = this.getNodePath(jobTopic, jobId);
+ final String nodePath = JobUtil.getUniquePath(jobTopic, jobId);
// if the job has no job id, we can just write the job to the repo and don't
// need locking
@@ -537,14 +539,88 @@
// if we were able to write the event into the repository
// we will queue it for processing
if ( info.nodePath != null ) {
- try {
- this.queue.put(info);
- } catch (InterruptedException e) {
- // this should never happen
- this.ignoreException(e);
+ this.queueJob(info);
+ }
+ }
+ }
+ }
+
+ /**
+ * Put the job into the correct queue.
+ */
+ private void queueJob(final EventInfo info) {
+ if ( logger.isDebugEnabled() ) {
+ logger.debug("Received new job {}", EventUtil.toString(info.event));
+ }
+ // check for local only jobs and remove them from the queue if they're meant
+ // for another application node
+ final String appId = (String)info.event.getProperty(EventUtil.PROPERTY_APPLICATION);
+ if ( info.event.getProperty(EventUtil.PROPERTY_JOB_RUN_LOCAL) != null
+ && appId != null && !this.applicationId.equals(appId) ) {
+ if ( logger.isDebugEnabled() ) {
+ logger.debug("Discarding job {} : local job for a different application node.", EventUtil.toString(info.event));
+ }
+ } else {
+
+ // check if we should put this into a separate queue
+ boolean queued = false;
+ if ( info.event.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME) != null ) {
+ final String queueName = (String)info.event.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME);
+ synchronized ( this.jobQueues ) {
+ BlockingQueue<EventInfo> jobQueue = this.jobQueues.get(queueName);
+ if ( jobQueue == null ) {
+ // check if we have exceeded the maximum number of job queues
+ if ( this.jobQueues.size() >= this.maxJobQueues ) {
+ this.logger.warn("Unable to create new job queue named {} as there are already {} job queues." +
+ " Try to increase the maximum number of job queues!", queueName, this.jobQueues.size());
+ } else {
+ final boolean orderedQueue = info.event.getProperty(EventUtil.PROPERTY_JOB_QUEUE_ORDERED) != null;
+ final JobBlockingQueue jq = new JobBlockingQueue(queueName, orderedQueue, this.logger);
+ jobQueue = jq;
+ this.jobQueues.put(queueName, jq);
+ // Start background thread
+ this.threadPool.execute(new Runnable() {
+
+ /**
+ * @see java.lang.Runnable#run()
+ */
+ public void run() {
+ while ( running && !jq.isFinished() ) {
+ logger.info("Starting {}job queue {}", (orderedQueue ? "ordered " : ""), queueName);
+ try {
+ runJobQueue(queueName, jq);
+ } catch (Throwable t) {
+ logger.error("Job queue stopped with exception: " + t.getMessage() + ". Restarting.", t);
+ }
+ }
+ }
+
+ });
+ }
+ }
+ if ( jobQueue != null ) {
+ if ( logger.isDebugEnabled() ) {
+ logger.debug("Queuing job {} into queue {}.", EventUtil.toString(info.event), queueName);
+ }
+ try {
+ jobQueue.put(info);
+ } catch (InterruptedException e) {
+ // this should never happen
+ this.ignoreException(e);
+ }
+ // don't put into main queue
+ queued = true;
}
}
}
+ if ( !queued ) {
+ try {
+ this.queue.put(info);
+ } catch (InterruptedException e) {
+ // this should never happen
+ this.ignoreException(e);
+ }
+ }
}
}
@@ -552,6 +628,7 @@
* This method runs in the background and processes the local queue.
*/
protected void runInBackground() throws RepositoryException {
+ // create the background session and register a listener
this.backgroundSession = this.createSession();
this.backgroundSession.getWorkspace().getObservationManager()
.addEventListener(this,
@@ -562,7 +639,6 @@
null,
new String[] {this.getEventNodeType()},
true);
- // load unprocessed jobs from repository
if ( this.running ) {
logger.info("Apache Sling Job Event Handler started.");
logger.debug("Job Handler Configuration: (sleepTime={} secs, maxJobRetries={}," +
@@ -578,6 +654,7 @@
ctx.disableComponent(name);
}
}
+ // This is the main queue
while ( this.running ) {
// so let's wait/get the next job from the queue
EventInfo info = null;
@@ -589,76 +666,8 @@
}
if ( info != null && this.running ) {
- if ( logger.isDebugEnabled() ) {
- logger.debug("Received new job {}", EventUtil.toString(info.event));
- }
- // check for local only jobs and remove them from the queue if they're meant
- // for another application node
- final String appId = (String)info.event.getProperty(EventUtil.PROPERTY_APPLICATION);
- if ( info.event.getProperty(EventUtil.PROPERTY_JOB_RUN_LOCAL) != null
- && appId != null && !this.applicationId.equals(appId) ) {
- if ( logger.isDebugEnabled() ) {
- logger.debug("Discarding job {} : local job for a different application node.", EventUtil.toString(info.event));
- }
- info = null;
- }
-
- // check if we should put this into a separate queue
- if ( info != null && info.event.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME) != null ) {
- final String queueName = (String)info.event.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME);
- synchronized ( this.jobQueues ) {
- BlockingQueue<EventInfo> jobQueue = this.jobQueues.get(queueName);
- if ( jobQueue == null ) {
- // check if we have exceeded the maximum number of job queues
- if ( this.jobQueues.size() >= this.maxJobQueues ) {
- this.logger.warn("Unable to create new job queue named {} as there are already {} job queues." +
- " Try to increase the maximum number of job queues!", queueName, this.jobQueues.size());
- } else {
- final boolean orderedQueue = info.event.getProperty(EventUtil.PROPERTY_JOB_QUEUE_ORDERED) != null;
- final JobBlockingQueue jq = new JobBlockingQueue(queueName, orderedQueue, this.logger);
- jobQueue = jq;
- this.jobQueues.put(queueName, jq);
- // Start background thread
- this.threadPool.execute(new Runnable() {
-
- /**
- * @see java.lang.Runnable#run()
- */
- public void run() {
- while ( running && !jq.isFinished() ) {
- logger.info("Starting {}job queue {}", (orderedQueue ? "ordered " : ""), queueName);
- try {
- runJobQueue(queueName, jq);
- } catch (Throwable t) {
- logger.error("Job queue stopped with exception: " + t.getMessage() + ". Restarting.", t);
- }
- }
- }
-
- });
- }
- }
- if ( jobQueue != null ) {
- if ( logger.isDebugEnabled() ) {
- logger.debug("Queuing job {} into queue {}.", EventUtil.toString(info.event), queueName);
- }
- try {
- jobQueue.put(info);
- } catch (InterruptedException e) {
- // this should never happen
- this.ignoreException(e);
- }
- // don't process this here
- info = null;
- }
- }
- }
-
- // if we still have a job, process it
- if ( info != null ) {
- if ( this.executeJob(info, null) == Status.RESCHEDULE ) {
- this.putBackIntoMainQueue(info, true);
- }
+ if ( this.executeJob(info, null) == Status.RESCHEDULE ) {
+ this.putBackIntoMainQueue(info, true);
}
}
}
@@ -719,6 +728,61 @@
}
}
+ /**
+ * Check the precondition for the job.
+ * This method handles the parallel settings and returns <code>true</code>
+ * if the job can be run. If <code>false</code> is returned, we have to
+ * wait for another job to finish first.
+ */
+ private boolean checkPrecondition(final ParallelInfo parInfo, final String jobTopic) {
+ // check how we can process this job:
+ // if the job should not be processed in parallel, we have to check
+ // if another job with the same topic is currently running
+ // if parallel processing is allowed, we have to check for the number
+ // of max allowed parallel jobs for this topic
+ boolean process = parInfo.processParallel;
+ if ( !parInfo.processParallel ) {
+ synchronized ( this.processingMap ) {
+ final Boolean value = this.processingMap.get(jobTopic);
+ if ( value == null || !value.booleanValue() ) {
+ this.processingMap.put(jobTopic, Boolean.TRUE);
+ process = true;
+ }
+ }
+ } else {
+ if ( parInfo.maxParallelJob > 1 ) {
+ synchronized ( this.parallelProcessingMap ) {
+ final Integer value = this.parallelProcessingMap.get(jobTopic);
+ final int currentValue = (value == null ? 0 : value.intValue());
+ if ( currentValue < parInfo.maxParallelJob ) {
+ this.parallelProcessingMap.put(jobTopic, currentValue + 1);
+ } else {
+ process = false;
+ }
+ }
+ }
+ }
+ return process;
+ }
+
+ /**
+ * Unlock the parallel job processing state.
+ */
+ private void unlockState(final ParallelInfo parInfo, final String jobTopic) {
+ if ( !parInfo.processParallel ) {
+ synchronized ( this.processingMap ) {
+ this.processingMap.put(jobTopic, Boolean.FALSE);
+ }
+ } else {
+ if ( parInfo.maxParallelJob > 1 ) {
+ synchronized ( this.parallelProcessingMap ) {
+ final Integer value = this.parallelProcessingMap.get(jobTopic);
+ this.parallelProcessingMap.put(jobTopic, value.intValue() - 1);
+ }
+ }
+ }
+ }
+
public enum Status {
FAILED,
RESCHEDULE,
@@ -730,7 +794,6 @@
*/
private Status executeJob(final EventInfo info, final BlockingQueue<EventInfo> jobQueue) {
boolean putback = false;
- boolean wait = false;
synchronized (this.backgroundLock) {
if ( logger.isDebugEnabled() ) {
logger.debug("Executing job {}.", EventUtil.toString(info.event));
@@ -739,45 +802,30 @@
this.backgroundSession.refresh(false);
// check if the node still exists
if ( this.backgroundSession.itemExists(info.nodePath)
- && !this.backgroundSession.itemExists(info.nodePath + "/" + EventHelper.NODE_PROPERTY_FINISHED)) {
+ && !this.backgroundSession.itemExists(info.nodePath + '/' + EventHelper.NODE_PROPERTY_FINISHED)) {
final Event event = info.event;
final String jobTopic = (String)event.getProperty(EventUtil.PROPERTY_JOB_TOPIC);
final ParallelInfo parInfo = ParallelInfo.getParallelInfo(event);
- // check how we can process this job:
- // if the job should not be processed in parallel, we have to check
- // if another job with the same topic is currently running
- // if parallel processing is allowed, we have to check for the number
- // of max allowed parallel jobs for this topic
- boolean process = parInfo.processParallel;
- if ( !parInfo.processParallel ) {
- synchronized ( this.processingMap ) {
- final Boolean value = this.processingMap.get(jobTopic);
- if ( value == null || !value.booleanValue() ) {
- this.processingMap.put(jobTopic, Boolean.TRUE);
- process = true;
- }
- }
- } else {
- if ( parInfo.maxParallelJob > 1 ) {
- synchronized ( this.parallelProcessingMap ) {
- final Integer value = this.parallelProcessingMap.get(jobTopic);
- final int currentValue = (value == null ? 0 : value.intValue());
- if ( currentValue < parInfo.maxParallelJob ) {
- this.parallelProcessingMap.put(jobTopic, currentValue + 1);
- } else {
- process = false;
- }
- }
- }
- }
+ final boolean process = checkPrecondition(parInfo, jobTopic);
// check number of parallel jobs for main queue
if ( process && jobQueue == null && this.parallelJobCount >= this.maximumParallelJobs ) {
if ( logger.isDebugEnabled() ) {
- logger.debug("Rescheduling job {} - maximum parallel job count of {} reached!", EventUtil.toString(info.event), this.maximumParallelJobs);
+ logger.debug("Waiting with executing job {} - maximum parallel job count of {} reached!",
+ EventUtil.toString(info.event), this.maximumParallelJobs);
+ }
+ try {
+ this.backgroundLock.wait();
+ } catch (InterruptedException e) {
+ this.ignoreException(e);
+ }
+ // let's check if we're still running
+ if ( !this.running ) {
+ return Status.FAILED;
+ }
+ if ( logger.isDebugEnabled() ) {
+ logger.debug("Continuing with executing job {}.", EventUtil.toString(info.event));
}
- process = false;
- wait = true;
}
if ( process ) {
boolean unlock = true;
@@ -789,31 +837,18 @@
eventNode.lock(false, true);
} catch (RepositoryException re) {
// lock failed which means that the node is locked by someone else, so we don't have to requeue
- process = false;
- }
- if ( process ) {
- unlock = false;
- this.processJob(info.event, eventNode, jobQueue == null);
- return Status.SUCCESS;
+ return Status.FAILED;
}
+ unlock = false;
+ this.processJob(info.event, eventNode, jobQueue == null, parInfo);
+ return Status.SUCCESS;
}
} catch (RepositoryException e) {
// ignore
this.ignoreException(e);
} finally {
if ( unlock ) {
- if ( !parInfo.processParallel ) {
- synchronized ( this.processingMap ) {
- this.processingMap.put(jobTopic, Boolean.FALSE);
- }
- } else {
- if ( parInfo.maxParallelJob > 1 ) {
- synchronized ( this.parallelProcessingMap ) {
- final Integer value = this.parallelProcessingMap.get(jobTopic);
- this.parallelProcessingMap.put(jobTopic, value.intValue() - 1);
- }
- }
- }
+ unlockState(parInfo, jobTopic);
}
}
} else {
@@ -834,17 +869,6 @@
}
}
- // if this is the main queue and we have reached the max number of parallel jobs
- // we wait a little bit before continuing
- if ( wait ) {
- logger.debug("Sleeping for {} seconds as the maximum number of parallel threads is reached.", sleepTime);
- try {
- Thread.sleep(sleepTime * 1000);
- } catch (InterruptedException ie) {
- // ignore
- ignoreException(ie);
- }
- }
// if we have to put back the job, we return this status
if ( putback ) {
return Status.RESCHEDULE;
@@ -886,7 +910,7 @@
this.ignoreException(e);
}
} else {
- this.logger.warn("Event does not contain job topic: {}", event);
+ this.logger.warn("Event does not contain job topic: {}", EventUtil.toString(event));
}
} else {
@@ -912,22 +936,7 @@
try {
if ( s.itemExists(path) ) {
final Node eventNode = (Node) s.getItem(path);
- if ( !eventNode.isLocked() ) {
- try {
- final EventInfo info = new EventInfo();
- info.event = readEvent(eventNode);
- info.nodePath = path;
- try {
- queue.put(info);
- } catch (InterruptedException e) {
- // we ignore this exception as this should never occur
- ignoreException(e);
- }
- } catch (ClassNotFoundException cnfe) {
- newUnloadedJobs.add(path);
- ignoreException(cnfe);
- }
- }
+ tryToLoadJob(eventNode, newUnloadedJobs);
}
} catch (RepositoryException re) {
// we ignore this and readd
@@ -956,46 +965,15 @@
}
/**
- * Create a unique node path (folder and name) for the job.
- */
- private String getNodePath(final String jobTopic, final String jobId) {
- final StringBuilder sb = new StringBuilder(jobTopic.replace('/', '.'));
- sb.append('/');
- if ( jobId != null ) {
- // we create an md from the job id - we use the first 6 bytes to
- // create sub directories
- final String md5 = EventHelper.md5(jobId);
- sb.append(md5.substring(0, 2));
- sb.append('/');
- sb.append(md5.substring(2, 4));
- sb.append('/');
- sb.append(md5.substring(4, 6));
- sb.append('/');
- sb.append(EventHelper.filter(jobId));
- } else {
- // create a path from the uuid - we use the first 6 bytes to
- // create sub directories
- final String uuid = UUID.randomUUID().toString();
- sb.append(uuid.substring(0, 2));
- sb.append('/');
- sb.append(uuid.substring(2, 4));
- sb.append('/');
- sb.append(uuid.substring(5, 7));
- sb.append("/Job_");
- sb.append(uuid.substring(8, 17));
- }
- return sb.toString();
- }
-
- /**
* Process a job and unlock the node in the repository.
* @param event The original event.
* @param eventNode The node in the repository where the job is stored.
* @param isMainQueue Is this the main queue?
*/
- private void processJob(Event event, Node eventNode, boolean isMainQueue) {
- final ParallelInfo parInfo = ParallelInfo.getParallelInfo(event);
- final boolean parallelProcessing = parInfo.processParallel;
+ private void processJob(final Event event,
+ final Node eventNode,
+ final boolean isMainQueue,
+ final ParallelInfo parInfo) {
final String jobTopic = (String)event.getProperty(EventUtil.PROPERTY_JOB_TOPIC);
if ( logger.isDebugEnabled() ) {
logger.debug("Starting job {}", EventUtil.toString(event));
@@ -1034,18 +1012,7 @@
if ( isMainQueue ) {
this.parallelJobCount--;
}
- if ( !parallelProcessing ) {
- synchronized ( this.processingMap ) {
- this.processingMap.put(jobTopic, Boolean.FALSE);
- }
- } else {
- if ( parInfo.maxParallelJob > 1 ) {
- synchronized ( this.parallelProcessingMap ) {
- final Integer value = this.parallelProcessingMap.get(jobTopic);
- this.parallelProcessingMap.put(jobTopic, value.intValue() - 1);
- }
- }
- }
+ this.unlockState(parInfo, jobTopic);
// unlock node
try {
@@ -1137,25 +1104,7 @@
s = this.createSession();
}
final Node eventNode = (Node) s.getItem(nodePath);
- if ( !eventNode.isLocked() && !eventNode.hasProperty(EventHelper.NODE_PROPERTY_FINISHED)) {
- try {
- final EventInfo info = new EventInfo();
- info.event = this.readEvent(eventNode);
- info.nodePath = nodePath;
- try {
- this.queue.put(info);
- } catch (InterruptedException e) {
- // we ignore this exception as this should never occur
- this.ignoreException(e);
- }
- } catch (ClassNotFoundException cnfe) {
- // store path for lazy loading
- synchronized ( this.unloadedJobs ) {
- this.unloadedJobs.add(nodePath);
- }
- this.ignoreException(cnfe);
- }
- }
+ tryToLoadJob(eventNode, this.unloadedJobs);
}
}
} catch (RepositoryException re) {
@@ -1215,34 +1164,26 @@
long count = 0;
while ( result.hasNext() && count < maxLoad ) {
final Node eventNode = result.nextNode();
- if ( !eventNode.isLocked() && !eventNode.hasProperty(EventHelper.NODE_PROPERTY_FINISHED)) {
+ eventCreated = eventNode.getProperty(EventHelper.NODE_PROPERTY_CREATED).getLong();
+ if ( tryToLoadJob(eventNode, this.unloadedJobs) ) {
count++;
- eventCreated = eventNode.getProperty(EventHelper.NODE_PROPERTY_CREATED).getLong();
- final String nodePath = eventNode.getPath();
- try {
- final Event event = this.readEvent(eventNode);
- final EventInfo info = new EventInfo();
- info.event = event;
- info.nodePath = nodePath;
- try {
- this.queue.put(info);
- } catch (InterruptedException e) {
- // we ignore this exception as this should never occur
- this.ignoreException(e);
- }
- } catch (ClassNotFoundException cnfe) {
- // store path for lazy loading
- synchronized ( this.unloadedJobs ) {
- this.unloadedJobs.add(nodePath);
- }
- this.ignoreException(cnfe);
- } catch (RepositoryException re) {
- this.logger.error("Unable to load stored job from " + nodePath, re);
+ }
+ }
+ // now we have to add all jobs with the same created time!
+ boolean done = false;
+ while ( result.hasNext() && !done ) {
+ final Node eventNode = result.nextNode();
+ final long created = eventNode.getProperty(EventHelper.NODE_PROPERTY_CREATED).getLong();
+ if ( created == eventCreated ) {
+ if ( tryToLoadJob(eventNode, this.unloadedJobs) ) {
+ count++;
}
+ } else {
+ done = true;
}
}
// have we processed all jobs?
- if ( !result.hasNext() ) {
+ if ( !done && !result.hasNext() ) {
eventCreated = -1;
}
logger.debug("Loaded {} jobs and new since {}", count, eventCreated);
@@ -1253,6 +1194,33 @@
return eventCreated;
}
+ private boolean tryToLoadJob(final Node eventNode, final Set<String> unloadedJobSet) {
+ try {
+ if ( !eventNode.isLocked() && !eventNode.hasProperty(EventHelper.NODE_PROPERTY_FINISHED)) {
+ final String nodePath = eventNode.getPath();
+ try {
+ final Event event = this.readEvent(eventNode);
+ final EventInfo info = new EventInfo();
+ info.event = event;
+ info.nodePath = nodePath;
+ this.queueJob(info);
+ } catch (ClassNotFoundException cnfe) {
+ // store path for lazy loading
+ synchronized ( unloadedJobSet ) {
+ unloadedJobSet.add(nodePath);
+ }
+ this.ignoreException(cnfe);
+ } catch (RepositoryException re) {
+ this.logger.error("Unable to load stored job from " + nodePath, re);
+ }
+ return true;
+ }
+ } catch (RepositoryException re) {
+ this.logger.error("Unable to load stored job from " + eventNode, re);
+ }
+ return false;
+ }
+
/**
* @see org.apache.sling.event.EventUtil.JobStatusNotifier#sendAcknowledge(org.osgi.service.event.Event, java.lang.String)
*/
@@ -1322,7 +1290,6 @@
this.sendNotification(EventUtil.TOPIC_JOB_FINISHED, job);
}
final ParallelInfo parInfo = ParallelInfo.getParallelInfo(job);
- final boolean parallelProcessing = parInfo.processParallel;
EventInfo putback = null;
// we have to use the same session for unlocking that we used for locking!
synchronized ( this.backgroundLock ) {
@@ -1370,20 +1337,11 @@
this.logger.error("Exception during job finishing.", re);
} finally {
final String jobTopic = (String)job.getProperty(EventUtil.PROPERTY_JOB_TOPIC);
- if ( !parallelProcessing) {
- synchronized ( this.processingMap ) {
- this.processingMap.put(jobTopic, Boolean.FALSE);
- }
- } else {
- if ( parInfo.maxParallelJob > 1 ) {
- synchronized ( this.parallelProcessingMap ) {
- final Integer value = this.parallelProcessingMap.get(jobTopic);
- this.parallelProcessingMap.put(jobTopic, value.intValue() - 1);
- }
- }
- }
+ this.unlockState(parInfo, jobTopic);
+
if ( job.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME) == null ) {
this.parallelJobCount--;
+ this.backgroundLock.notify();
}
if ( unlock ) {
@@ -1422,19 +1380,7 @@
if ( job.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME) != null ) {
checkForNotify(job, info);
} else {
-
- // delay rescheduling?
- if ( job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY) != null ) {
- putback = info;
- } else {
- // put directly into queue
- try {
- queue.put(info);
- } catch (InterruptedException e) {
- // this should never happen
- this.ignoreException(e);
- }
- }
+ putback = info;
}
} else {
// if this is an own job queue, we simply signal the queue to continue
@@ -1456,17 +1402,6 @@
}
private void putBackIntoMainQueue(final EventInfo info, final boolean useSleepTime) {
- if ( logger.isDebugEnabled() ) {
- logger.debug("Putting job {} back into the queue.", EventUtil.toString(info.event));
- }
- final Date fireDate = new Date();
- if ( useSleepTime ) {
- fireDate.setTime(System.currentTimeMillis() + this.sleepTime * 1000);
- } else {
- final long delay = (Long)info.event.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY);
- fireDate.setTime(System.currentTimeMillis() + delay);
- }
-
final Runnable t = new Runnable() {
public void run() {
try {
@@ -1477,21 +1412,42 @@
}
}
};
- try {
- this.scheduler.fireJobAt(null, t, null, fireDate);
- } catch (Exception e) {
- // we ignore the exception and just put back the job in the queue
- ignoreException(e);
- if ( useSleepTime ) {
+
+ final long delay;
+ if ( useSleepTime ) {
+ delay = this.sleepTime * 1000;
+ } else {
+ final Long obj = (Long)info.event.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY);
+ delay = (obj == null) ? -1 : obj.longValue();
+ }
+ if ( delay == -1 ) {
+ // put directly without waiting
+ if ( logger.isDebugEnabled() ) {
+ logger.debug("Putting job {} back into the queue.", EventUtil.toString(info.event));
+ }
+ t.run();
+ } else {
+ // schedule the put
+ if ( logger.isDebugEnabled() ) {
+ logger.debug("Putting job {} back into the queue after {}ms.", EventUtil.toString(info.event), delay);
+ }
+ final Date fireDate = new Date();
+ fireDate.setTime(System.currentTimeMillis() + delay);
+
+ try {
+ this.scheduler.fireJobAt(null, t, null, fireDate);
+ } catch (Exception e) {
+ // we ignore the exception and just put back the job in the queue
+ ignoreException(e);
// then wait for the time and readd the job
try {
- Thread.sleep(sleepTime * 1000);
+ Thread.sleep(delay);
} catch (InterruptedException ie) {
// ignore
ignoreException(ie);
}
+ t.run();
}
- t.run();
}
}
@@ -1666,7 +1622,7 @@
*/
public void cancelJob(String topic, String jobId) {
if ( jobId != null && topic != null ) {
- this.cancelJob(this.getNodePath(topic, jobId));
+ this.cancelJob(JobUtil.getUniquePath(topic, jobId));
}
}
Added: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobUtil.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobUtil.java?rev=911855&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobUtil.java (added)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobUtil.java Fri Feb 19 16:14:26 2010
@@ -0,0 +1,59 @@
+package org.apache.sling.event.impl.job;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import java.util.UUID;
+
+import org.apache.sling.event.impl.EventHelper;
+
+public class JobUtil {
+
+ /**
+ * Create a unique node path (folder and name) for the job.
+ */
+ public static String getUniquePath(final String jobTopic, final String jobId) {
+ final StringBuilder sb = new StringBuilder(jobTopic.replace('/', '.'));
+ sb.append('/');
+ if ( jobId != null ) {
+ // we create an md from the job id - we use the first 6 bytes to
+ // create sub directories
+ final String md5 = EventHelper.md5(jobId);
+ sb.append(md5.substring(0, 2));
+ sb.append('/');
+ sb.append(md5.substring(2, 4));
+ sb.append('/');
+ sb.append(md5.substring(4, 6));
+ sb.append('/');
+ sb.append(EventHelper.filter(jobId));
+ } else {
+ // create a path from the uuid - we use the first 6 bytes to
+ // create sub directories
+ final String uuid = UUID.randomUUID().toString();
+ sb.append(uuid.substring(0, 2));
+ sb.append('/');
+ sb.append(uuid.substring(2, 4));
+ sb.append('/');
+ sb.append(uuid.substring(5, 7));
+ sb.append("/Job_");
+ sb.append(uuid.substring(8, 17));
+ }
+ return sb.toString();
+ }
+
+}
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobUtil.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobUtil.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobUtil.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/AbstractRepositoryEventHandlerTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/AbstractRepositoryEventHandlerTest.java?rev=911855&r1=911854&r2=911855&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/AbstractRepositoryEventHandlerTest.java (original)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/AbstractRepositoryEventHandlerTest.java Fri Feb 19 16:14:26 2010
@@ -18,8 +18,6 @@
*/
package org.apache.sling.event.impl;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.util.Dictionary;
@@ -29,9 +27,7 @@
import javax.jcr.NodeIterator;
import javax.jcr.RepositoryException;
import javax.jcr.Session;
-import javax.jcr.observation.EventListenerIterator;
-import org.apache.sling.commons.testing.jcr.RepositoryUtil;
import org.apache.sling.commons.threads.ModifiableThreadPoolConfig;
import org.apache.sling.commons.threads.ThreadPoolConfig;
import org.apache.sling.engine.SlingSettingsService;
@@ -66,19 +62,19 @@
}
@org.junit.BeforeClass public static void setupRepository() throws Exception {
- RepositoryUtil.startRepository();
- final SlingRepository repository = RepositoryUtil.getRepository();
+ RepositoryTestUtil.startRepository();
+ final SlingRepository repository = RepositoryTestUtil.getSlingRepository();
session = repository.loginAdministrative(repository.getDefaultWorkspace());
- assertTrue(RepositoryUtil.registerNodeType(session, DistributingEventHandler.class.getResourceAsStream("/SLING-INF/nodetypes/event.cnd")));
- assertTrue(RepositoryUtil.registerNodeType(session, DistributingEventHandler.class.getResourceAsStream("/SLING-INF/nodetypes/folder.cnd")));
+ assertTrue(RepositoryTestUtil.registerNodeType(session, DistributingEventHandler.class.getResourceAsStream("/SLING-INF/nodetypes/event.cnd")));
+ assertTrue(RepositoryTestUtil.registerNodeType(session, DistributingEventHandler.class.getResourceAsStream("/SLING-INF/nodetypes/folder.cnd")));
}
@org.junit.AfterClass public static void shutdownRepository() throws Exception {
- RepositoryUtil.stopRepository();
+ RepositoryTestUtil.stopRepository();
}
@org.junit.Before public void setup() throws Exception {
- this.handler.repository = RepositoryUtil.getRepository();
+ this.handler.repository = RepositoryTestUtil.getSlingRepository();
// the event admin
final EventAdmin eventAdmin = this.getMockery().mock(EventAdmin.class);
@@ -119,14 +115,6 @@
}
@org.junit.After public void shutdown() throws Exception {
- // delete all child nodes to get a clean repository again
- final Node rootNode = (Node) session.getItem(this.handler.repositoryPath);
- final NodeIterator iter = rootNode.getNodes();
- while ( iter.hasNext() ) {
- final Node child = iter.nextNode();
- child.remove();
- }
- rootNode.save();
// lets set up the bundle context with the sling id
final BundleContext bundleContext = this.getMockery().mock(BundleContext.class);
@@ -136,19 +124,18 @@
will(returnValue(bundleContext));
}});
this.handler.deactivate(componentContext);
- }
-
- @org.junit.Test public void testSetup() throws RepositoryException {
- assertEquals(this.handler.applicationId, SLING_ID);
- assertEquals(this.handler.repositoryPath, REPO_PATH);
- assertNotNull(this.handler.writerSession);
- final EventListenerIterator iter = this.handler.writerSession.getWorkspace().getObservationManager().getRegisteredEventListeners();
- boolean found = false;
- while ( !found && iter.hasNext() ) {
- final javax.jcr.observation.EventListener listener = iter.nextEventListener();
- found = (listener == this.handler);
+ try {
+ // delete all child nodes to get a clean repository again
+ final Node rootNode = (Node) session.getItem(this.handler.repositoryPath);
+ final NodeIterator iter = rootNode.getNodes();
+ while ( iter.hasNext() ) {
+ final Node child = iter.nextNode();
+ child.remove();
+ }
+ session.save();
+ } catch ( RepositoryException re) {
+ // we ignore this for the test
}
- assertTrue("Handler is not registered as event listener.", found);
}
@org.junit.Test public void testPathCreation() throws RepositoryException {
Added: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/Barrier.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/Barrier.java?rev=911855&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/Barrier.java (added)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/Barrier.java Fri Feb 19 16:14:26 2010
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl;
+
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/** Simplified version of the cyclic barrier class for testing. */
+public class Barrier extends CyclicBarrier {
+
+    /**
+     * Creates a barrier that trips once the given number of threads wait on it.
+     *
+     * @param parties number of threads that have to call one of the block methods
+     */
+    public Barrier(int parties) {
+        super(parties);
+    }
+
+    /**
+     * Waits without a time limit until all parties have arrived.
+     * A broken barrier is ignored. An interruption ends the wait; the
+     * interrupt status of the calling thread is restored in that case.
+     */
+    public void block() {
+        try {
+            this.await();
+        } catch (InterruptedException e) {
+            // await() clears the interrupt status when it throws - set it
+            // again so the interrupt request is not lost for the caller
+            Thread.currentThread().interrupt();
+        } catch (BrokenBarrierException e) {
+            // ignore
+        }
+    }
+
+    /**
+     * Waits until all parties have arrived or the timeout has elapsed.
+     * If the wait does not complete normally (timeout, interruption or a
+     * broken barrier) the barrier is reset so that it can be used again.
+     *
+     * @param seconds maximum time to wait, in seconds
+     * @return <code>true</code> if all parties arrived in time,
+     *         <code>false</code> otherwise
+     */
+    public boolean block(int seconds) {
+        try {
+            this.await(seconds, TimeUnit.SECONDS);
+            return true;
+        } catch (InterruptedException e) {
+            // await() clears the interrupt status when it throws - set it
+            // again so the interrupt request is not lost for the caller
+            Thread.currentThread().interrupt();
+        } catch (BrokenBarrierException e) {
+            // ignore - reported through the return value
+        } catch (TimeoutException e) {
+            // ignore - reported through the return value
+        }
+        this.reset();
+        return false;
+    }
+}
Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/Barrier.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/Barrier.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/Barrier.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/DistributingEventHandlerTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/DistributingEventHandlerTest.java?rev=911855&r1=911854&r2=911855&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/DistributingEventHandlerTest.java (original)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/DistributingEventHandlerTest.java Fri Feb 19 16:14:26 2010
@@ -19,6 +19,7 @@
package org.apache.sling.event.impl;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.util.Calendar;
@@ -27,6 +28,8 @@
import javax.jcr.Node;
import javax.jcr.NodeIterator;
+import javax.jcr.RepositoryException;
+import javax.jcr.observation.EventListenerIterator;
import org.apache.jackrabbit.util.ISO9075;
import org.apache.sling.event.EventUtil;
@@ -51,6 +54,19 @@
return this.context;
}
+ /**
+  * Checks the state of the handler after setup: the application id and the
+  * repository path match the test constants, a writer session exists and the
+  * handler itself is among the event listeners registered with the
+  * observation manager of that session.
+  */
+ @org.junit.Test public void testSetup() throws RepositoryException {
+     // expected value first, so that a failure message labels the values correctly
+     assertEquals(SLING_ID, this.handler.applicationId);
+     assertEquals(REPO_PATH, this.handler.repositoryPath);
+     assertNotNull(this.handler.writerSession);
+     final EventListenerIterator iter = this.handler.writerSession.getWorkspace().getObservationManager().getRegisteredEventListeners();
+     boolean found = false;
+     while ( !found && iter.hasNext() ) {
+         final javax.jcr.observation.EventListener listener = iter.nextEventListener();
+         // identity comparison: the very same handler instance must be registered
+         found = (listener == this.handler);
+     }
+     assertTrue("Handler is not registered as event listener.", found);
+ }
+
@org.junit.Test public void testWriteEvent() throws Exception {
final String topic = "write/event/test";
final Dictionary<String, Object> props = new Hashtable<String, Object>();
Added: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/JobEventHandlerTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/JobEventHandlerTest.java?rev=911855&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/JobEventHandlerTest.java (added)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/JobEventHandlerTest.java Fri Feb 19 16:14:26 2010
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Dictionary;
+import java.util.Hashtable;
+
+import javax.jcr.RepositoryException;
+import javax.jcr.observation.EventListenerIterator;
+
+import org.apache.sling.event.EventUtil;
+import org.jmock.Mockery;
+import org.jmock.integration.junit4.JMock;
+import org.jmock.integration.junit4.JUnit4Mockery;
+import org.junit.runner.RunWith;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventHandler;
+
+@RunWith(JMock.class)
+public class JobEventHandlerTest extends AbstractRepositoryEventHandlerTest {
+
+    /** The mockery handed to the base class through {@link #getMockery()}. */
+    protected Mockery context;
+
+    /** Creates the job event handler under test and the mockery. */
+    public JobEventHandlerTest() {
+        this.handler = new JobEventHandler();
+        this.context = new JUnit4Mockery();
+    }
+
+    @Override
+    protected Mockery getMockery() {
+        return this.context;
+    }
+
+    /**
+     * Checks the state of the handler after setup: the application id and the
+     * repository path match the test constants, a background session exists
+     * and the handler itself is among the event listeners registered with the
+     * observation manager of that session.
+     */
+    @org.junit.Test public void testSetup() throws RepositoryException {
+        final JobEventHandler jeh = (JobEventHandler)this.handler;
+        // expected value first, so that a failure message labels the values correctly
+        assertEquals(SLING_ID, this.handler.applicationId);
+        assertEquals(REPO_PATH, this.handler.repositoryPath);
+        assertNotNull(jeh.backgroundSession);
+        final EventListenerIterator iter = jeh.backgroundSession.getWorkspace().getObservationManager().getRegisteredEventListeners();
+        boolean found = false;
+        while ( !found && iter.hasNext() ) {
+            final javax.jcr.observation.EventListener listener = iter.nextEventListener();
+            // identity comparison: the very same handler instance must be registered
+            found = (listener == this.handler);
+        }
+        assertTrue("Handler is not registered as event listener.", found);
+    }
+
+    /**
+     * Creates a job event for the job topic "sling/test" with a retry delay
+     * of 2000 and a retry count of 2.
+     */
+    private Event getJobEvent() {
+        final Dictionary<String, Object> props = new Hashtable<String, Object>();
+        props.put(EventUtil.PROPERTY_JOB_TOPIC, "sling/test");
+        props.put(EventUtil.PROPERTY_JOB_RETRY_DELAY, 2000L);
+        props.put(EventUtil.PROPERTY_JOB_RETRIES, 2);
+        return new Event(EventUtil.TOPIC_JOB, props);
+    }
+
+    /**
+     * Submits a job whose consumer marks it as finished and expects exactly
+     * one delivery: the first wait on the barrier has to succeed, the second
+     * one has to time out.
+     */
+    @org.junit.Test public void testSimpleJobExecution() throws Exception {
+        final JobEventHandler jeh = (JobEventHandler)this.handler;
+        final Barrier cb = new Barrier(2);
+        // install the consumer before the job is submitted, otherwise the job
+        // could be handed to the previously configured event admin
+        jeh.eventAdmin = new SimpleEventAdmin(new String[] {"sling/test"},
+                new EventHandler[] {
+                    new EventHandler() {
+                        public void handleEvent(Event event) {
+                            EventUtil.finishedJob(event);
+                            cb.block();
+                        }
+                    }
+                });
+        jeh.handleEvent(getJobEvent());
+        assertTrue("No event received in the given time.", cb.block(5));
+        cb.reset();
+        assertFalse("Unexpected event received in the given time.", cb.block(5));
+    }
+
+    /**
+     * Submits a job whose consumer always asks for a reschedule and expects
+     * three deliveries (the first attempt plus the two configured retries)
+     * followed by no further delivery.
+     */
+    @org.junit.Test public void testStartJobAndReschedule() throws Exception {
+        final JobEventHandler jeh = (JobEventHandler)this.handler;
+        final Barrier cb = new Barrier(2);
+        // install the consumer before the job is submitted, otherwise the job
+        // could be handed to the previously configured event admin
+        jeh.eventAdmin = new SimpleEventAdmin(new String[] {"sling/test"},
+                new EventHandler[] {
+                    new EventHandler() {
+                        public void handleEvent(Event event) {
+                            EventUtil.rescheduleJob(event);
+                            cb.block();
+                        }
+                    }
+                });
+        jeh.handleEvent(getJobEvent());
+        assertTrue("No event received in the given time.", cb.block(5));
+        cb.reset();
+        // the job is retried after two seconds, so we wait again
+        assertTrue("No event received in the given time.", cb.block(5));
+        cb.reset();
+        // the job is retried after two seconds, so we wait again
+        assertTrue("No event received in the given time.", cb.block(5));
+        // the retry limit is reached now, so no further event is expected
+        cb.reset();
+        assertFalse("Unexpected event received in the given time.", cb.block(5));
+    }
+
+}
Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/JobEventHandlerTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/JobEventHandlerTest.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/JobEventHandlerTest.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/RepositoryTestUtil.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/RepositoryTestUtil.java?rev=911855&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/RepositoryTestUtil.java (added)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/RepositoryTestUtil.java Fri Feb 19 16:14:26 2010
@@ -0,0 +1,130 @@
+package org.apache.sling.event.impl;
+
+import java.io.File;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+
+import javax.jcr.Credentials;
+import javax.jcr.LoginException;
+import javax.jcr.NoSuchWorkspaceException;
+import javax.jcr.Repository;
+import javax.jcr.RepositoryException;
+import javax.jcr.Session;
+import javax.jcr.SimpleCredentials;
+import javax.jcr.Value;
+
+import org.apache.jackrabbit.commons.JcrUtils;
+import org.apache.jackrabbit.commons.cnd.CndImporter;
+import org.apache.sling.jcr.api.SlingRepository;
+
+/**
+ * Test helper that provides access to a repository located in
+ * <code>target/repository</code> and keeps one admin session open while the
+ * repository is in use.
+ */
+public class RepositoryTestUtil {
+
+    /** We hold an admin session the whole time. */
+    private static Session adminSession;
+
+    /** The repository, <code>null</code> until {@link #startRepository()} has been called. */
+    private static Repository repository;
+
+    /** Creates the credentials used for all administrative logins of the tests. */
+    private static SimpleCredentials createAdminCredentials() {
+        return new SimpleCredentials("admin", "admin".toCharArray());
+    }
+
+    /** Looks up the repository and opens the admin session, unless this has already been done. */
+    private static void init() throws RepositoryException {
+        if ( repository == null ) {
+            final File f = new File("target/repository");
+            repository = JcrUtils.getRepository(f.toURI().toString());
+
+            adminSession = repository.login(createAdminCredentials());
+        }
+    }
+
+    /**
+     * Makes the repository available; repeated calls have no further effect.
+     *
+     * @throws RepositoryException if the repository lookup or the admin login fails
+     */
+    public static void startRepository() throws RepositoryException {
+        init();
+    }
+
+    /** Logs out the admin session (if any) and forgets the repository reference. */
+    public static void stopRepository() {
+        if ( adminSession != null ) {
+            adminSession.logout();
+            adminSession = null;
+        }
+        repository = null;
+    }
+
+    /**
+     * @return the admin session, or <code>null</code> if the repository is not started
+     */
+    public static Session getAdminSession() {
+        return adminSession;
+    }
+
+    /**
+     * @return a new {@link SlingRepository} view of the current repository reference
+     */
+    public static SlingRepository getSlingRepository() {
+        return new RepositoryWrapper(repository);
+    }
+
+    /**
+     * Registers the node types of a CND definition.
+     * The stream is always closed by this method.
+     *
+     * @param session the session used for the registration
+     * @param resourceAsStream the CND definition, read as UTF-8
+     * @return <code>true</code> if the registration succeeded,
+     *         <code>false</code> if the stream is <code>null</code> or
+     *         any exception occurred
+     */
+    public static boolean registerNodeType(Session session, InputStream resourceAsStream) {
+        if ( resourceAsStream == null ) {
+            // resource not found - same result as before, but without relying on an exception
+            return false;
+        }
+        try {
+            CndImporter.registerNodeTypes(new InputStreamReader(resourceAsStream, "UTF-8"), session);
+            return true;
+        } catch (Exception e) {
+            // ignore - the failure is reported through the return value
+            return false;
+        } finally {
+            try {
+                resourceAsStream.close();
+            } catch (java.io.IOException ignore) {
+                // a failing close must not change the result of the registration
+            }
+        }
+    }
+
+    /**
+     * {@link SlingRepository} implementation that delegates to a plain
+     * {@link Repository} and adds the two Sling specific methods.
+     */
+    public static final class RepositoryWrapper implements SlingRepository {
+
+        /** The repository all calls are delegated to. */
+        protected final Repository wrapped;
+
+        public RepositoryWrapper(Repository r) {
+            wrapped = r;
+        }
+
+        public String getDescriptor(String key) {
+            return wrapped.getDescriptor(key);
+        }
+
+        public String[] getDescriptorKeys() {
+            return wrapped.getDescriptorKeys();
+        }
+
+        public Session login() throws LoginException, RepositoryException {
+            return wrapped.login();
+        }
+
+        public Session login(Credentials credentials, String workspaceName)
+        throws LoginException, NoSuchWorkspaceException, RepositoryException {
+            return wrapped.login(credentials, workspaceName);
+        }
+
+        public Session login(Credentials credentials)
+        throws LoginException, RepositoryException {
+            return wrapped.login(credentials);
+        }
+
+        public Session login(String workspaceName)
+        throws LoginException, NoSuchWorkspaceException, RepositoryException {
+            return wrapped.login(workspaceName);
+        }
+
+        /** Always reports the workspace name "default". */
+        public String getDefaultWorkspace() {
+            return "default";
+        }
+
+        /** Logs into the given workspace with the admin credentials of the tests. */
+        public Session loginAdministrative(String workspace)
+        throws RepositoryException {
+            return this.login(createAdminCredentials(), workspace);
+        }
+
+        public Value getDescriptorValue(String key) {
+            return wrapped.getDescriptorValue(key);
+        }
+
+        public Value[] getDescriptorValues(String key) {
+            return wrapped.getDescriptorValues(key);
+        }
+
+        public boolean isSingleValueDescriptor(String key) {
+            return wrapped.isSingleValueDescriptor(key);
+        }
+
+        public boolean isStandardDescriptor(String key) {
+            return wrapped.isStandardDescriptor(key);
+        }
+    }
+}
\ No newline at end of file
Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/RepositoryTestUtil.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/RepositoryTestUtil.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/RepositoryTestUtil.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/SimpleEventAdmin.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/SimpleEventAdmin.java?rev=911855&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/SimpleEventAdmin.java (added)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/SimpleEventAdmin.java Fri Feb 19 16:14:26 2010
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl;
+
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventHandler;
+
+/**
+ * Simple event admin implementation for testing.
+ */
+public class SimpleEventAdmin implements EventAdmin {
+
+    /** Topics to deliver; the entry at index i belongs to the handler at index i. May be null. */
+    private final String[] topics;
+
+    /** Handlers receiving the events, parallel to the topics array. May be null. */
+    private final EventHandler[] handler;
+
+    /**
+     * Creates an event admin delivering events to a fixed set of handlers.
+     * Both arrays may be <code>null</code>, in which case no event is
+     * delivered at all.
+     *
+     * @param topics the topics, one per handler
+     * @param handler the handlers, one per topic
+     * @throws IllegalArgumentException if only one of the arrays is
+     *         <code>null</code> or if the arrays differ in length
+     */
+    public SimpleEventAdmin(final String[] topics, final EventHandler[] handler) {
+        // validate before touching any length, so that a null array can
+        // never end up as a NullPointerException
+        if ( topics == null ) {
+            if ( handler != null ) {
+                throw new IllegalArgumentException("If topics is null, handler must be null as well");
+            }
+        } else if ( handler == null || topics.length != handler.length ) {
+            throw new IllegalArgumentException("Topics and handler must have the same size.");
+        }
+        this.topics = topics;
+        this.handler = handler;
+    }
+
+    /**
+     * Delivers the event asynchronously: a new thread is started which
+     * calls {@link #sendEvent(Event)}.
+     */
+    public void postEvent(final Event event) {
+        new Thread() {
+            public void run() {
+                sendEvent(event);
+            }
+        }.start();
+    }
+
+    /**
+     * Delivers the event in the calling thread to every handler whose topic
+     * equals the topic of the event. Topics are compared with
+     * <code>equals</code>, wildcards are not evaluated.
+     */
+    public void sendEvent(Event event) {
+        if ( topics != null ) {
+            for(int i=0; i<topics.length; i++) {
+                if ( topics[i].equals(event.getTopic()) ) {
+                    handler[i].handleEvent(event);
+                }
+            }
+        }
+    }
+}
Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/SimpleEventAdmin.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/SimpleEventAdmin.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/SimpleEventAdmin.java
------------------------------------------------------------------------------
svn:mime-type = text/plain