You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2008/09/03 19:49:27 UTC

svn commit: r691696 - /incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/TimedJobHandler.java

Author: cziegeler
Date: Wed Sep  3 10:49:27 2008
New Revision: 691696

URL: http://svn.apache.org/viewvc?rev=691696&view=rev
Log:
Better synchronization for timed events.

Modified:
    incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/TimedJobHandler.java

Modified: incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/TimedJobHandler.java
URL: http://svn.apache.org/viewvc/incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/TimedJobHandler.java?rev=691696&r1=691695&r2=691696&view=diff
==============================================================================
--- incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/TimedJobHandler.java (original)
+++ incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/TimedJobHandler.java Wed Sep  3 10:49:27 2008
@@ -89,7 +89,7 @@
         // load timed events from repository
         this.loadEvents();
         this.writerSession.getWorkspace().getObservationManager()
-            .addEventListener(this, javax.jcr.observation.Event.PROPERTY_CHANGED, this.repositoryPath, true, null, null, true);
+            .addEventListener(this, javax.jcr.observation.Event.PROPERTY_CHANGED|javax.jcr.observation.Event.PROPERTY_REMOVED, this.repositoryPath, true, null, null, true);
     }
 
     /**
@@ -117,7 +117,9 @@
 
                     // write event and update path
                     // if something went wrong we get the node path and reschedule
-                    info.nodePath = this.persistEvent(info.event, scheduleInfo);
+                    synchronized ( this.writerSession ) {
+                        info.nodePath = this.persistEvent(info.event, scheduleInfo);
+                    }
                     if ( info.nodePath != null ) {
                         try {
                             this.queue.put(info);
@@ -162,23 +164,25 @@
                     if ( scheduleInfo != null ) {
                         try {
                             this.writerSession.refresh(true);
-                            final Node eventNode = (Node) this.writerSession.getItem(info.nodePath);
-                            if ( !eventNode.isLocked() ) {
-                                // lock node
-                                Lock lock = null;
-                                try {
-                                    lock = eventNode.lock(false, true);
-                                } catch (RepositoryException re) {
-                                    // lock failed which means that the node is locked by someone else, so we don't have to requeue
-                                }
-                                if ( lock != null ) {
-                                    // if something went wrong, we reschedule
-                                    if ( !this.processEvent(info.event, scheduleInfo) ) {
-                                        try {
-                                            this.queue.put(info);
-                                        } catch (InterruptedException e) {
-                                            // this should never happen, so we ignore it
-                                            this.ignoreException(e);
+                            if ( this.writerSession.itemExists(info.nodePath) ) {
+                                final Node eventNode = (Node) this.writerSession.getItem(info.nodePath);
+                                if ( !eventNode.isLocked() ) {
+                                    // lock node
+                                    Lock lock = null;
+                                    try {
+                                        lock = eventNode.lock(false, true);
+                                    } catch (RepositoryException re) {
+                                        // lock failed which means that the node is locked by someone else, so we don't have to requeue
+                                    }
+                                    if ( lock != null ) {
+                                        // if something went wrong, we reschedule
+                                        if ( !this.processEvent(info.event, scheduleInfo) ) {
+                                            try {
+                                                this.queue.put(info);
+                                            } catch (InterruptedException e) {
+                                                // this should never happen, so we ignore it
+                                                this.ignoreException(e);
+                                            }
                                         }
                                     }
                                 }
@@ -196,7 +200,7 @@
     protected String persistEvent(final Event event, final ScheduleInfo scheduleInfo) {
         try {
             // get parent node
-            final Node parentNode = (Node)this.writerSession.getItem(this.repositoryPath);
+            final Node parentNode = this.ensureRepositoryPath();
             final String nodeName = this.getNodeName(scheduleInfo.jobId);
             // is there already a node?
             final Node foundNode = parentNode.hasNode(nodeName) ? parentNode.getNode(nodeName) : null;
@@ -329,31 +333,39 @@
             s = this.createSession();
             while ( iter.hasNext() ) {
                 final javax.jcr.observation.Event event = iter.nextEvent();
-                if ( event.getType() == javax.jcr.observation.Event.PROPERTY_CHANGED ) {
-                    try {
-                        final Node eventNode = (Node) s.getItem(event.getPath());
-                        if ( !eventNode.isLocked() ) {
-                            final String nodePath = event.getPath();
-                            try {
-                                final EventInfo info = new EventInfo();
-                                info.event = this.readEvent(eventNode);
-                                info.nodePath =nodePath;
+                if ( event.getType() == javax.jcr.observation.Event.PROPERTY_CHANGED
+                    || event.getType() == javax.jcr.observation.Event.PROPERTY_REMOVED) {
+
+                    final String propPath = event.getPath();
+                    int pos = propPath.lastIndexOf('/');
+                    final String nodePath = propPath.substring(0, pos);
+                    final String propertyName = propPath.substring(pos+1);
+                    // we are only interested in unlocks
+                    if ( "jcr:lockOwner".equals(propertyName) ) {
+                        try {
+                            final Node eventNode = (Node) s.getItem(nodePath);
+                            if ( !eventNode.isLocked() ) {
                                 try {
-                                    this.queue.put(info);
-                                } catch (InterruptedException e) {
-                                    // we ignore this exception as this should never occur
-                                    this.ignoreException(e);
-                                }
-                            } catch (ClassNotFoundException cnfe) {
-                                // add it to the unloaded set
-                                synchronized (unloadedEvents) {
-                                    this.unloadedEvents.add(nodePath);
+                                    final EventInfo info = new EventInfo();
+                                    info.event = this.readEvent(eventNode);
+                                    info.nodePath =nodePath;
+                                    try {
+                                        this.queue.put(info);
+                                    } catch (InterruptedException e) {
+                                        // we ignore this exception as this should never occur
+                                        this.ignoreException(e);
+                                    }
+                                } catch (ClassNotFoundException cnfe) {
+                                    // add it to the unloaded set
+                                    synchronized (unloadedEvents) {
+                                        this.unloadedEvents.add(nodePath);
+                                    }
+                                    this.ignoreException(cnfe);
                                 }
-                                this.ignoreException(cnfe);
                             }
+                        } catch (RepositoryException re) {
+                            this.logger.error("Exception during jcr event processing.", re);
                         }
-                    } catch (RepositoryException re) {
-                        this.logger.error("Exception during jcr event processing.", re);
                     }
                 }
             }