You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2008/02/11 16:19:32 UTC
svn commit: r620503 - in /incubator/sling/trunk/sling/event/src:
main/java/org/apache/sling/event/ main/java/org/apache/sling/event/impl/
test/java/org/apache/sling/event/impl/
Author: cziegeler
Date: Mon Feb 11 07:19:29 2008
New Revision: 620503
URL: http://svn.apache.org/viewvc?rev=620503&view=rev
Log:
SLING-177: Improvements to the event and job handling:
- Adding a retry count and retry delay for jobs
- Remove locking of parent node
- Use separate write queue for minimum delay between publishing and storing
- Improve parallel job processing: only jobs of the same type are now serialized; everything else runs in parallel.
Modified:
incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/EventUtil.java
incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/AbstractRepositoryEventHandler.java
incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/DistributingEventHandler.java
incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/TimedEventHandler.java
incubator/sling/trunk/sling/event/src/test/java/org/apache/sling/event/impl/AbstractRepositoryEventHandlerTest.java
Modified: incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/EventUtil.java
URL: http://svn.apache.org/viewvc/incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/EventUtil.java?rev=620503&r1=620502&r2=620503&view=diff
==============================================================================
--- incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/EventUtil.java (original)
+++ incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/EventUtil.java Mon Feb 11 07:19:29 2008
@@ -52,7 +52,7 @@
/** The property for setting the maximum number of retries. Value is of type Integer. */
public static final String PROPERTY_JOB_RETRIES = "event.job.retries";
- /** The property to set a retry delay. Value is of type Long. */
+ /** The property to set a retry delay. Value is of type Long and specifies milliseconds. */
public static final String PROPERTY_JOB_RETRY_DELAY = "event.job.retrydelay";
/** The topic for jobs. */
@@ -140,36 +140,14 @@
/**
* Notify a failed job.
+ * @return <code>true</code> if the job has been rescheduled, <code>false</code> otherwise.
*/
- public static void rescheduleJob(Event job) {
+ public static boolean rescheduleJob(Event job) {
final JobStatusNotifier.NotifierContext ctx = (NotifierContext) job.getProperty(JobStatusNotifier.CONTEXT_PROPERTY_NAME);
if ( ctx == null ) {
throw new NullPointerException("JobStatusNotifier context is not available in event properties.");
}
- boolean retry = true;
- // check if we exceeded the number of retries
- if ( job.getProperty(PROPERTY_JOB_RETRIES) != null ) {
- int retries = (Integer) job.getProperty(PROPERTY_JOB_RETRIES);
- int retryCount = 0;
- if ( job.getProperty(PROPERTY_JOB_RETRY_COUNT) != null ) {
- retryCount = (Integer)job.getProperty(PROPERTY_JOB_RETRY_COUNT);
- }
- retryCount++;
- if ( retryCount >= retries ) {
- retry = false;
- }
- // update event with retry count
- final Dictionary<String, Object> newProperties;
- // create a new dictionary
- newProperties = new Hashtable<String, Object>();
- final String[] names = job.getPropertyNames();
- for(int i=0; i<names.length; i++ ) {
- newProperties.put(names[i], job.getProperty(names[i]));
- }
- newProperties.put(PROPERTY_JOB_RETRY_COUNT, retryCount);
- job = new Event(job.getTopic(), newProperties);
- }
- ctx.notifier.finishedJob(job, ctx.eventNodePath, ctx.lockToken, retry);
+ return ctx.notifier.finishedJob(job, ctx.eventNodePath, ctx.lockToken, true);
}
/**
@@ -217,6 +195,18 @@
}
}
- void finishedJob(Event job, String eventNodePath, String lockToken, boolean reschedule);
+ /**
+ * Notify that the job is finished.
+ * If the job is not rescheduled, a return value of <code>false</code> indicates an error
+ * during the processing. If the job should be rescheduled, <code>true</code> indicates
+ * that the job could be rescheduled. If an error occurs or the number of retries is
+ * exceeded, <code>false</code> will be returned.
+ * @param job The job.
+ * @param eventNodePath The storage node in the repository.
+ * @param lockToken The lock token locking the node.
+ * @param reschedule Should the event be rescheduled?
+ * @return <code>true</code> if everything went fine, <code>false</code> otherwise.
+ */
+ boolean finishedJob(Event job, String eventNodePath, String lockToken, boolean reschedule);
}
}
Modified: incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/AbstractRepositoryEventHandler.java
URL: http://svn.apache.org/viewvc/incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/AbstractRepositoryEventHandler.java?rev=620503&r1=620502&r2=620503&view=diff
==============================================================================
--- incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/AbstractRepositoryEventHandler.java (original)
+++ incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/AbstractRepositoryEventHandler.java Mon Feb 11 07:19:29 2008
@@ -31,9 +31,11 @@
import java.util.concurrent.LinkedBlockingQueue;
import javax.jcr.Node;
+import javax.jcr.NodeIterator;
import javax.jcr.RepositoryException;
import javax.jcr.Session;
import javax.jcr.observation.EventListener;
+import javax.jcr.query.Query;
import org.apache.jackrabbit.JcrConstants;
import org.apache.sling.event.EventUtil;
@@ -83,8 +85,8 @@
/** Our application id. */
protected String applicationId;
- /** The repository session. */
- protected Session session;
+ /** The repository session to write into the repository. */
+ protected Session writerSession;
/** The path in the repository. */
protected String repositoryPath;
@@ -95,9 +97,12 @@
/** Is the background task still running? */
protected boolean running;
- /** A local queue for serialising the job processing. */
+ /** A local queue for serialising the event processing. */
protected final BlockingQueue<EventInfo> queue = new LinkedBlockingQueue<EventInfo>();
+ /** A local queue for writing received events into the repository. */
+ protected final BlockingQueue<EventInfo> writeQueue = new LinkedBlockingQueue<EventInfo>();
+
/**
* Activate this component.
* @param context
@@ -111,37 +116,79 @@
if ( i != null ) {
this.cleanupPeriod = i;
}
- // start background thread
+
+ // start background threads
this.running = true;
+ // start writer thread
final Thread t = new Thread() {
public void run() {
try {
- startSession();
- runInBackground();
+ startWriterSession();
} catch (RepositoryException e) {
// there is nothing we can do except log!
logger.error("Error during session starting.", e);
+ running = false;
}
+ processWriteQueue();
}
};
t.start();
+ final Thread t2 = new Thread() {
+ public void run() {
+ runInBackground();
+ }
+ };
+ t2.start();
}
/**
+ * This method is invoked periodically.
* @see java.lang.Runnable#run()
*/
public void run() {
if ( this.cleanupPeriod > 0 ) {
- this.cleanUpRepository();
+ this.logger.debug("Cleaning up repository, removing all entries older than {} minutes.", this.cleanupPeriod);
+
+ final String queryString = this.getCleanUpQueryString();
+ if ( queryString != null ) {
+ // we create our own session to avoid concurrency issues
+ Session s = null;
+ try {
+ s = this.createSession();
+ final Node parentNode = (Node)s.getItem(this.repositoryPath);
+ logger.debug("Executing query {}", queryString);
+ final Query q = s.getWorkspace().getQueryManager().createQuery(queryString, Query.XPATH);
+ final NodeIterator iter = q.execute().getNodes();
+ int count = 0;
+ while ( iter.hasNext() ) {
+ final Node eventNode = iter.nextNode();
+ eventNode.remove();
+ count++;
+ }
+ parentNode.save();
+ logger.debug("Removed {} entries from the repository.", count);
+
+ } catch (RepositoryException e) {
+ // in the case of an error, we just log this as a warning
+ this.logger.warn("Exception during repository cleanup.", e);
+ } finally {
+ if ( s != null ) {
+ s.logout();
+ }
+ }
+ }
}
}
protected abstract void runInBackground();
+ protected abstract void processWriteQueue();
+
/**
- * Clean up the repository.
+ * The query to detect old entries which can be removed.
+ * This method is invoked periodically from the {@link #run()} method.
*/
- protected abstract void cleanUpRepository();
+ protected abstract String getCleanUpQueryString();
/**
* Deactivate this component.
@@ -151,12 +198,18 @@
// stop background thread, by adding a job info to wake it up
this.running = false;
try {
+ this.writeQueue.put(new EventInfo());
+ } catch (InterruptedException e) {
+ // we ignore this
+ this.ignoreException(e);
+ }
+ try {
this.queue.put(new EventInfo());
} catch (InterruptedException e) {
// we ignore this
this.ignoreException(e);
}
- this.stopSession();
+ this.stopWriterSession();
}
/**
@@ -178,8 +231,8 @@
* for new events created on other nodes.
* @throws RepositoryException
*/
- protected void startSession() throws RepositoryException {
- this.session = this.createSession();
+ protected void startWriterSession() throws RepositoryException {
+ this.writerSession = this.createSession();
if ( this.repositoryPath != null ) {
this.createRepositoryPath();
}
@@ -188,16 +241,16 @@
/**
* Stop the session.
*/
- protected void stopSession() {
- if ( this.session != null ) {
+ protected void stopWriterSession() {
+ if ( this.writerSession != null ) {
try {
- this.session.getWorkspace().getObservationManager().removeEventListener(this);
+ this.writerSession.getWorkspace().getObservationManager().removeEventListener(this);
} catch (RepositoryException e) {
// we just ignore it
this.logger.warn("Unable to remove event listener.", e);
}
- this.session.logout();
- this.session = null;
+ this.writerSession.logout();
+ this.writerSession = null;
}
}
@@ -206,8 +259,8 @@
*/
protected void createRepositoryPath()
throws RepositoryException {
- if ( !this.session.itemExists(this.repositoryPath) ) {
- Node node = this.session.getRootNode();
+ if ( !this.writerSession.itemExists(this.repositoryPath) ) {
+ Node node = this.writerSession.getRootNode();
String path = this.repositoryPath.substring(1);
int pos = path.lastIndexOf('/');
if ( pos != -1 ) {
@@ -255,7 +308,7 @@
protected Node writeEvent(Event e)
throws RepositoryException {
// create new node with name of topic
- final Node rootNode = (Node) this.session.getItem(this.repositoryPath);
+ final Node rootNode = (Node) this.writerSession.getItem(this.repositoryPath);
final String nodeType = this.getEventNodeType();
final String nodeName = this.getNodeName(e);
Modified: incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/DistributingEventHandler.java
URL: http://svn.apache.org/viewvc/incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/DistributingEventHandler.java?rev=620503&r1=620502&r2=620503&view=diff
==============================================================================
--- incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/DistributingEventHandler.java (original)
+++ incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/DistributingEventHandler.java Mon Feb 11 07:19:29 2008
@@ -22,12 +22,9 @@
import java.util.Dictionary;
import javax.jcr.Node;
-import javax.jcr.NodeIterator;
import javax.jcr.RepositoryException;
import javax.jcr.Session;
import javax.jcr.observation.EventIterator;
-import javax.jcr.query.Query;
-import javax.jcr.query.QueryManager;
import org.apache.jackrabbit.util.ISO8601;
import org.apache.sling.event.EventUtil;
@@ -45,52 +42,59 @@
extends AbstractRepositoryEventHandler {
/**
- * @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#cleanUpRepository()
+ * @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#getCleanUpQueryString()
*/
- protected void cleanUpRepository() {
- this.logger.debug("Cleaning up repository, removing all events older than {} minutes.", this.cleanupPeriod);
+ protected String getCleanUpQueryString() {
+ final Calendar deleteBefore = Calendar.getInstance();
+ deleteBefore.add(Calendar.MINUTE, -this.cleanupPeriod);
+ final String dateString = ISO8601.format(deleteBefore);
+
+ final StringBuffer buffer = new StringBuffer("/jcr:root");
+ buffer.append(this.repositoryPath);
+ buffer.append("//element(*, ");
+ buffer.append(getEventNodeType());
+ buffer.append(")[@");
+ buffer.append(EventHelper.NODE_PROPERTY_CREATED);
+ buffer.append(" < xs:dateTime('");
+ buffer.append(dateString);
+ buffer.append("')]");
- // we create an own session for concurrency issues
- Session s = null;
- try {
- s = this.createSession();
- final Node parentNode = (Node)s.getItem(this.repositoryPath);
- final Calendar deleteBefore = Calendar.getInstance();
- deleteBefore.add(Calendar.MINUTE, -this.cleanupPeriod);
- final String dateString = ISO8601.format(deleteBefore);
-
- final QueryManager qManager = parentNode.getSession().getWorkspace().getQueryManager();
- final StringBuffer buffer = new StringBuffer("/jcr:root");
- buffer.append(this.repositoryPath);
- buffer.append("//element(*, ");
- buffer.append(getEventNodeType());
- buffer.append(")[@");
- buffer.append(EventHelper.NODE_PROPERTY_CREATED);
- buffer.append(" < xs:dateTime('");
- buffer.append(dateString);
- buffer.append("')]");
-
- this.logger.debug("Executing query {}", buffer);
- final Query q = qManager.createQuery(buffer.toString(), Query.XPATH);
- final NodeIterator iter = q.execute().getNodes();
- int count = 0;
- while ( iter.hasNext() ) {
- final Node eventNode = iter.nextNode();
- eventNode.remove();
- count++;
+ return buffer.toString();
+ }
+
+ /**
+ * @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#processWriteQueue()
+ */
+ protected void processWriteQueue() {
+ while ( this.running ) {
+ // so let's wait/get the next job from the queue
+ EventInfo info = null;
+ try {
+ info = this.writeQueue.take();
+ } catch (InterruptedException e) {
+ // we ignore this
+ this.ignoreException(e);
}
- this.logger.debug("Removed {} event nodes from the repository.", count);
- parentNode.save();
- } catch (RepositoryException e) {
- // in the case of an error, we just log this as a warning
- this.logger.warn("Exception during repository cleanup.", e);
- } finally {
- if ( s != null ) {
- s.logout();
+ if ( info != null && this.running ) {
+ try {
+ final Node eventNode = this.writeEvent(info.event);
+ info.nodePath = eventNode.getPath();
+ } catch (Exception e) {
+ this.logger.error("Exception during writing the event to the repository.", e);
+ }
+ try {
+ this.queue.put(info);
+ } catch (InterruptedException e) {
+ // we ignore this
+ this.ignoreException(e);
+ }
}
}
}
+ /**
+ * @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#runInBackground()
+ */
protected void runInBackground() {
while ( this.running ) {
// so let's wait/get the next job from the queue
@@ -102,15 +106,11 @@
this.ignoreException(e);
}
if ( info != null && this.running ) {
- if ( info.event != null ) {
- try {
- this.writeEvent(info.event);
- } catch (Exception e) {
- this.logger.error("Exception during writing the event to the repository.", e);
- }
- } else if ( info.nodePath != null) {
+ if ( info.nodePath != null) {
+ Session session = null;
try {
- final Node eventNode = (Node) this.session.getItem(info.nodePath);
+ session = this.createSession();
+ final Node eventNode = (Node)session.getItem(info.nodePath);
final EventAdmin localEA = this.eventAdmin;
if ( localEA != null ) {
localEA.postEvent(this.readEvent(eventNode));
@@ -119,6 +119,10 @@
}
} catch (Exception ex) {
this.logger.error("Exception during reading the event from the repository.", ex);
+ } finally {
+ if ( session != null ) {
+ session.logout();
+ }
}
}
}
@@ -132,7 +136,7 @@
try {
final EventInfo info = new EventInfo();
info.event = event;
- this.queue.put(info);
+ this.writeQueue.put(info);
} catch (InterruptedException ex) {
// we ignore this
this.ignoreException(ex);
@@ -169,13 +173,11 @@
/**
- * Start the repository session and add this handler as an observer
- * for new events created on other nodes.
- * @throws RepositoryException
- */
- protected void startSession() throws RepositoryException {
- super.startSession();
- this.session.getWorkspace().getObservationManager()
- .addEventListener(this, javax.jcr.observation.Event.NODE_ADDED, this.repositoryPath, true, null, null, true);
+ * @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#startWriterSession()
+ */
+ protected void startWriterSession() throws RepositoryException {
+ super.startWriterSession();
+ this.writerSession.getWorkspace().getObservationManager()
+ .addEventListener(this, javax.jcr.observation.Event.NODE_ADDED, this.repositoryPath, true, null, new String[] {this.getEventNodeType()}, true);
}
}
Modified: incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
URL: http://svn.apache.org/viewvc/incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java?rev=620503&r1=620502&r2=620503&view=diff
==============================================================================
--- incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java (original)
+++ incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java Mon Feb 11 07:19:29 2008
@@ -22,9 +22,13 @@
import java.util.Calendar;
import java.util.Collection;
import java.util.Dictionary;
+import java.util.HashMap;
import java.util.Hashtable;
import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import javax.jcr.ItemExistsException;
import javax.jcr.Node;
import javax.jcr.NodeIterator;
import javax.jcr.RepositoryException;
@@ -36,7 +40,6 @@
import org.apache.jackrabbit.JcrConstants;
import org.apache.jackrabbit.util.ISO8601;
-import org.apache.jackrabbit.util.Locked;
import org.apache.sling.event.EventUtil;
import org.apache.sling.event.JobStatusProvider;
import org.osgi.service.component.ComponentContext;
@@ -47,20 +50,23 @@
/**
* An event handler handling special job events.
*
- * @scr.component inherit="true"
+ * @scr.component
* @scr.service interface="org.apache.sling.event.JobStatusProvider"
- * @scr.property name="event.topics" value="org/apache/sling/event/job"
+ * @scr.property name="event.topics" valueRef="EventUtil.TOPIC_JOB"
* @scr.property name="repository.path" value="/sling/jobs"
*/
public class JobEventHandler
extends AbstractRepositoryEventHandler
implements EventUtil.JobStatusNotifier, JobStatusProvider {
- /** @scr.property value="20" type="Long" */
- protected static final String CONFIG_PROPERTY_SLEEP_TIME = "sleep.time";
+ /** A map for keeping track of currently processed job topics. */
+ protected final Map<String, Boolean> processingMap = new HashMap<String, Boolean>();
+
+ /** Default sleep time in seconds. */
+ protected static final long DEFAULT_SLEEP_TIME = 20;
- /** A flag indicating if this handler is currently processing a job. */
- protected boolean isProcessing = false;
+ /** @scr.property valueRef="DEFAULT_SLEEP_TIME" */
+ protected static final String CONFIG_PROPERTY_SLEEP_TIME = "sleep.time";
/** We check every 20 secs by default. */
protected long sleepTime;
@@ -72,69 +78,120 @@
*/
protected void activate(final ComponentContext context)
throws RepositoryException {
- this.sleepTime = (Long)context.getProperties().get(CONFIG_PROPERTY_SLEEP_TIME) * 1000;
-
+ if ( context.getProperties().get(CONFIG_PROPERTY_SLEEP_TIME) != null ) {
+ this.sleepTime = (Long)context.getProperties().get(CONFIG_PROPERTY_SLEEP_TIME) * 1000;
+ } else {
+ this.sleepTime = DEFAULT_SLEEP_TIME;
+ }
super.activate(context);
}
/**
- * Clean up the repository.
+ * @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#processWriteQueue()
*/
- protected void cleanUpRepository() {
- this.logger.debug("Cleaning up repository, removing all jobs older than {} minutes.", this.cleanupPeriod);
+ protected void processWriteQueue() {
+ while ( this.running ) {
+ // so let's wait/get the next job from the queue
+ EventInfo info = null;
+ try {
+ info = this.writeQueue.take();
+ } catch (InterruptedException e) {
+ // we ignore this
+ this.ignoreException(e);
+ }
+ if ( info != null && this.running ) {
+ final Event event = info.event;
+ final String jobId = (String)event.getProperty(EventUtil.PROPERTY_JOB_ID);
- // we create an own session for concurrency issues
- Session s = null;
- try {
- s = this.createSession();
- final Node parentNode = (Node)s.getItem(this.repositoryPath);
- final Calendar deleteBefore = Calendar.getInstance();
- deleteBefore.add(Calendar.MINUTE, -this.cleanupPeriod);
- final String dateString = ISO8601.format(deleteBefore);
- new Locked() {
-
- protected Object run(Node node) throws RepositoryException {
- final QueryManager qManager = node.getSession().getWorkspace().getQueryManager();
- final StringBuffer buffer = new StringBuffer("/jcr:root");
- buffer.append(JobEventHandler.this.repositoryPath);
- buffer.append("//element(*, ");
- buffer.append(JobEventHandler.this.getEventNodeType());
- buffer.append(") [@");
- buffer.append(EventHelper.NODE_PROPERTY_ACTIVE);
- buffer.append(" = 'false' and @");
- buffer.append(EventHelper.NODE_PROPERTY_FINISHED);
- buffer.append(" < xs:dateTime('");
- buffer.append(dateString);
- buffer.append("')]");
-
- logger.debug("Executing query {}", buffer);
- final Query q = qManager.createQuery(buffer.toString(), Query.XPATH);
- final NodeIterator iter = q.execute().getNodes();
- int count = 0;
- while ( iter.hasNext() ) {
- final Node eventNode = iter.nextNode();
- eventNode.remove();
- count++;
+ // if the job has no job id, we can just write the job to the repo and don't
+ // need locking
+ if ( jobId == null ) {
+ try {
+ final Node eventNode = this.writeEvent(event);
+ info.nodePath = eventNode.getPath();
+ } catch (RepositoryException re ) {
+ // something went wrong, so let's log it
+ this.logger.error("Exception during writing new job to repository.", re);
}
- parentNode.save();
- logger.debug("Removed {} job nodes from the repository.", count);
- return null;
- }
- }.with(parentNode, false);
- } catch (RepositoryException e) {
- // in the case of an error, we just log this as a warning
- this.logger.warn("Exception during repository cleanup.", e);
- } catch (InterruptedException e) {
- // we ignore this
- this.ignoreException(e);
- } finally {
- if ( s != null ) {
- s.logout();
+ } else {
+ try {
+ // let's first search for an existing node with the same id
+ final Node parentNode = (Node)this.writerSession.getItem(this.repositoryPath);
+ final String nodeName = this.getNodeName(event);
+ Node foundNode = null;
+ if ( parentNode.hasNode(nodeName) ) {
+ foundNode = parentNode.getNode(nodeName);
+ }
+ if ( foundNode != null ) {
+ // if the node is locked, someone else was quicker
+ // and we don't have to process this job
+ if ( foundNode.isLocked() ) {
+ foundNode = null;
+ } else {
+ // node is already in repository, so we just overwrite it
+ try {
+ foundNode.remove();
+ parentNode.save();
+ } catch (RepositoryException re) {
+ // if anything goes wrong, it means that (hopefully) someone
+ // else is processing this node
+ foundNode = null;
+ }
+
+ }
+ }
+ if ( foundNode == null ) {
+ // We now write the event into the repository
+ try {
+ final Node eventNode = this.writeEvent(event);
+ info.nodePath = eventNode.getPath();
+ } catch (ItemExistsException iee) {
+ // someone else did already write this node in the meantime
+ // nothing to do for us
+ }
+ }
+ } catch (RepositoryException re ) {
+ // something went wrong, so let's log it
+ this.logger.error("Exception during writing new job to repository.", re);
+ }
+ }
+ // if we were able to write the event into the repository
+ // we will queue it for processing
+ if ( info.nodePath != null ) {
+ try {
+ this.queue.put(info);
+ } catch (InterruptedException e) {
+ // this should never happen
+ this.ignoreException(e);
+ }
+ }
}
}
}
/**
+ * @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#getCleanUpQueryString()
+ */
+ protected String getCleanUpQueryString() {
+ final Calendar deleteBefore = Calendar.getInstance();
+ deleteBefore.add(Calendar.MINUTE, -this.cleanupPeriod);
+ final String dateString = ISO8601.format(deleteBefore);
+ final StringBuffer buffer = new StringBuffer("/jcr:root");
+ buffer.append(JobEventHandler.this.repositoryPath);
+ buffer.append("//element(*, ");
+ buffer.append(JobEventHandler.this.getEventNodeType());
+ buffer.append(") [@");
+ buffer.append(EventHelper.NODE_PROPERTY_ACTIVE);
+ buffer.append(" = 'false' and @");
+ buffer.append(EventHelper.NODE_PROPERTY_FINISHED);
+ buffer.append(" < xs:dateTime('");
+ buffer.append(dateString);
+ buffer.append("')]");
+
+ return buffer.toString();
+ }
+
+ /**
* This method runs in the background and processes the local queue.
*/
protected void runInBackground() {
@@ -147,76 +204,94 @@
// we ignore this
this.ignoreException(e);
}
+
if ( info != null && this.running ) {
- if ( info.nodePath == null ) {
- this.processEvent(info.event);
- } else {
- boolean process = false;
- synchronized ( this ) {
- if ( !this.isProcessing ) {
- this.isProcessing = true;
+ final Event event = info.event;
+ final String jobTopic = (String)event.getProperty(EventUtil.PROPERTY_JOB_TOPIC);
+ final boolean parallelProcessing = event.getProperty(EventUtil.PROPERTY_JOB_PARALLEL) != null;
+
+ // check how we can process this job
+ // if parallel processing is allowed, we can just process
+ // if not we should check if any other job with the same topic is currently running
+ boolean process = parallelProcessing;
+ if ( !process ) {
+ synchronized ( this.processingMap ) {
+ final Boolean value = this.processingMap.get(jobTopic);
+ if ( value == null || !value.booleanValue() ) {
+ this.processingMap.put(jobTopic, Boolean.TRUE);
process = true;
}
}
- if ( process ) {
- boolean unlock = true;
- try {
- this.session.refresh(true);
- final Node eventNode = (Node) this.session.getItem(info.nodePath);
- if ( !eventNode.isLocked() && eventNode.getProperty(EventHelper.NODE_PROPERTY_ACTIVE).getBoolean() ) {
- // lock node
- Lock lock = null;
- try {
- lock = eventNode.lock(false, true);
- } catch (RepositoryException re) {
- // lock failed which means that the node is locked by someone else, so we don't have to requeue
- process = false;
- }
- if ( process ) {
- // check if event is still active
- eventNode.refresh(true);
- if ( eventNode.getProperty(EventHelper.NODE_PROPERTY_ACTIVE).getBoolean() ) {
- unlock = false;
- this.processJob(info.event, eventNode, lock.getLockToken());
- } else {
- eventNode.unlock();
- }
- }
+
+ }
+ if ( process ) {
+ boolean unlock = true;
+ Session session = null;
+ try {
+ session = this.createSession();
+ final Node eventNode = (Node) session.getItem(info.nodePath);
+ if ( !eventNode.isLocked() && eventNode.getProperty(EventHelper.NODE_PROPERTY_ACTIVE).getBoolean() ) {
+ // lock node
+ Lock lock = null;
+ try {
+ lock = eventNode.lock(false, true);
+ } catch (RepositoryException re) {
+ // lock failed which means that the node is locked by someone else, so we don't have to requeue
+ process = false;
}
- } catch (RepositoryException e) {
- // ignore
- this.ignoreException(e);
- } finally {
- if ( unlock ) {
- synchronized ( this ) {
- this.isProcessing = false;
+ if ( process ) {
+ // check if event is still active
+ eventNode.refresh(true);
+ if ( eventNode.getProperty(EventHelper.NODE_PROPERTY_ACTIVE).getBoolean() ) {
+ unlock = false;
+ this.processJob(info.event, eventNode, lock.getLockToken());
+ } else {
+ eventNode.unlock();
}
}
}
- } else {
- try {
- // check if the node is in processing or already finished
- final Node eventNode = (Node) this.session.getItem(info.nodePath);
- if ( !eventNode.isLocked() && eventNode.getProperty(EventHelper.NODE_PROPERTY_ACTIVE).getBoolean() ) {
+ } catch (RepositoryException e) {
+ // ignore
+ this.ignoreException(e);
+ } finally {
+ if ( unlock && !parallelProcessing ) {
+ synchronized ( this.processingMap ) {
+ this.processingMap.put(jobTopic, Boolean.FALSE);
+ }
+ }
+ if ( session != null ) {
+ session.logout();
+ }
+ }
+ } else {
+ Session session = null;
+ try {
+ session = this.createSession();
+ // check if the node is in processing or already finished
+ final Node eventNode = (Node) session.getItem(info.nodePath);
+ if ( !eventNode.isLocked() && eventNode.getProperty(EventHelper.NODE_PROPERTY_ACTIVE).getBoolean() ) {
+ try {
+ this.queue.put(info);
+ } catch (InterruptedException e) {
+ // ignore
+ this.ignoreException(e);
+ }
+ // wait time before we restart the cycle, if there is only one job in the queue!
+ if ( this.queue.size() == 1 ) {
try {
- this.queue.put(info);
+ Thread.sleep(this.sleepTime);
} catch (InterruptedException e) {
// ignore
this.ignoreException(e);
}
- // wait time before we restart the cycle, if there is only one job in the queue!
- if ( this.queue.size() == 1 ) {
- try {
- Thread.sleep(this.sleepTime);
- } catch (InterruptedException e) {
- // ignore
- this.ignoreException(e);
- }
- }
}
- } catch (RepositoryException e) {
- // ignore
- this.ignoreException(e);
+ }
+ } catch (RepositoryException e) {
+ // ignore
+ this.ignoreException(e);
+ } finally {
+ if ( session != null ) {
+ session.logout();
}
}
}
@@ -229,12 +304,18 @@
* for new events created on other nodes.
* @throws RepositoryException
*/
- protected void startSession() throws RepositoryException {
- super.startSession();
+ protected void startWriterSession() throws RepositoryException {
+ super.startWriterSession();
// load unprocessed jobs from repository
this.loadJobs();
- this.session.getWorkspace().getObservationManager()
- .addEventListener(this, javax.jcr.observation.Event.PROPERTY_CHANGED, this.repositoryPath, true, null, null, true);
+ this.writerSession.getWorkspace().getObservationManager()
+ .addEventListener(this,
+ javax.jcr.observation.Event.PROPERTY_CHANGED | javax.jcr.observation.Event.PROPERTY_REMOVED,
+ this.repositoryPath,
+ true,
+ null,
+ new String[] {this.getEventNodeType()},
+ true);
}
/**
@@ -265,7 +346,7 @@
final EventInfo info = new EventInfo();
info.event = event;
try {
- this.queue.put(info);
+ this.writeQueue.put(info);
} catch (InterruptedException e) {
// this should never happen
this.ignoreException(e);
@@ -276,102 +357,16 @@
}
}
- protected void processEvent(final Event event) {
+ /**
+ * @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#getNodeName(org.osgi.service.event.Event)
+ */
+ protected String getNodeName(Event event) {
final String jobId = (String)event.getProperty(EventUtil.PROPERTY_JOB_ID);
- final String jobTopic = (String)event.getProperty(EventUtil.PROPERTY_JOB_TOPIC);
- Lock lock = null;
- // if the job has no job id, we can just write the job to the repo and don't
- // need locking
- if ( jobId == null ) {
- try {
- final Node eventNode = JobEventHandler.this.writeEvent(event);
- lock = eventNode.lock(false, true);
- } catch (RepositoryException re ) {
- // something went wrong, so let's log it
- this.logger.error("Exception during writing new job to repository.", re);
- }
- } else {
- // we lock the parent node to ensure that noone else tries to add the same job to the repository
- // while we are doing it
- try {
- final Node parentNode = (Node)this.session.getItem(this.repositoryPath);
- lock = (Lock) new Locked() {
-
- protected Object run(Node node) throws RepositoryException {
- // if there is a node, we know that there is exactly one node
- final Node foundNode = JobEventHandler.this.queryJob(jobTopic, jobId);
- boolean writeAndSend =false;
- // if node is not present, we'll write it, lock it and send the event
- if ( foundNode == null ) {
- writeAndSend = true;
- } else {
- // node is already in repository, let's check the application id
- if ( foundNode.getProperty(EventHelper.NODE_PROPERTY_APPLICATION).getString().equals(JobEventHandler.this.applicationId) ) {
- // delete old node (deleting is easier than updating...)
- foundNode.remove();
- parentNode.save();
- writeAndSend = true;
- }
- }
- if ( writeAndSend ) {
- final Node eventNode = JobEventHandler.this.writeEvent(event);
- return eventNode.lock(false, true);
- }
- return null;
- }
- }.with(parentNode, false);
- } catch (RepositoryException re ) {
- // something went wrong, so let's log it
- this.logger.error("Exception during writing new job to repository.", re);
- } catch (InterruptedException e) {
- // This should never happen from the lock, so we ignore it
- this.ignoreException(e);
- }
+ if ( jobId != null ) {
+ return jobId.replace('/', '.');
}
- // if we have a lock, we will try to fire the job
- if ( lock != null ) {
- final Node eventNode = lock.getNode();
- final boolean parallelProcessing = event.getProperty(EventUtil.PROPERTY_JOB_PARALLEL) != null;
-
- if ( parallelProcessing ) {
- // if the job can be run in parallel, we'll just run it
- this.processJob(event, eventNode, lock.getLockToken());
- } else {
- // we need to serialize the jobs
- // lets check if we are currently processing a job
- boolean process = false;
- synchronized ( this ) {
- if ( !this.isProcessing ) {
- this.isProcessing = true;
- process = true;
- }
- }
- if ( process ) {
- this.processJob(event, eventNode, lock.getLockToken());
- } else {
- // we don't process the job right now, so unlock and put in local queue
- final EventInfo info = new EventInfo();
- info.event = event;
- try {
- info.nodePath = eventNode.getPath();
- this.queue.put(info);
- } catch (RepositoryException e) {
- // getPath() should work at this stage, so we ignore it
- this.ignoreException(e);
- } catch (InterruptedException e) {
- // this should never happen so we ignore it
- this.ignoreException(e);
- }
- try {
- eventNode.unlock();
- } catch (RepositoryException e) {
- // if unlocking fails, we will just ignore it
- this.ignoreException(e);
- }
- }
- }
- }
+ return "Job " + UUID.randomUUID().toString();
}
/**
@@ -381,6 +376,7 @@
*/
protected void processJob(Event event, Node eventNode, String lockToken) {
final boolean parallelProcessing = event.getProperty(EventUtil.PROPERTY_JOB_PARALLEL) != null;
+ final String jobTopic = (String)event.getProperty(EventUtil.PROPERTY_JOB_TOPIC);
boolean unlock = true;
try {
final Event jobEvent = this.getJobEvent(event, eventNode, lockToken);
@@ -400,8 +396,8 @@
} finally {
if ( unlock ) {
if ( !parallelProcessing ) {
- synchronized ( this ) {
- this.isProcessing = false;
+ synchronized ( this.processingMap ) {
+ this.processingMap.put(jobTopic, Boolean.FALSE);
}
}
// unlock node
@@ -457,20 +453,29 @@
s = this.createSession();
while ( iter.hasNext() ) {
final javax.jcr.observation.Event event = iter.nextEvent();
- if ( event.getType() == javax.jcr.observation.Event.PROPERTY_CHANGED ) {
+ if ( event.getType() == javax.jcr.observation.Event.PROPERTY_CHANGED
+ || event.getType() == javax.jcr.observation.Event.PROPERTY_REMOVED) {
try {
- final Node eventNode = (Node) s.getItem(event.getPath());
- if ( !eventNode.isLocked()
- && eventNode.hasProperty(EventHelper.NODE_PROPERTY_ACTIVE)
- && eventNode.getProperty(EventHelper.NODE_PROPERTY_ACTIVE).getBoolean() ) {
- final EventInfo info = new EventInfo();
- info.event = this.readEvent(eventNode);
- info.nodePath = event.getPath();
- try {
- this.queue.put(info);
- } catch (InterruptedException e) {
- // we ignore this exception as this should never occur
- this.ignoreException(e);
+ final String propPath = event.getPath();
+ int pos = propPath.lastIndexOf('/');
+ final String nodePath = propPath.substring(0, pos);
+ final String propertyName = propPath.substring(pos+1);
+
+ // we are only interested in unlocks
+ if ( "jcr:lockOwner".equals(propertyName) ) {
+ final Node eventNode = (Node) s.getItem(nodePath);
+ if ( !eventNode.isLocked()
+ && eventNode.hasProperty(EventHelper.NODE_PROPERTY_ACTIVE)
+ && eventNode.getProperty(EventHelper.NODE_PROPERTY_ACTIVE).getBoolean() ) {
+ final EventInfo info = new EventInfo();
+ info.event = this.readEvent(eventNode);
+ info.nodePath = nodePath; // path of the job node; event.getPath() is the jcr:lockOwner property path
+ try {
+ this.queue.put(info);
+ } catch (InterruptedException e) {
+ // we ignore this exception as this should never occur
+ this.ignoreException(e);
+ }
}
}
} catch (RepositoryException re) {
@@ -488,42 +493,11 @@
}
/**
- * Search for a node with the corresponding topic and unique key.
- * @param topic
- * @param key
- * @return The node or null.
- * @throws RepositoryException
- */
- protected Node queryJob(String topic, String key) throws RepositoryException {
- final QueryManager qManager = this.session.getWorkspace().getQueryManager();
- final StringBuffer buffer = new StringBuffer("/jcr:root");
- buffer.append(this.repositoryPath);
- buffer.append("//element(*, ");
- buffer.append(this.getEventNodeType());
- buffer.append(") [");
- buffer.append(EventHelper.NODE_PROPERTY_TOPIC);
- buffer.append(" = '");
- buffer.append(topic);
- buffer.append("' and ");
- buffer.append(EventHelper.NODE_PROPERTY_JOBID);
- buffer.append(" = '");
- buffer.append(key);
- buffer.append("']");
- final Query q = qManager.createQuery(buffer.toString(), Query.XPATH);
- final NodeIterator result = q.execute().getNodes();
- Node foundNode = null;
- if ( result.hasNext() ) {
- foundNode = result.nextNode();
- }
- return foundNode;
- }
-
- /**
* Load all active jobs from the repository.
* @throws RepositoryException
*/
protected void loadJobs() throws RepositoryException {
- final QueryManager qManager = this.session.getWorkspace().getQueryManager();
+ final QueryManager qManager = this.writerSession.getWorkspace().getQueryManager();
final StringBuffer buffer = new StringBuffer("/jcr:root");
buffer.append(this.repositoryPath);
buffer.append("//element(*, ");
@@ -553,13 +527,38 @@
/**
* @see org.apache.sling.event.EventUtil.JobStatusNotifier#finishedJob(org.osgi.service.event.Event, String, String, boolean)
*/
- public void finishedJob(Event job, String eventNodePath, String lockToken, boolean reschedule) {
+ public boolean finishedJob(Event job, String eventNodePath, String lockToken, boolean shouldReschedule) {
+ boolean reschedule = shouldReschedule;
+ if ( shouldReschedule ) {
+ // check if we exceeded the number of retries
+ if ( job.getProperty(EventUtil.PROPERTY_JOB_RETRIES) != null ) {
+ int retries = (Integer) job.getProperty(EventUtil.PROPERTY_JOB_RETRIES);
+ int retryCount = 0;
+ if ( job.getProperty(EventUtil.PROPERTY_JOB_RETRY_COUNT) != null ) {
+ retryCount = (Integer)job.getProperty(EventUtil.PROPERTY_JOB_RETRY_COUNT);
+ }
+ retryCount++;
+ if ( retryCount >= retries ) {
+ reschedule = false;
+ }
+ // update event with retry count
+ final Dictionary<String, Object> newProperties;
+ // create a new dictionary
+ newProperties = new Hashtable<String, Object>();
+ final String[] names = job.getPropertyNames();
+ for(int i=0; i<names.length; i++ ) {
+ newProperties.put(names[i], job.getProperty(names[i]));
+ }
+ newProperties.put(EventUtil.PROPERTY_JOB_RETRY_COUNT, retryCount);
+ job = new Event(job.getTopic(), newProperties);
+ }
+ }
final boolean parallelProcessing = job.getProperty(EventUtil.PROPERTY_JOB_PARALLEL) != null;
Session s = null;
try {
s = this.createSession();
// remove lock token from shared session and add it to current session
- this.session.removeLockToken(lockToken);
+ this.writerSession.removeLockToken(lockToken);
s.addLockToken(lockToken);
final Node eventNode = (Node) s.getItem(eventNodePath);
try {
@@ -573,8 +572,9 @@
this.logger.error("Exception during job finishing.", re);
} finally {
if ( !parallelProcessing) {
- synchronized ( this ) {
- this.isProcessing = false;
+ final String jobTopic = (String)job.getProperty(EventUtil.PROPERTY_JOB_TOPIC);
+ synchronized ( this.processingMap ) {
+ this.processingMap.put(jobTopic, Boolean.FALSE);
}
}
// unlock node
@@ -590,17 +590,47 @@
try {
info.event = job;
info.nodePath = eventNode.getPath();
- this.queue.put(info);
- } catch (InterruptedException e) {
- // this should never happen
- this.ignoreException(e);
} catch (RepositoryException e) {
// this should never happen
this.ignoreException(e);
}
+ // delay rescheduling?
+ if ( job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY) != null ) {
+ final long delay = (Long)job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY);
+ final Thread t = new Thread() {
+ public void run() {
+ try {
+ Thread.sleep(delay);
+ } catch (InterruptedException e) {
+ // this should never happen
+ ignoreException(e);
+ }
+ try {
+ queue.put(info);
+ } catch (InterruptedException e) {
+ // this should never happen
+ ignoreException(e);
+ }
+ }
+ };
+ t.start();
+ } else {
+ // put directly into queue
+ try {
+ this.queue.put(info);
+ } catch (InterruptedException e) {
+ // this should never happen
+ this.ignoreException(e);
+ }
+ }
+ }
+ if ( !shouldReschedule ) {
+ return true;
}
+ return reschedule;
} catch (RepositoryException re) {
this.logger.error("Unable to create new session.", re);
+ return false;
} finally {
if ( s != null ) {
s.logout();
Modified: incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/TimedEventHandler.java
URL: http://svn.apache.org/viewvc/incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/TimedEventHandler.java?rev=620503&r1=620502&r2=620503&view=diff
==============================================================================
--- incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/TimedEventHandler.java (original)
+++ incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/TimedEventHandler.java Mon Feb 11 07:19:29 2008
@@ -67,22 +67,28 @@
protected Scheduler scheduler;
/**
- * Start the repository session and add this handler as an observer
- * for new events created on other nodes.
- * @throws RepositoryException
+ * @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#startWriterSession()
*/
- protected void startSession() throws RepositoryException {
- super.startSession();
+ protected void startWriterSession() throws RepositoryException {
+ super.startWriterSession();
// load timed events from repository
this.loadEvents();
- this.session.getWorkspace().getObservationManager()
+ this.writerSession.getWorkspace().getObservationManager()
.addEventListener(this, javax.jcr.observation.Event.PROPERTY_CHANGED, this.repositoryPath, true, null, null, true);
}
/**
- * @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#cleanUpRepository()
+ * @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#getCleanUpQueryString()
+ */
+ protected String getCleanUpQueryString() {
+ // nothing to clean up for now
+ return null;
+ }
+
+ /**
+ * @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#processWriteQueue()
*/
- protected void cleanUpRepository() {
+ protected void processWriteQueue() {
// nothing to do right now
}
@@ -122,8 +128,8 @@
}
}
} else {
- this.session.refresh(true);
- final Node eventNode = (Node) this.session.getItem(info.nodePath);
+ this.writerSession.refresh(true);
+ final Node eventNode = (Node) this.writerSession.getItem(info.nodePath);
if ( !eventNode.isLocked() ) {
// lock node
Lock lock = null;
@@ -156,13 +162,13 @@
protected String persistEvent(final Event event, final ScheduleInfo scheduleInfo) {
try {
- final Node parentNode = (Node)this.session.getItem(this.repositoryPath);
+ final Node parentNode = (Node)this.writerSession.getItem(this.repositoryPath);
Lock lock = (Lock) new Locked() {
protected Object run(Node node) throws RepositoryException {
final String jobId = scheduleInfo.jobId;
// if there is a node, we know that there is exactly one node
- final Node foundNode = queryJob(session, jobId);
+ final Node foundNode = queryJob(writerSession, jobId);
if ( scheduleInfo.isStopEvent() ) {
// if this is a stop event, we should remove the node from the repository
// if there is no node someone else was faster and we can ignore this
@@ -408,7 +414,7 @@
* @throws RepositoryException
*/
protected void loadEvents() throws RepositoryException {
- final QueryManager qManager = this.session.getWorkspace().getQueryManager();
+ final QueryManager qManager = this.writerSession.getWorkspace().getQueryManager();
final StringBuffer buffer = new StringBuffer("/jcr:root");
buffer.append(this.repositoryPath);
buffer.append("//element(*, ");
Modified: incubator/sling/trunk/sling/event/src/test/java/org/apache/sling/event/impl/AbstractRepositoryEventHandlerTest.java
URL: http://svn.apache.org/viewvc/incubator/sling/trunk/sling/event/src/test/java/org/apache/sling/event/impl/AbstractRepositoryEventHandlerTest.java?rev=620503&r1=620502&r2=620503&view=diff
==============================================================================
--- incubator/sling/trunk/sling/event/src/test/java/org/apache/sling/event/impl/AbstractRepositoryEventHandlerTest.java (original)
+++ incubator/sling/trunk/sling/event/src/test/java/org/apache/sling/event/impl/AbstractRepositoryEventHandlerTest.java Mon Feb 11 07:19:29 2008
@@ -126,8 +126,8 @@
assertEquals(this.handler.applicationId, SLING_ID);
assertEquals(this.handler.cleanupPeriod, CLEANUP_PERIOD);
assertEquals(this.handler.repositoryPath, REPO_PATH);
- assertNotNull(this.handler.session);
- final EventListenerIterator iter = this.handler.session.getWorkspace().getObservationManager().getRegisteredEventListeners();
+ assertNotNull(this.handler.writerSession);
+ final EventListenerIterator iter = this.handler.writerSession.getWorkspace().getObservationManager().getRegisteredEventListeners();
boolean found = false;
while ( !found && iter.hasNext() ) {
final javax.jcr.observation.EventListener listener = iter.nextEventListener();