You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2010/10/13 15:58:34 UTC
svn commit: r1022108 - in /sling/branches/eventing-3.0/src:
main/java/org/apache/sling/event/impl/jobs/
main/java/org/apache/sling/event/impl/jobs/jcr/
test/java/org/apache/sling/event/impl/jobs/
Author: cziegeler
Date: Wed Oct 13 13:58:33 2010
New Revision: 1022108
URL: http://svn.apache.org/viewvc?rev=1022108&view=rev
Log:
Add another test and improve jcr handling by registering two different listeners
Modified:
sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/DefaultJobManager.java
sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/JCRHelper.java
sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java
sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/jobs/JobEventHandlerTest.java
Modified: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/DefaultJobManager.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/DefaultJobManager.java?rev=1022108&r1=1022107&r2=1022108&view=diff
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/DefaultJobManager.java (original)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/DefaultJobManager.java Wed Oct 13 13:58:33 2010
@@ -592,9 +592,6 @@ public class DefaultJobManager
boolean result = true;
if ( job != null ) {
result = job.remove();
- if ( result ) {
- this.notifyRemoveJob(jobId);
- }
}
return result;
}
Modified: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/JCRHelper.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/JCRHelper.java?rev=1022108&r1=1022107&r2=1022108&view=diff
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/JCRHelper.java (original)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/JCRHelper.java Wed Oct 13 13:58:33 2010
@@ -74,6 +74,8 @@ public abstract class JCRHelper {
/** The nodetype for newly created folders */
public static final String NODETYPE_ORDERED_FOLDER = "sling:OrderedFolder";
+ /** The property for locks. */
+ public static final String NODE_PROPERTY_LOCK_OWNER = "jcr:lockOwner";
/** List of ignored properties to write to the repository. */
private static final String[] IGNORE_PROPERTIES = new String[] {
Modified: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java?rev=1022108&r1=1022107&r2=1022108&view=diff
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java (original)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java Wed Oct 13 13:58:33 2010
@@ -191,20 +191,18 @@ public class PersistenceHandler implemen
loaderThread.setDaemon(true);
loaderThread.start();
- // open background session for observation
+ // open background session for all job related tasks (lock, unlock etc.)
// create the background session and register a listener
this.backgroundSession = this.environment.createAdminSession();
- this.backgroundSession.getWorkspace().getObservationManager()
- .addEventListener(this,
- javax.jcr.observation.Event.PROPERTY_REMOVED
- |javax.jcr.observation.Event.PROPERTY_ADDED
- |javax.jcr.observation.Event.NODE_REMOVED
- |javax.jcr.observation.Event.NODE_ADDED,
- this.repositoryPath,
- true,
- null,
- null,
- true);
+ this.backgroundSession.getWorkspace().getObservationManager().addEventListener(this,
+ javax.jcr.observation.Event.PROPERTY_REMOVED
+ |javax.jcr.observation.Event.PROPERTY_ADDED
+ |javax.jcr.observation.Event.NODE_REMOVED,
+ this.repositoryPath,
+ true,
+ null,
+ null,
+ true);
}
/**
@@ -262,7 +260,7 @@ public class PersistenceHandler implemen
final String propertyName = path.substring(pos+1);
// we are only interested in unlocks
- if ( "jcr:lockOwner".equals(propertyName) ) {
+ if ( JCRHelper.NODE_PROPERTY_LOCK_OWNER.equals(propertyName) ) {
loadNodePath = path.substring(0, pos);
}
} else if ( event.getType() == javax.jcr.observation.Event.PROPERTY_ADDED ) {
@@ -270,7 +268,7 @@ public class PersistenceHandler implemen
final String propertyName = path.substring(pos+1);
// we are only interested in locks
- if ( "jcr:lockOwner".equals(propertyName) ) {
+ if ( JCRHelper.NODE_PROPERTY_LOCK_OWNER.equals(propertyName) ) {
((DefaultJobManager)this.jobManager).notifyActiveJob(path.substring(this.repositoryPath.length() + 1));
}
@@ -559,6 +557,14 @@ public class PersistenceHandler implemen
try {
writerSession = this.environment.createAdminSession();
+ // we only listen for all node added events not coming from this session(!)
+ writerSession.getWorkspace().getObservationManager().addEventListener(this,
+ javax.jcr.observation.Event.NODE_ADDED,
+ this.repositoryPath,
+ true,
+ null,
+ null,
+ true);
rootNode = this.createPath(writerSession.getRootNode(),
this.repositoryPath.substring(1),
JCRHelper.NODETYPE_ORDERED_FOLDER);
@@ -576,6 +582,12 @@ public class PersistenceHandler implemen
running = false;
} finally {
if ( writerSession != null ) {
+ try {
+ writerSession.getWorkspace().getObservationManager().removeEventListener(this);
+ } catch (RepositoryException e) {
+ // we just ignore it
+ this.logger.warn("Unable to remove event listener.", e);
+ }
writerSession.logout();
}
}
@@ -602,12 +614,14 @@ public class PersistenceHandler implemen
final String jobTopic = (String)event.getProperty(JobUtil.PROPERTY_JOB_TOPIC);
final String nodePath = Utility.getUniquePath(jobTopic, jobId);
+ Node readAndProcess = null;
+
// if the job has no job id, we can just write the job to the repo and don't
// need locking
if ( jobId == null ) {
try {
- this.writeEvent(rootNode, event, nodePath);
- } catch (RepositoryException re ) {
+ readAndProcess = this.writeEvent(rootNode, event, nodePath);
+ } catch (final RepositoryException re ) {
// something went wrong, so let's log it
this.logger.error("Exception during writing new job '" + EventUtil.toString(event) + "' to repository at " + nodePath, re);
}
@@ -621,17 +635,21 @@ public class PersistenceHandler implemen
if ( foundNode == null ) {
// We now write the event into the repository
try {
- this.writeEvent(rootNode, event, nodePath);
+ readAndProcess = this.writeEvent(rootNode, event, nodePath);
} catch (ItemExistsException iee) {
// someone else did already write this node in the meantime
// nothing to do for us
}
}
- } catch (RepositoryException re ) {
+ } catch (final RepositoryException re ) {
// something went wrong, so let's log it
this.logger.error("Exception during writing new job '" + EventUtil.toString(event) + "' to repository at " + nodePath, re);
}
}
+
+ if ( readAndProcess != null ) {
+ tryToLoadJob(readAndProcess, this.unloadedJobs);
+ }
}
}
}
@@ -650,7 +668,7 @@ public class PersistenceHandler implemen
* @param suggestedName A suggested name/path for the node.
* @throws RepositoryException
*/
- private void writeEvent(final Node rootNode, final Event e, final String path)
+ private Node writeEvent(final Node rootNode, final Event e, final String path)
throws RepositoryException {
// create new node with name of topic
final Node eventNode = this.createPath(rootNode,
@@ -669,6 +687,7 @@ public class PersistenceHandler implemen
eventNode.setProperty(JCRHelper.NODE_PROPERTY_JOBID, jobId);
}
rootNode.getSession().save();
+ return eventNode;
}
/**
@@ -882,6 +901,7 @@ public class PersistenceHandler implemen
// lock failed which means that the node is locked by someone else, so we don't have to requeue
return false;
}
+ ((DefaultJobManager)this.jobManager).notifyActiveJob(info.uniqueId);
return true;
}
}
@@ -916,6 +936,7 @@ public class PersistenceHandler implemen
synchronized ( this.backgroundLock ) {
try {
if ( this.backgroundSession.itemExists(path) ) {
+ ((DefaultJobManager)this.jobManager).notifyRemoveJob(info.uniqueId);
final Node eventNode = (Node)this.backgroundSession.getItem(path);
if ( jobId == null ) {
// simply remove the node
@@ -964,6 +985,7 @@ public class PersistenceHandler implemen
}
eventNode.remove();
this.backgroundSession.save();
+ ((DefaultJobManager)this.jobManager).notifyRemoveJob(jobId);
}
} catch (RepositoryException e) {
this.logger.error("Error during cancelling job at " + path, e);
Modified: sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/jobs/JobEventHandlerTest.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/jobs/JobEventHandlerTest.java?rev=1022108&r1=1022107&r2=1022108&view=diff
==============================================================================
--- sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/jobs/JobEventHandlerTest.java (original)
+++ sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/jobs/JobEventHandlerTest.java Wed Oct 13 13:58:33 2010
@@ -29,6 +29,7 @@ import java.util.Collections;
import java.util.Dictionary;
import java.util.Hashtable;
import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.jcr.Node;
import javax.jcr.Session;
@@ -43,6 +44,7 @@ import org.apache.sling.event.impl.jobs.
import org.apache.sling.event.impl.jobs.jcr.PersistenceHandler;
import org.apache.sling.event.impl.support.Environment;
import org.apache.sling.event.jobs.JobManager.QueryType;
+import org.apache.sling.event.jobs.JobProcessor;
import org.apache.sling.event.jobs.JobUtil;
import org.apache.sling.event.jobs.JobsIterator;
import org.jmock.Mockery;
@@ -590,4 +592,43 @@ public class JobEventHandlerTest extends
cb.reset();
assertEquals("Unexpected number of retries", 3, retryCountList.size());
}
+
+ @org.junit.Test public void testManyJobs() throws Exception {
+ final PersistenceHandler jeh = this.handler;
+ final AtomicInteger count = new AtomicInteger(0);
+ setEventAdmin(new SimpleEventAdmin(new String[] {"sling/test",
+ JobUtil.TOPIC_JOB_FINISHED},
+ new EventHandler[] {
+ new EventHandler() {
+ public void handleEvent(final Event event) {
+ JobUtil.processJob(event, new JobProcessor() {
+
+ public boolean process(Event job) {
+ try {
+ Thread.sleep(200);
+ } catch (InterruptedException ie) {}
+ return true;
+ }
+ });
+ }
+ },
+ new EventHandler() {
+ public void handleEvent(final Event event) {
+ count.incrementAndGet();
+ }
+ }}));
+ // we start "some" jobs
+ final int COUNT = 300;
+ for(int i = 0; i < COUNT; i++ ) {
+ final String queueName = "queue" + (i % 20);
+ jeh.handleEvent(getJobEvent(queueName, null, "2"));
+ }
+ while ( count.get() < COUNT ) {
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException ie) {}
+ }
+ assertEquals("Finished count", COUNT, count.get());
+ assertEquals("Finished count", COUNT, this.jobManager.getStatistics().getNumberOfFinishedJobs());
+ }
}