You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2008/10/01 11:51:04 UTC
svn commit: r700723 - in
/incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl:
EventHelper.java JobEventHandler.java
Author: cziegeler
Date: Wed Oct 1 02:51:03 2008
New Revision: 700723
URL: http://svn.apache.org/viewvc?rev=700723&view=rev
Log:
Clean up eventing code
Modified:
incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/EventHelper.java
incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
Modified: incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/EventHelper.java
URL: http://svn.apache.org/viewvc/incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/EventHelper.java?rev=700723&r1=700722&r2=700723&view=diff
==============================================================================
--- incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/EventHelper.java (original)
+++ incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/EventHelper.java Wed Oct 1 02:51:03 2008
@@ -41,11 +41,8 @@
public static final String NODE_PROPERTY_TE_DATE = "slingevent:date";
public static final String NODE_PROPERTY_TE_PERIOD = "slingevent:period";
- public static final String EVENTS_NODE_TYPE = "slingevent:Events";
public static final String EVENT_NODE_TYPE = "slingevent:Event";
- public static final String JOBS_NODE_TYPE = "slingevent:Jobs";
public static final String JOB_NODE_TYPE = "slingevent:Job";
- public static final String TIMED_EVENTS_NODE_TYPE = "slingevent:TimedEvents";
public static final String TIMED_EVENT_NODE_TYPE = "slingevent:TimedEvent";
/** The nodetype for newly created folders */
Modified: incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
URL: http://svn.apache.org/viewvc/incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java?rev=700723&r1=700722&r2=700723&view=diff
==============================================================================
--- incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java (original)
+++ incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java Wed Oct 1 02:51:03 2008
@@ -51,6 +51,7 @@
import org.apache.sling.event.EventPropertiesMap;
import org.apache.sling.event.EventUtil;
import org.apache.sling.event.JobStatusProvider;
+import org.osgi.service.component.ComponentConstants;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
@@ -59,7 +60,7 @@
/**
* An event handler for special job events.
*
- * @scr.component label="%job.events.name" description="%job.events.description"
+ * @scr.component label="%job.events.name" description="%job.events.description" immediate="true"
* @scr.service interface="org.apache.sling.event.JobStatusProvider"
* @scr.property name="event.topics" refValues="EventUtil.TOPIC_JOB"
* values.updated="org/osgi/framework/BundleEvent/UPDATED"
@@ -121,6 +122,9 @@
/** The scheduler for rescheduling jobs. @scr.reference */
private Scheduler scheduler;
+ /** Our component context. */
+ private ComponentContext componentContext;
+
public static ThreadPool JOB_THREAD_POOL;
/**
@@ -135,6 +139,7 @@
this.cleanupPeriod = OsgiUtil.toInteger(props.get(CONFIG_PROPERTY_CLEANUP_PERIOD), DEFAULT_CLEANUP_PERIOD);
this.sleepTime = OsgiUtil.toLong(props.get(CONFIG_PROPERTY_SLEEP_TIME), DEFAULT_SLEEP_TIME);
this.maxJobRetries = OsgiUtil.toInteger(props.get(CONFIG_PROPERTY_MAX_JOB_RETRIES), DEFAULT_MAX_JOB_RETRIES);
+ this.componentContext = context;
super.activate(context);
JOB_THREAD_POOL = this.threadPool;
}
@@ -164,6 +169,7 @@
this.backgroundSession.logout();
this.backgroundSession = null;
}
+ this.componentContext = null;
JOB_THREAD_POOL = null;
}
@@ -238,6 +244,7 @@
this.ignoreException(e);
}
if ( event != null && this.running ) {
+ logger.debug("Persisting job {}", event);
try {
this.writerSession.refresh(false);
} catch (RepositoryException re) {
@@ -335,6 +342,15 @@
// load unprocessed jobs from repository
if ( this.running ) {
this.loadJobs();
+ } else {
+ logger.info("Deactivating component due to errors.");
+ // deactivate
+ final ComponentContext ctx = this.componentContext;
+ if ( ctx != null ) {
+ final String name = (String) componentContext.getProperties().get(
+ ComponentConstants.COMPONENT_NAME);
+ ctx.disableComponent(name);
+ }
}
while ( this.running ) {
// so let's wait/get the next job from the queue
@@ -489,10 +505,11 @@
* Process a job
*/
private boolean executeJob(final EventInfo info, final BlockingQueue<EventInfo> jobQueue) {
- // check if the node still exists
+ boolean putback = false;
synchronized (this.backgroundSession) {
try {
this.backgroundSession.refresh(false);
+ // check if the node still exists
if ( this.backgroundSession.itemExists(info.nodePath)
&& !this.backgroundSession.itemExists(info.nodePath + "/" + EventHelper.NODE_PROPERTY_FINISHED)) {
final Event event = info.event;
@@ -547,41 +564,7 @@
// check if the node is in processing or already finished
final Node eventNode = (Node) this.backgroundSession.getItem(info.nodePath);
if ( !eventNode.isLocked() && !eventNode.hasProperty(EventHelper.NODE_PROPERTY_FINISHED)) {
- final EventInfo eInfo = info;
- final Date fireDate = new Date();
- fireDate.setTime(System.currentTimeMillis() + this.sleepTime * 1000);
-
- // we put it back into the queue after a specific time
- final Runnable r = new Runnable() {
-
- /**
- * @see java.lang.Runnable#run()
- */
- public void run() {
- try {
- queue.put(eInfo);
- } catch (InterruptedException e) {
- // ignore
- ignoreException(e);
- }
- }
-
- };
- try {
- this.scheduler.fireJobAt(null, r, null, fireDate);
- } catch (Exception e) {
- // we ignore the exception
- ignoreException(e);
- // then wait for the time and readd the job
- try {
- Thread.sleep(sleepTime * 1000);
- } catch (InterruptedException ie) {
- // ignore
- ignoreException(ie);
- }
- r.run();
- }
-
+ putback = true;
}
} catch (RepositoryException e) {
// ignore
@@ -594,6 +577,42 @@
}
}
+ if ( putback ) {
+ final EventInfo eInfo = info;
+ final Date fireDate = new Date();
+ fireDate.setTime(System.currentTimeMillis() + this.sleepTime * 1000);
+
+ // we put it back into the queue after a specific time
+ final Runnable r = new Runnable() {
+
+ /**
+ * @see java.lang.Runnable#run()
+ */
+ public void run() {
+ try {
+ queue.put(eInfo);
+ } catch (InterruptedException e) {
+ // ignore
+ ignoreException(e);
+ }
+ }
+
+ };
+ try {
+ this.scheduler.fireJobAt(null, r, null, fireDate);
+ } catch (Exception e) {
+ // we ignore the exception
+ ignoreException(e);
+ // then wait for the time and readd the job
+ try {
+ Thread.sleep(sleepTime * 1000);
+ } catch (InterruptedException ie) {
+ // ignore
+ ignoreException(ie);
+ }
+ r.run();
+ }
+ }
return false;
}
@@ -608,10 +627,12 @@
* @see org.osgi.service.event.EventHandler#handleEvent(org.osgi.service.event.Event)
*/
public void handleEvent(final Event event) {
+ logger.debug("Receiving event {}", event);
// we ignore remote job events
if ( EventUtil.isLocal(event) ) {
// check for bundle event
if ( event.getTopic().equals(EventUtil.TOPIC_JOB)) {
+ logger.debug("Handling local job {}", event);
// job event
final String jobTopic = (String)event.getProperty(EventUtil.PROPERTY_JOB_TOPIC);
@@ -873,7 +894,9 @@
buffer.append(this.repositoryPath);
buffer.append("//element(*, ");
buffer.append(this.getEventNodeType());
- buffer.append(")");
+ buffer.append(") order by @");
+ buffer.append(EventHelper.NODE_PROPERTY_CREATED);
+ buffer.append(" ascending");
final Query q = qManager.createQuery(buffer.toString(), Query.XPATH);
final NodeIterator result = q.execute().getNodes();
while ( result.hasNext() ) {
@@ -938,6 +961,7 @@
}
final boolean parallelProcessing = job.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME) != null
|| job.getProperty(EventUtil.PROPERTY_JOB_PARALLEL) != null;
+ EventInfo putback = null;
// we have to use the same session for unlocking that we used for locking!
synchronized ( this.backgroundSession ) {
try {
@@ -1028,27 +1052,7 @@
// delay rescheduling?
if ( job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY) != null ) {
- final long delay = (Long)job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY);
- final Date fireDate = new Date();
- fireDate.setTime(System.currentTimeMillis() + delay);
-
- final Runnable t = new Runnable() {
- public void run() {
- try {
- queue.put(info);
- } catch (InterruptedException e) {
- // this should never happen
- ignoreException(e);
- }
- }
- };
- try {
- this.scheduler.fireJobAt(null, t, null, fireDate);
- } catch (Exception e) {
- // we ignore the exception and just put back the job in the queue
- ignoreException(e);
- t.run();
- }
+ putback = info;
} else {
// put directly into queue
try {
@@ -1073,15 +1077,39 @@
}
}
}
- if ( !shouldReschedule ) {
- return true;
- }
- return reschedule;
} catch (RepositoryException re) {
this.logger.error("Unable to create new session.", re);
return false;
}
}
+ if ( putback != null ) {
+ final EventInfo info = putback;
+ final long delay = (Long)job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY);
+ final Date fireDate = new Date();
+ fireDate.setTime(System.currentTimeMillis() + delay);
+
+ final Runnable t = new Runnable() {
+ public void run() {
+ try {
+ queue.put(info);
+ } catch (InterruptedException e) {
+ // this should never happen
+ ignoreException(e);
+ }
+ }
+ };
+ try {
+ this.scheduler.fireJobAt(null, t, null, fireDate);
+ } catch (Exception e) {
+ // we ignore the exception and just put back the job in the queue
+ ignoreException(e);
+ t.run();
+ }
+ }
+ if ( !shouldReschedule ) {
+ return true;
+ }
+ return reschedule;
}
/**