You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2008/02/13 16:20:03 UTC
svn commit: r627463 - in /incubator/sling/trunk/sling/event: ./
src/main/java/org/apache/sling/event/impl/
src/test/java/org/apache/sling/event/impl/
Author: cziegeler
Date: Wed Feb 13 07:20:00 2008
New Revision: 627463
URL: http://svn.apache.org/viewvc?rev=627463&view=rev
Log:
SLING-177: Remove the use of the Locked class completly, refactor code.
Added:
incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/TimedJobHandler.java
- copied, changed from r627395, incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/TimedEventHandler.java
Removed:
incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/TimedEventHandler.java
Modified:
incubator/sling/trunk/sling/event/pom.xml
incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/AbstractRepositoryEventHandler.java
incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/DistributingEventHandler.java
incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
incubator/sling/trunk/sling/event/src/test/java/org/apache/sling/event/impl/DistributingEventHandlerTest.java
Modified: incubator/sling/trunk/sling/event/pom.xml
URL: http://svn.apache.org/viewvc/incubator/sling/trunk/sling/event/pom.xml?rev=627463&r1=627462&r2=627463&view=diff
==============================================================================
--- incubator/sling/trunk/sling/event/pom.xml (original)
+++ incubator/sling/trunk/sling/event/pom.xml Wed Feb 13 07:20:00 2008
@@ -66,9 +66,7 @@
</Export-Package>
<Private-Package>
org.apache.sling.event.impl,
- org.apache.jackrabbit;
- org.apache.jackrabbit.name;
- org.apache.jackrabbit.util;split-package:=merge-first
+ org.apache.jackrabbit.util
</Private-Package>
<DynamicImport-Package>*</DynamicImport-Package>
<Sling-Nodetypes>
Modified: incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/AbstractRepositoryEventHandler.java
URL: http://svn.apache.org/viewvc/incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/AbstractRepositoryEventHandler.java?rev=627463&r1=627462&r2=627463&view=diff
==============================================================================
--- incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/AbstractRepositoryEventHandler.java (original)
+++ incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/AbstractRepositoryEventHandler.java Wed Feb 13 07:20:00 2008
@@ -35,7 +35,6 @@
import javax.jcr.Session;
import javax.jcr.observation.EventListener;
-import org.apache.jackrabbit.JcrConstants;
import org.apache.sling.event.EventUtil;
import org.apache.sling.jcr.api.SlingRepository;
import org.osgi.service.component.ComponentContext;
@@ -88,7 +87,7 @@
protected final BlockingQueue<EventInfo> queue = new LinkedBlockingQueue<EventInfo>();
/** A local queue for writing received events into the repository. */
- protected final BlockingQueue<EventInfo> writeQueue = new LinkedBlockingQueue<EventInfo>();
+ protected final BlockingQueue<Event> writeQueue = new LinkedBlockingQueue<Event>();
/**
* Activate this component.
@@ -141,20 +140,20 @@
* @param context
*/
protected void deactivate(final ComponentContext context) {
- // stop background thread, by adding a job info to wake it up
+ // stop background threads by putting empty objects into the queue
this.running = false;
try {
- this.writeQueue.put(new EventInfo());
+ this.writeQueue.put(new Event("some", null));
} catch (InterruptedException e) {
- // we ignore this
this.ignoreException(e);
}
try {
this.queue.put(new EventInfo());
} catch (InterruptedException e) {
- // we ignore this
this.ignoreException(e);
}
+
+ // close session
this.stopWriterSession();
}
@@ -214,7 +213,7 @@
while ( st.hasMoreTokens() ) {
final String token = st.nextToken();
if ( !node.hasNode(token) ) {
- node.addNode(token, JcrConstants.NT_UNSTRUCTURED);
+ node.addNode(token, "nt:unstructured");
node.save();
}
node = node.getNode(token);
@@ -236,28 +235,26 @@
return EventHelper.EVENT_NODE_TYPE;
}
- protected String getNodeName(Event e) {
- final Calendar now = Calendar.getInstance();
- final String nodeType = this.getEventNodeType();
- final int sepPos = nodeType.indexOf(':');
- final String nodeName = nodeType.substring(sepPos+1) + "-" + this.applicationId + "-" + now.getTime().getTime();
-
- return nodeName;
- }
-
/**
* Write an event to the repository.
* @param e
* @throws RepositoryException
* @throws IOException
*/
- protected Node writeEvent(Event e)
+ protected Node writeEvent(Event e, String suggestedName)
throws RepositoryException {
// create new node with name of topic
final Node rootNode = (Node) this.writerSession.getItem(this.repositoryPath);
final String nodeType = this.getEventNodeType();
- final String nodeName = this.getNodeName(e);
+ final String nodeName;
+ if ( suggestedName != null ) {
+ nodeName = suggestedName;
+ } else {
+ final Calendar now = Calendar.getInstance();
+ final int sepPos = nodeType.indexOf(':');
+ nodeName = nodeType.substring(sepPos+1) + "-" + this.applicationId + "-" + now.getTime().getTime();
+ }
final Node eventNode = rootNode.addNode(nodeName, nodeType);
eventNode.setProperty(EventHelper.NODE_PROPERTY_CREATED, Calendar.getInstance());
Modified: incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/DistributingEventHandler.java
URL: http://svn.apache.org/viewvc/incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/DistributingEventHandler.java?rev=627463&r1=627462&r2=627463&view=diff
==============================================================================
--- incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/DistributingEventHandler.java (original)
+++ incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/DistributingEventHandler.java Wed Feb 13 07:20:00 2008
@@ -140,26 +140,28 @@
protected void processWriteQueue() {
while ( this.running ) {
// so let's wait/get the next job from the queue
- EventInfo info = null;
+ Event event = null;
try {
- info = this.writeQueue.take();
+ event = this.writeQueue.take();
} catch (InterruptedException e) {
// we ignore this
this.ignoreException(e);
}
- if ( info != null && this.running ) {
+ if ( event != null && this.running ) {
try {
- final Node eventNode = this.writeEvent(info.event);
+ final Node eventNode = this.writeEvent(event, null);
+ final EventInfo info = new EventInfo();
+ info.event = event;
info.nodePath = eventNode.getPath();
+ try {
+ this.queue.put(info);
+ } catch (InterruptedException e) {
+ // we ignore this
+ this.ignoreException(e);
+ }
} catch (Exception e) {
this.logger.error("Exception during writing the event to the repository.", e);
}
- try {
- this.queue.put(info);
- } catch (InterruptedException e) {
- // we ignore this
- this.ignoreException(e);
- }
}
}
}
@@ -206,9 +208,7 @@
*/
public void handleEvent(final Event event) {
try {
- final EventInfo info = new EventInfo();
- info.event = event;
- this.writeQueue.put(info);
+ this.writeQueue.put(event);
} catch (InterruptedException ex) {
// we ignore this
this.ignoreException(ex);
Modified: incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
URL: http://svn.apache.org/viewvc/incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java?rev=627463&r1=627462&r2=627463&view=diff
==============================================================================
--- incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java (original)
+++ incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java Wed Feb 13 07:20:00 2008
@@ -39,7 +39,6 @@
import javax.jcr.query.Query;
import javax.jcr.query.QueryManager;
-import org.apache.jackrabbit.JcrConstants;
import org.apache.sling.event.EventUtil;
import org.apache.sling.event.JobStatusProvider;
import org.osgi.framework.BundleEvent;
@@ -122,32 +121,33 @@
protected void processWriteQueue() {
while ( this.running ) {
// so let's wait/get the next job from the queue
- EventInfo info = null;
+ Event event = null;
try {
- info = this.writeQueue.take();
+ event = this.writeQueue.take();
} catch (InterruptedException e) {
// we ignore this
this.ignoreException(e);
}
- if ( info != null && this.running ) {
- final Event event = info.event;
+ if ( event != null && this.running ) {
final String jobId = (String)event.getProperty(EventUtil.PROPERTY_JOB_ID);
+ final EventInfo info = new EventInfo();
+ info.event = event;
// if the job has no job id, we can just write the job to the repo and don't
// need locking
+ final String nodeName = this.getNodeName(event);
if ( jobId == null ) {
try {
- final Node eventNode = this.writeEvent(event);
+ final Node eventNode = this.writeEvent(event, nodeName);
info.nodePath = eventNode.getPath();
} catch (RepositoryException re ) {
// something went wrong, so let's log it
- this.logger.error("Exception during writing new job '" + this.getNodeName(event) + "' to repository.", re);
+ this.logger.error("Exception during writing new job '" + nodeName + "' to repository.", re);
}
} else {
try {
// let's first search for an existing node with the same id
final Node parentNode = (Node)this.writerSession.getItem(this.repositoryPath);
- final String nodeName = this.getNodeName(event);
Node foundNode = null;
if ( parentNode.hasNode(nodeName) ) {
foundNode = parentNode.getNode(nodeName);
@@ -170,7 +170,7 @@
if ( foundNode == null ) {
// We now write the event into the repository
try {
- final Node eventNode = this.writeEvent(event);
+ final Node eventNode = this.writeEvent(event, nodeName);
info.nodePath = eventNode.getPath();
} catch (ItemExistsException iee) {
// someone else did already write this node in the meantime
@@ -341,10 +341,8 @@
// job topic must be set, otherwise we ignore this event!
if ( jobTopic != null ) {
// queue the event in order to respond quickly
- final EventInfo info = new EventInfo();
- info.event = event;
try {
- this.writeQueue.put(info);
+ this.writeQueue.put(event);
} catch (InterruptedException e) {
// this should never happen
this.ignoreException(e);
@@ -420,7 +418,7 @@
}
/**
- * @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#getNodeName(org.osgi.service.event.Event)
+ * Create a unique node name for the job.
*/
protected String getNodeName(Event event) {
final String jobId = (String)event.getProperty(EventUtil.PROPERTY_JOB_ID);
@@ -750,7 +748,7 @@
}
if ( locked ) {
buffer.append(" and ");
- buffer.append(JcrConstants.JCR_LOCKOWNER);
+ buffer.append("jcr:lockOwner");
}
buffer.append("]");
final Query q = qManager.createQuery(buffer.toString(), Query.XPATH);
Copied: incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/TimedJobHandler.java (from r627395, incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/TimedEventHandler.java)
URL: http://svn.apache.org/viewvc/incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/TimedJobHandler.java?p2=incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/TimedJobHandler.java&p1=incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/TimedEventHandler.java&r1=627395&r2=627463&rev=627463&view=diff
==============================================================================
--- incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/TimedEventHandler.java (original)
+++ incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/TimedJobHandler.java Wed Feb 13 07:20:00 2008
@@ -39,7 +39,6 @@
import javax.jcr.query.Query;
import javax.jcr.query.QueryManager;
-import org.apache.jackrabbit.util.Locked;
import org.apache.sling.event.EventUtil;
import org.apache.sling.scheduler.Job;
import org.apache.sling.scheduler.JobContext;
@@ -57,7 +56,7 @@
* values.started="org/osgi/framework/BundleEvent/STARTED"
* @scr.property name="repository.path" value="/sling/timed-events"
*/
-public class TimedEventHandler
+public class TimedJobHandler
extends AbstractRepositoryEventHandler
implements Job {
@@ -88,7 +87,46 @@
* @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#processWriteQueue()
*/
protected void processWriteQueue() {
- // nothing to do right now
+ while ( this.running ) {
+ Event event = null;
+ try {
+ event = this.writeQueue.take();
+ } catch (InterruptedException e) {
+ // we ignore this
+ this.ignoreException(e);
+ }
+ if ( this.running && event != null ) {
+ ScheduleInfo scheduleInfo = null;
+ try {
+ scheduleInfo = new ScheduleInfo(event);
+ } catch (IllegalArgumentException iae) {
+ this.logger.error(iae.getMessage());
+ }
+ if ( scheduleInfo != null ) {
+ final EventInfo info = new EventInfo();
+ info.event = event;
+
+ // write event and update path
+ // if something went wrong we get the node path and reschedule
+ info.nodePath = this.persistEvent(info.event, scheduleInfo);
+ if ( info.nodePath != null ) {
+ try {
+ this.queue.put(info);
+ } catch (InterruptedException e) {
+ // this should never happen, so we ignore it
+ this.ignoreException(e);
+ }
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Create a unique node name for this timed job.
+ */
+ protected String getNodeName(String jobTopic, String jobId) {
+ return jobTopic.replace('/', '.') + " " + jobId.replace('/', '.');
}
/**
@@ -113,39 +151,24 @@
}
if ( scheduleInfo != null ) {
try {
- // if the node path is null, this is a new event
- if ( info.nodePath == null ) {
- // write event and update path
- // if something went wrong we get the node path and reschedule
- info.nodePath = this.persistEvent(info.event, scheduleInfo);
- if ( info.nodePath != null ) {
- try {
- this.queue.put(info);
- } catch (InterruptedException e) {
- // this should never happen, so we ignore it
- this.ignoreException(e);
- }
+ this.writerSession.refresh(true);
+ final Node eventNode = (Node) this.writerSession.getItem(info.nodePath);
+ if ( !eventNode.isLocked() ) {
+ // lock node
+ Lock lock = null;
+ try {
+ lock = eventNode.lock(false, true);
+ } catch (RepositoryException re) {
+ // lock failed which means that the node is locked by someone else, so we don't have to requeue
}
- } else {
- this.writerSession.refresh(true);
- final Node eventNode = (Node) this.writerSession.getItem(info.nodePath);
- if ( !eventNode.isLocked() ) {
- // lock node
- Lock lock = null;
- try {
- lock = eventNode.lock(false, true);
- } catch (RepositoryException re) {
- // lock failed which means that the node is locked by someone else, so we don't have to requeue
- }
- if ( lock != null ) {
- // if something went wrong, we reschedule
- if ( !this.processEvent(info.event, scheduleInfo) ) {
- try {
- this.queue.put(info);
- } catch (InterruptedException e) {
- // this should never happen, so we ignore it
- this.ignoreException(e);
- }
+ if ( lock != null ) {
+ // if something went wrong, we reschedule
+ if ( !this.processEvent(info.event, scheduleInfo) ) {
+ try {
+ this.queue.put(info);
+ } catch (InterruptedException e) {
+ // this should never happen, so we ignore it
+ this.ignoreException(e);
}
}
}
@@ -161,49 +184,47 @@
protected String persistEvent(final Event event, final ScheduleInfo scheduleInfo) {
try {
+ // get parent node
final Node parentNode = (Node)this.writerSession.getItem(this.repositoryPath);
- Lock lock = (Lock) new Locked() {
-
- protected Object run(Node node) throws RepositoryException {
- final String jobId = scheduleInfo.jobId;
- // if there is a node, we know that there is exactly one node
- final Node foundNode = queryJob(writerSession, jobId);
- if ( scheduleInfo.isStopEvent() ) {
- // if this is a stop event, we should remove the node from the repository
- // if there is no node someone else was faster and we can ignore this
- if ( foundNode != null ) {
- try {
- foundNode.remove();
- parentNode.save();
- } catch (LockException le) {
- // if someone else has the lock this is fine
- }
- }
- // stop the scheduler
- processEvent(event, scheduleInfo);
- return null;
- }
- // if there is already a node, it means we must handle an update
- if ( foundNode != null ) {
- try {
- foundNode.remove();
- parentNode.save();
- } catch (LockException le) {
- // if someone else has the lock this is fine
- }
- // create a stop event
- processEvent(event, scheduleInfo.getStopInfo());
- }
- // we only write the event if this is a local one
- if ( EventUtil.isLocal(event) ) {
-
- // write event to repository, lock it and schedule the event
- final Node eventNode = writeEvent(event);
- return eventNode.lock(false, true);
+ final String jobTopic = ((String)event.getProperty(EventUtil.PROPERTY_JOB_TOPIC));
+ final String jobId = (String)event.getProperty(EventUtil.PROPERTY_JOB_ID);
+ final String nodeName = this.getNodeName(jobTopic, jobId);
+ // is there already a node?
+ final Node foundNode = parentNode.hasNode(nodeName) ? parentNode.getNode(nodeName) : null;
+ Lock lock = null;
+ if ( scheduleInfo.isStopEvent() ) {
+ // if this is a stop event, we should remove the node from the repository
+ // if there is no node someone else was faster and we can ignore this
+ if ( foundNode != null ) {
+ try {
+ foundNode.remove();
+ parentNode.save();
+ } catch (LockException le) {
+ // if someone else has the lock this is fine
}
- return null;
}
- }.with(parentNode, false);
+ // stop the scheduler
+ processEvent(event, scheduleInfo);
+ } else {
+ // if there is already a node, it means we must handle an update
+ if ( foundNode != null ) {
+ try {
+ foundNode.remove();
+ parentNode.save();
+ } catch (LockException le) {
+ // if someone else has the lock this is fine
+ }
+ // create a stop event
+ processEvent(event, scheduleInfo.getStopInfo());
+ }
+ // we only write the event if this is a local one
+ if ( EventUtil.isLocal(event) ) {
+
+ // write event to repository, lock it and schedule the event
+ final Node eventNode = writeEvent(event, nodeName);
+ lock = eventNode.lock(false, true);
+ }
+ }
if ( lock != null ) {
// if something went wrong, we reschedule
@@ -216,9 +237,6 @@
} catch (RepositoryException re ) {
// something went wrong, so let's log it
this.logger.error("Exception during writing new job to repository.", re);
- } catch (InterruptedException e) {
- // This should never happen from the lock, so we ignore it
- this.ignoreException(e);
}
return null;
}
@@ -345,10 +363,8 @@
public void handleEvent(Event event) {
if ( event.getTopic().equals(EventUtil.TOPIC_TIMED_EVENT) ) {
// queue the event in order to respond quickly
- final EventInfo info = new EventInfo();
- info.event = event;
try {
- this.queue.put(info);
+ this.writeQueue.put(event);
} catch (InterruptedException e) {
// this should never happen
this.ignoreException(e);
@@ -370,6 +386,7 @@
final Set<String> newUnloadedEvents = new HashSet<String>();
newUnloadedEvents.addAll(unloadedEvents);
try {
+ s = createSession();
for(String path : unloadedEvents ) {
newUnloadedEvents.remove(path);
try {
@@ -398,6 +415,9 @@
ignoreException(re);
}
}
+ } catch (RepositoryException re) {
+ // unable to create session, so we try it again next time
+ ignoreException(re);
} finally {
if ( s != null ) {
s.logout();
@@ -439,29 +459,20 @@
Session s = null;
try {
s = this.createSession();
- final Session mySession = s;
final Node parentNode = (Node)s.getItem(this.repositoryPath);
- new Locked() {
-
- protected Object run(Node node) throws RepositoryException {
- final Node eventNode = queryJob(mySession, info.jobId);
- if ( eventNode != null ) {
- try {
- eventNode.remove();
- parentNode.save();
- } catch (RepositoryException re) {
- // we ignore the exception if removing fails
- ignoreException(re);
- }
- }
- return null;
+ final String nodeName = this.getNodeName(topic, info.jobId);
+ final Node eventNode = parentNode.hasNode(nodeName) ? parentNode.getNode(nodeName) : null;
+ if ( eventNode != null ) {
+ try {
+ eventNode.remove();
+ parentNode.save();
+ } catch (RepositoryException re) {
+ // we ignore the exception if removing fails
+ ignoreException(re);
}
- }.with(parentNode, false);
+ }
} catch (RepositoryException re) {
this.logger.error("Unable to create a session.", re);
- } catch (InterruptedException e) {
- // This should never happen from the lock, so we ignore it
- this.ignoreException(e);
} finally {
if ( s != null ) {
s.logout();
@@ -536,33 +547,6 @@
if ( info.period != null ) {
eventNode.setProperty(EventHelper.NODE_PROPERTY_TE_PERIOD, info.period.longValue());
}
- }
-
- /**
- * Search for a node with the corresponding topic and unique key.
- * @param topic
- * @param key
- * @return The node or null.
- * @throws RepositoryException
- */
- protected Node queryJob(final Session session, final String jobId) throws RepositoryException {
- final QueryManager qManager = session.getWorkspace().getQueryManager();
- final StringBuffer buffer = new StringBuffer("/jcr:root");
- buffer.append(this.repositoryPath);
- buffer.append("//element(*, ");
- buffer.append(this.getEventNodeType());
- buffer.append(") [");
- buffer.append(EventHelper.NODE_PROPERTY_JOBID);
- buffer.append(" = '");
- buffer.append(jobId);
- buffer.append("']");
- final Query q = qManager.createQuery(buffer.toString(), Query.XPATH);
- final NodeIterator result = q.execute().getNodes();
- Node foundNode = null;
- if ( result.hasNext() ) {
- foundNode = result.nextNode();
- }
- return foundNode;
}
/**
Modified: incubator/sling/trunk/sling/event/src/test/java/org/apache/sling/event/impl/DistributingEventHandlerTest.java
URL: http://svn.apache.org/viewvc/incubator/sling/trunk/sling/event/src/test/java/org/apache/sling/event/impl/DistributingEventHandlerTest.java?rev=627463&r1=627462&r2=627463&view=diff
==============================================================================
--- incubator/sling/trunk/sling/event/src/test/java/org/apache/sling/event/impl/DistributingEventHandlerTest.java (original)
+++ incubator/sling/trunk/sling/event/src/test/java/org/apache/sling/event/impl/DistributingEventHandlerTest.java Wed Feb 13 07:20:00 2008
@@ -18,7 +18,8 @@
*/
package org.apache.sling.event.impl;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import java.util.Calendar;
import java.util.Dictionary;
@@ -54,7 +55,7 @@
final Dictionary<String, Object> props = new Hashtable<String, Object>();
props.put("a property", "some value");
final Event e = new Event(topic, props);
- this.handler.writeEvent(e);
+ this.handler.writeEvent(e, null);
final Node rootNode = (Node) session.getItem(this.handler.repositoryPath);
final NodeIterator iter = rootNode.getNodes();
@@ -73,7 +74,7 @@
props.put("a property", "some value");
// now we check if the application id is handled correctly
props.put(EventUtil.PROPERTY_APPLICATION, "foo");
- this.handler.writeEvent(new Event(topic, props));
+ this.handler.writeEvent(new Event(topic, props), null);
final Node rootNode = (Node) session.getItem(this.handler.repositoryPath);
final NodeIterator iter = rootNode.getNodes();
iter.hasNext();