You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2008/02/14 10:25:14 UTC
svn commit: r627686 -
/incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
Author: cziegeler
Date: Thu Feb 14 01:25:13 2008
New Revision: 627686
URL: http://svn.apache.org/viewvc?rev=627686&view=rev
Log:
Synchronize on session
Modified:
incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
Modified: incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
URL: http://svn.apache.org/viewvc/incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java?rev=627686&r1=627685&r2=627686&view=diff
==============================================================================
--- incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java (original)
+++ incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java Thu Feb 14 01:25:13 2008
@@ -224,82 +224,86 @@
if ( info != null && this.running ) {
// check if the node still exists
- try {
- if ( this.backgroundSession.itemExists(info.nodePath) ) {
- final Event event = info.event;
- final String jobTopic = (String)event.getProperty(EventUtil.PROPERTY_JOB_TOPIC);
- final boolean parallelProcessing = event.getProperty(EventUtil.PROPERTY_JOB_PARALLEL) != null;
-
- // check how we can process this job
- // if parallel processing is allowed, we can just process
- // if not we should check if any other job with the same topic is currently running
- boolean process = parallelProcessing;
- if ( !process ) {
- synchronized ( this.processingMap ) {
- final Boolean value = this.processingMap.get(jobTopic);
- if ( value == null || !value.booleanValue() ) {
- this.processingMap.put(jobTopic, Boolean.TRUE);
- process = true;
+ synchronized (this.backgroundSession) {
+ try {
+ this.backgroundSession.refresh(false);
+ if ( this.backgroundSession.itemExists(info.nodePath) ) {
+ final Event event = info.event;
+ final String jobTopic = (String)event.getProperty(EventUtil.PROPERTY_JOB_TOPIC);
+ final boolean parallelProcessing = event.getProperty(EventUtil.PROPERTY_JOB_PARALLEL) != null;
+
+ // check how we can process this job
+ // if parallel processing is allowed, we can just process
+ // if not we should check if any other job with the same topic is currently running
+ boolean process = parallelProcessing;
+ if ( !process ) {
+ synchronized ( this.processingMap ) {
+ final Boolean value = this.processingMap.get(jobTopic);
+ if ( value == null || !value.booleanValue() ) {
+ this.processingMap.put(jobTopic, Boolean.TRUE);
+ process = true;
+ }
}
- }
- }
- if ( process ) {
- boolean unlock = true;
- try {
- final Node eventNode = (Node) this.backgroundSession.getItem(info.nodePath);
- if ( !eventNode.isLocked() ) {
- // lock node
- try {
- eventNode.lock(false, true);
- } catch (RepositoryException re) {
- // lock failed which means that the node is locked by someone else, so we don't have to requeue
- process = false;
- }
- if ( process ) {
- unlock = false;
- this.processJob(info.event, eventNode);
+ }
+ if ( process ) {
+ boolean unlock = true;
+ try {
+ final Node eventNode = (Node) this.backgroundSession.getItem(info.nodePath);
+ if ( !eventNode.isLocked() ) {
+ // lock node
+ try {
+ eventNode.lock(false, true);
+ } catch (RepositoryException re) {
+ // lock failed which means that the node is locked by someone else, so we don't have to requeue
+ process = false;
+ }
+ if ( process ) {
+ unlock = false;
+ this.processJob(info.event, eventNode);
+ }
}
- }
- } catch (RepositoryException e) {
- // ignore
- this.ignoreException(e);
- } finally {
- if ( unlock && !parallelProcessing ) {
- synchronized ( this.processingMap ) {
- this.processingMap.put(jobTopic, Boolean.FALSE);
+ } catch (RepositoryException e) {
+ // ignore
+ this.ignoreException(e);
+ } finally {
+ if ( unlock && !parallelProcessing ) {
+ synchronized ( this.processingMap ) {
+ this.processingMap.put(jobTopic, Boolean.FALSE);
+ }
}
}
- }
- } else {
- try {
- // check if the node is in processing or already finished
- final Node eventNode = (Node) this.backgroundSession.getItem(info.nodePath);
- if ( !eventNode.isLocked() ) {
- try {
- this.queue.put(info);
- } catch (InterruptedException e) {
- // ignore
- this.ignoreException(e);
- }
- // wait time before we restart the cycle, if there is only one job in the queue!
- if ( this.queue.size() == 1 ) {
+ } else {
+ try {
+ // check if the node is in processing or already finished
+ final Node eventNode = (Node) this.backgroundSession.getItem(info.nodePath);
+ if ( !eventNode.isLocked() ) {
try {
- Thread.sleep(this.sleepTime);
+ this.queue.put(info);
} catch (InterruptedException e) {
// ignore
this.ignoreException(e);
}
+ // wait time before we restart the cycle, if there is only one job in the queue!
+ if ( this.queue.size() == 1 ) {
+ try {
+ Thread.sleep(this.sleepTime);
+ } catch (InterruptedException e) {
+ // ignore
+ this.ignoreException(e);
+ }
+ }
}
+ } catch (RepositoryException e) {
+ // ignore
+ this.ignoreException(e);
}
- } catch (RepositoryException e) {
- // ignore
- this.ignoreException(e);
}
}
+ } catch (RepositoryException re) {
+ this.ignoreException(re);
}
- } catch (RepositoryException re) {
- this.ignoreException(re);
+
}
}
}
@@ -432,7 +436,7 @@
final String jobTopic = (String)event.getProperty(EventUtil.PROPERTY_JOB_TOPIC);
boolean unlock = true;
try {
- final Event jobEvent = this.getJobEvent(event, eventNode);
+ final Event jobEvent = this.getJobEvent(event, eventNode.getPath());
eventNode.setProperty(EventHelper.NODE_PROPERTY_PROCESSOR, this.applicationId);
eventNode.save();
final EventAdmin localEA = this.eventAdmin;
@@ -469,8 +473,7 @@
* @param e
* @return
*/
- protected Event getJobEvent(Event e, Node eventNode)
- throws RepositoryException {
+ protected Event getJobEvent(Event e, String nodePath) {
final String eventTopic = (String)e.getProperty(EventUtil.PROPERTY_JOB_TOPIC);
final Dictionary<String, Object> properties = new Hashtable<String, Object>();
final String[] propertyNames = e.getPropertyNames();
@@ -478,7 +481,7 @@
properties.put(propertyNames[i], e.getProperty(propertyNames[i]));
}
// put properties for finished job callback
- properties.put(EventUtil.JobStatusNotifier.CONTEXT_PROPERTY_NAME, new EventUtil.JobStatusNotifier.NotifierContext(this, eventNode.getPath()));
+ properties.put(EventUtil.JobStatusNotifier.CONTEXT_PROPERTY_NAME, new EventUtil.JobStatusNotifier.NotifierContext(this, nodePath));
return new Event(eventTopic, properties);
}
@@ -632,97 +635,99 @@
}
final boolean parallelProcessing = job.getProperty(EventUtil.PROPERTY_JOB_PARALLEL) != null;
// we have to use the same session for unlocking that we used for locking!
- // TODO - we have to bring the session into the same thread!
- try {
- final Node eventNode = (Node) this.backgroundSession.getItem(eventNodePath);
- boolean unlock = true;
+ synchronized ( this.backgroundSession ) {
try {
- if ( !reschedule ) {
- synchronized ( this.deletedJobs ) {
- this.deletedJobs.add(eventNodePath);
+ this.backgroundSession.refresh(false);
+ final Node eventNode = (Node) this.backgroundSession.getItem(eventNodePath);
+ boolean unlock = true;
+ try {
+ if ( !reschedule ) {
+ synchronized ( this.deletedJobs ) {
+ this.deletedJobs.add(eventNodePath);
+ }
+ // unlock node
+ try {
+ eventNode.unlock();
+ } catch (RepositoryException e) {
+ // if unlock fails, we silently ignore this
+ this.ignoreException(e);
+ }
+ unlock = false;
+ // remove node from repository
+ final Node parentNode = eventNode.getParent();
+ eventNode.remove();
+ parentNode.save();
}
- // unlock node
- try {
- eventNode.unlock();
- } catch (RepositoryException e) {
- // if unlock fails, we silently ignore this
- this.ignoreException(e);
+ } catch (RepositoryException re) {
+ // if an exception occurs, we just log
+ this.logger.error("Exception during job finishing.", re);
+ } finally {
+ if ( !parallelProcessing) {
+ final String jobTopic = (String)job.getProperty(EventUtil.PROPERTY_JOB_TOPIC);
+ synchronized ( this.processingMap ) {
+ this.processingMap.put(jobTopic, Boolean.FALSE);
+ }
}
- unlock = false;
- // remove node from repository
- final Node parentNode = eventNode.getParent();
- eventNode.remove();
- parentNode.save();
- }
- } catch (RepositoryException re) {
- // if an exception occurs, we just log
- this.logger.error("Exception during job finishing.", re);
- } finally {
- if ( !parallelProcessing) {
- final String jobTopic = (String)job.getProperty(EventUtil.PROPERTY_JOB_TOPIC);
- synchronized ( this.processingMap ) {
- this.processingMap.put(jobTopic, Boolean.FALSE);
+ if ( unlock ) {
+ synchronized ( this.deletedJobs ) {
+ this.deletedJobs.add(eventNodePath);
+ }
+ // unlock node
+ try {
+ eventNode.unlock();
+ } catch (RepositoryException e) {
+ // if unlock fails, we silently ignore this
+ this.ignoreException(e);
+ }
}
}
- if ( unlock ) {
- synchronized ( this.deletedJobs ) {
- this.deletedJobs.add(eventNodePath);
- }
- // unlock node
+ if ( reschedule ) {
+ final EventInfo info = new EventInfo();
try {
- eventNode.unlock();
+ info.event = job;
+ info.nodePath = eventNode.getPath();
} catch (RepositoryException e) {
- // if unlock fails, we silently ignore this
+ // this should never happen
this.ignoreException(e);
}
- }
- }
- if ( reschedule ) {
- final EventInfo info = new EventInfo();
- try {
- info.event = job;
- info.nodePath = eventNode.getPath();
- } catch (RepositoryException e) {
- // this should never happen
- this.ignoreException(e);
- }
- // delay rescheduling?
- if ( job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY) != null ) {
- final long delay = (Long)job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY);
- final Thread t = new Thread() {
- public void run() {
- try {
- Thread.sleep(delay);
- } catch (InterruptedException e) {
- // this should never happen
- ignoreException(e);
- }
- try {
- queue.put(info);
- } catch (InterruptedException e) {
- // this should never happen
- ignoreException(e);
+ // delay rescheduling?
+ if ( job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY) != null ) {
+ final long delay = (Long)job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY);
+ final Thread t = new Thread() {
+ public void run() {
+ try {
+ Thread.sleep(delay);
+ } catch (InterruptedException e) {
+ // this should never happen
+ ignoreException(e);
+ }
+ try {
+ queue.put(info);
+ } catch (InterruptedException e) {
+ // this should never happen
+ ignoreException(e);
+ }
}
+ };
+ t.start();
+ } else {
+ // put directly into queue
+ try {
+ this.queue.put(info);
+ } catch (InterruptedException e) {
+ // this should never happen
+ this.ignoreException(e);
}
- };
- t.start();
- } else {
- // put directly into queue
- try {
- this.queue.put(info);
- } catch (InterruptedException e) {
- // this should never happen
- this.ignoreException(e);
}
}
+ if ( !shouldReschedule ) {
+ return true;
+ }
+ return reschedule;
+ } catch (RepositoryException re) {
+ this.logger.error("Unable to create new session.", re);
+ return false;
}
- if ( !shouldReschedule ) {
- return true;
- }
- return reschedule;
- } catch (RepositoryException re) {
- this.logger.error("Unable to create new session.", re);
- return false;
}
}