You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2010/04/21 14:44:54 UTC

svn commit: r936287 - in /sling/trunk/bundles/extensions/event/src: main/java/org/apache/sling/event/ main/java/org/apache/sling/event/impl/ main/java/org/apache/sling/event/impl/job/ test/java/org/apache/sling/event/impl/

Author: cziegeler
Date: Wed Apr 21 12:44:53 2010
New Revision: 936287

URL: http://svn.apache.org/viewvc?rev=936287&view=rev
Log:
SLING-1494 : Update to JCR 2 API

Modified:
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventPropertiesMap.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/DistributingEventHandler.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/EventHelper.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/TimedJobHandler.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobBlockingQueue.java
    sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/JobEventHandlerTest.java

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventPropertiesMap.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventPropertiesMap.java?rev=936287&r1=936286&r2=936287&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventPropertiesMap.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventPropertiesMap.java Wed Apr 21 12:44:53 2010
@@ -38,6 +38,8 @@ public class EventPropertiesMap
     extends Dictionary<String, Object>
     implements Map<String, Object>, Serializable {
 
+    private static final long serialVersionUID = 835179638502569708L;
+
     private final Map<String, Object> delegatee;
 
     /**

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/DistributingEventHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/DistributingEventHandler.java?rev=936287&r1=936286&r2=936287&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/DistributingEventHandler.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/DistributingEventHandler.java Wed Apr 21 12:44:53 2010
@@ -27,13 +27,13 @@ import javax.jcr.RepositoryException;
 import javax.jcr.Session;
 import javax.jcr.observation.EventIterator;
 import javax.jcr.query.Query;
+import javax.jcr.query.qom.QueryObjectModelFactory;
 
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.ConfigurationPolicy;
 import org.apache.felix.scr.annotations.Properties;
 import org.apache.felix.scr.annotations.Property;
 import org.apache.felix.scr.annotations.Service;
-import org.apache.jackrabbit.util.ISO8601;
 import org.apache.sling.commons.osgi.OsgiUtil;
 import org.apache.sling.event.EventUtil;
 import org.osgi.service.component.ComponentContext;
@@ -83,24 +83,26 @@ public class DistributingEventHandler
     }
 
     /**
-     * Return the query string for the clean up.
+     * Return the query for the clean up.
      */
-    protected String getCleanUpQueryString() {
+    protected Query getCleanUpQuery(final Session s)
+    throws RepositoryException {
+        final String selectorName = "nodetype";
         final Calendar deleteBefore = Calendar.getInstance();
         deleteBefore.add(Calendar.MINUTE, -this.cleanupPeriod);
-        final String dateString = ISO8601.format(deleteBefore);
 
-        final StringBuilder buffer = new StringBuilder("/jcr:root");
-        buffer.append(this.repositoryPath);
-        buffer.append("//element(*, ");
-        buffer.append(getEventNodeType());
-        buffer.append(")[@");
-        buffer.append(EventHelper.NODE_PROPERTY_CREATED);
-        buffer.append(" < xs:dateTime('");
-        buffer.append(dateString);
-        buffer.append("')]");
+        final QueryObjectModelFactory qomf = s.getWorkspace().getQueryManager().getQOMFactory();
 
-        return buffer.toString();
+        final Query q = qomf.createQuery(
+                qomf.selector(getEventNodeType(), selectorName),
+                qomf.and(qomf.descendantNode(selectorName, this.repositoryPath),
+                         qomf.comparison(qomf.propertyValue(selectorName, EventHelper.NODE_PROPERTY_CREATED),
+                                       QueryObjectModelFactory.JCR_OPERATOR_LESS_THAN,
+                                       qomf.literal(s.getValueFactory().createValue(deleteBefore)))),
+                null,
+                null
+        );
+        return q;
     }
 
     /**
@@ -111,14 +113,14 @@ public class DistributingEventHandler
         if ( this.cleanupPeriod > 0 ) {
             this.logger.debug("Cleaning up repository, removing all entries older than {} minutes.", this.cleanupPeriod);
 
-            final String queryString = this.getCleanUpQueryString();
             // we create an own session for concurrency issues
             Session s = null;
             try {
                 s = this.createSession();
-                final Node parentNode = (Node)s.getItem(this.repositoryPath);
-                logger.debug("Executing query {}", queryString);
-                final Query q = s.getWorkspace().getQueryManager().createQuery(queryString, Query.XPATH);
+                final Query q = this.getCleanUpQuery(s);
+                if ( logger.isDebugEnabled() ) {
+                    logger.debug("Executing query {}", q.getStatement());
+                }
                 final NodeIterator iter = q.execute().getNodes();
                 int count = 0;
                 while ( iter.hasNext() ) {
@@ -126,7 +128,7 @@ public class DistributingEventHandler
                     eventNode.remove();
                     count++;
                 }
-                parentNode.save();
+                s.save();
                 logger.debug("Removed {} entries from the repository.", count);
 
             } catch (RepositoryException e) {

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/EventHelper.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/EventHelper.java?rev=936287&r1=936286&r2=936287&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/EventHelper.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/EventHelper.java Wed Apr 21 12:44:53 2010
@@ -233,7 +233,8 @@ public abstract class EventHelper {
                             }
                         }
                         oos.close();
-                        node.setProperty(EventHelper.NODE_PROPERTY_PROPERTIES, new ByteArrayInputStream(baos.toByteArray()));
+                        node.setProperty(EventHelper.NODE_PROPERTY_PROPERTIES,
+                                node.getSession().getValueFactory().createBinary(new ByteArrayInputStream(baos.toByteArray())));
                     } catch (IOException ioe) {
                         throw new RepositoryException("Unable to serialize event " + EventUtil.toString(event), ioe);
                     }
@@ -257,7 +258,7 @@ public abstract class EventHelper {
         // check the properties blob
         if ( node.hasProperty(EventHelper.NODE_PROPERTY_PROPERTIES) ) {
             try {
-                final ObjectInputStream ois = new ObjectInputStream(node.getProperty(EventHelper.NODE_PROPERTY_PROPERTIES).getStream(),
+                final ObjectInputStream ois = new ObjectInputStream(node.getProperty(EventHelper.NODE_PROPERTY_PROPERTIES).getBinary().getStream(),
                         objectClassLoader);
                 int length = ois.readInt();
                 for(int i=0;i<length;i++) {

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java?rev=936287&r1=936286&r2=936287&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java Wed Apr 21 12:44:53 2010
@@ -38,16 +38,20 @@ import javax.jcr.NodeIterator;
 import javax.jcr.RepositoryException;
 import javax.jcr.Session;
 import javax.jcr.Value;
+import javax.jcr.ValueFactory;
 import javax.jcr.observation.EventIterator;
 import javax.jcr.query.Query;
 import javax.jcr.query.QueryManager;
+import javax.jcr.query.qom.Comparison;
+import javax.jcr.query.qom.Constraint;
+import javax.jcr.query.qom.Ordering;
+import javax.jcr.query.qom.QueryObjectModelFactory;
 
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Properties;
 import org.apache.felix.scr.annotations.Property;
 import org.apache.felix.scr.annotations.Service;
 import org.apache.felix.scr.annotations.Services;
-import org.apache.jackrabbit.util.ISO8601;
 import org.apache.sling.commons.osgi.OsgiUtil;
 import org.apache.sling.commons.scheduler.Scheduler;
 import org.apache.sling.commons.threads.ThreadPool;
@@ -304,24 +308,26 @@ public class JobEventHandler
     }
 
     /**
-     * Return the query string for the clean up.
+     * Return the query for the clean up.
      */
-    private String getCleanUpQueryString() {
+    private Query getCleanUpQuery(final Session s)
+    throws RepositoryException {
+        final String selectorName = "nodetype";
         final Calendar deleteBefore = Calendar.getInstance();
         deleteBefore.add(Calendar.MINUTE, -this.cleanupPeriod);
-        final String dateString = ISO8601.format(deleteBefore);
 
-        final StringBuilder buffer = new StringBuilder("/jcr:root");
-        buffer.append(this.repositoryPath);
-        buffer.append("//element(*, ");
-        buffer.append(getEventNodeType());
-        buffer.append(")[@");
-        buffer.append(EventHelper.NODE_PROPERTY_FINISHED);
-        buffer.append(" < xs:dateTime('");
-        buffer.append(dateString);
-        buffer.append("')]");
+        final QueryObjectModelFactory qomf = s.getWorkspace().getQueryManager().getQOMFactory();
 
-        return buffer.toString();
+        final Query q = qomf.createQuery(
+                qomf.selector(getEventNodeType(), selectorName),
+                qomf.and(qomf.descendantNode(selectorName, this.repositoryPath),
+                         qomf.comparison(qomf.propertyValue(selectorName, EventHelper.NODE_PROPERTY_FINISHED),
+                                       QueryObjectModelFactory.JCR_OPERATOR_LESS_THAN,
+                                       qomf.literal(s.getValueFactory().createValue(deleteBefore)))),
+                null,
+                null
+        );
+        return q;
     }
 
     private void loadJobsInTheBackground() {
@@ -375,13 +381,14 @@ public class JobEventHandler
             if ( this.cleanupPeriod > 0 ) {
                 this.logger.debug("Cleaning up repository, removing all finished jobs older than {} minutes.", this.cleanupPeriod);
 
-                final String queryString = this.getCleanUpQueryString();
                 // we create an own session for concurrency issues
                 Session s = null;
                 try {
                     s = this.createSession();
-                    logger.debug("Executing query {}", queryString);
-                    final Query q = s.getWorkspace().getQueryManager().createQuery(queryString, Query.XPATH);
+                    final Query q = this.getCleanUpQuery(s);
+                    if ( logger.isDebugEnabled() ) {
+                        logger.debug("Executing query {}", q.getStatement());
+                    }
                     final NodeIterator iter = q.execute().getNodes();
                     int count = 0;
                     while ( iter.hasNext() ) {
@@ -833,7 +840,7 @@ public class JobEventHandler
                             if ( !eventNode.isLocked() ) {
                                 // lock node
                                 try {
-                                    eventNode.lock(false, true);
+                                    this.backgroundSession.getWorkspace().getLockManager().lock(info.nodePath, false, true, Long.MAX_VALUE, "JobEventHandler");
                                 } catch (RepositoryException re) {
                                     // lock failed which means that the node is locked by someone else, so we don't have to requeue
                                     return Status.FAILED;
@@ -985,7 +992,7 @@ public class JobEventHandler
             final String nodePath = eventNode.getPath();
             final Event jobEvent = this.getJobEvent(event, nodePath);
             eventNode.setProperty(EventHelper.NODE_PROPERTY_PROCESSOR, this.applicationId);
-            eventNode.save();
+            eventNode.getSession().save();
             final EventAdmin localEA = this.eventAdmin;
             if ( localEA != null ) {
                 final StartedJobInfo jobInfo = new StartedJobInfo(jobEvent, nodePath, System.currentTimeMillis());
@@ -1015,7 +1022,7 @@ public class JobEventHandler
 
                 // unlock node
                 try {
-                    eventNode.unlock();
+                    eventNode.getSession().getWorkspace().getLockManager().unlock(eventNode.getPath());
                 } catch (RepositoryException e) {
                     // if unlock fails, we silently ignore this
                     this.ignoreException(e);
@@ -1130,35 +1137,34 @@ public class JobEventHandler
             logger.debug("Loading from repository since {} and max {}", since, maxLoad);
             try {
                 final QueryManager qManager = this.backgroundSession.getWorkspace().getQueryManager();
-                final StringBuilder buffer = new StringBuilder("/jcr:root");
-                buffer.append(this.repositoryPath);
-                buffer.append("//element(*, ");
-                buffer.append(this.getEventNodeType());
-                buffer.append(") [not(@");
-                buffer.append(EventHelper.NODE_PROPERTY_FINISHED);
-                buffer.append(")");
+                final ValueFactory vf = this.backgroundSession.getValueFactory();
+                final String selectorName = "nodetype";
+                final Calendar startDate = Calendar.getInstance();
+                startDate.setTimeInMillis(this.startTime);
+
+                final QueryObjectModelFactory qomf = qManager.getQOMFactory();
+
+                Constraint constraint = qomf.and(
+                        qomf.descendantNode(selectorName, this.repositoryPath),
+                        qomf.not(qomf.propertyExistence(selectorName, EventHelper.NODE_PROPERTY_FINISHED)));
+                constraint = qomf.and(constraint,
+                        qomf.comparison(qomf.propertyValue(selectorName, EventHelper.NODE_PROPERTY_CREATED),
+                                QueryObjectModelFactory.JCR_OPERATOR_LESS_THAN,
+                                qomf.literal(vf.createValue(startDate))));
                 if ( since != -1 ) {
                     final Calendar beforeDate = Calendar.getInstance();
                     beforeDate.setTimeInMillis(since);
-                    final String dateString = ISO8601.format(beforeDate);
-                    buffer.append(" and @");
-                    buffer.append(EventHelper.NODE_PROPERTY_CREATED);
-                    buffer.append(" > xs:dateTime('");
-                    buffer.append(dateString);
-                    buffer.append("')");
-                }
-                final Calendar startDate = Calendar.getInstance();
-                startDate.setTimeInMillis(this.startTime);
-                final String dateString = ISO8601.format(startDate);
-                buffer.append(" and @");
-                buffer.append(EventHelper.NODE_PROPERTY_CREATED);
-                buffer.append(" < xs:dateTime('");
-                buffer.append(dateString);
-                buffer.append("')");
-                buffer.append("] order by @");
-                buffer.append(EventHelper.NODE_PROPERTY_CREATED);
-                buffer.append(" ascending");
-                final Query q = qManager.createQuery(buffer.toString(), Query.XPATH);
+                    constraint = qomf.and(constraint,
+                            qomf.comparison(qomf.propertyValue(selectorName, EventHelper.NODE_PROPERTY_CREATED),
+                                    QueryObjectModelFactory.JCR_OPERATOR_GREATER_THAN,
+                                    qomf.literal(vf.createValue(beforeDate))));
+                }
+                final Query q = qomf.createQuery(
+                        qomf.selector(getEventNodeType(), selectorName),
+                        constraint,
+                        new Ordering[] {qomf.ascending(qomf.propertyValue(selectorName, EventHelper.NODE_PROPERTY_CREATED))},
+                        null
+                );
                 final NodeIterator result = q.execute().getNodes();
                 long count = 0;
                 while ( result.hasNext() && count < maxLoad ) {
@@ -1348,7 +1354,7 @@ public class JobEventHandler
                 }
                 // unlock node
                 try {
-                    eventNode.unlock();
+                    eventNode.getSession().getWorkspace().getLockManager().unlock(eventNode.getPath());
                 } catch (RepositoryException e) {
                     // if unlock fails, we silently ignore this
                     this.ignoreException(e);
@@ -1492,34 +1498,33 @@ public class JobEventHandler
         try {
             s = this.createSession();
             final QueryManager qManager = s.getWorkspace().getQueryManager();
-            final StringBuilder buffer = new StringBuilder("/jcr:root");
-            buffer.append(this.repositoryPath);
-            if ( topic != null ) {
-                buffer.append('/');
-                buffer.append(topic.replace('/', '.'));
-            }
-            buffer.append("//element(*, ");
-            buffer.append(this.getEventNodeType());
-            buffer.append(") [not(@");
-            buffer.append(EventHelper.NODE_PROPERTY_FINISHED);
-            buffer.append(")");
+            final String selectorName = "nodetype";
+
+            final QueryObjectModelFactory qomf = qManager.getQOMFactory();
+
+            final String path;
+            if ( topic == null ) {
+                path = this.repositoryPath;
+            } else {
+                path = this.repositoryPath + '/' + topic.replace('/', '.');
+            }
+            Constraint constraint = qomf.and(qomf.descendantNode(selectorName, path),
+                    qomf.not(qomf.propertyExistence(selectorName, EventHelper.NODE_PROPERTY_FINISHED)));
+
             if ( locked != null ) {
                 if ( locked ) {
-                    buffer.append(" and @jcr:lockOwner");
+                    constraint = qomf.and(constraint,
+                            qomf.propertyExistence(selectorName, "jcr:lockOwner"));
                 } else {
-                    buffer.append(" and not(@jcr:lockOwner)");
+                    constraint = qomf.and(constraint,
+                            qomf.not(qomf.propertyExistence(selectorName, "jcr:lockOwner")));
                 }
             }
             if ( filterProps != null && filterProps.length > 0 ) {
-                buffer.append(" and (");
-                int index = 0;
+                Constraint orConstraint = null;
                 for (Map<String,Object> template : filterProps) {
-                    if ( index > 0 ) {
-                        buffer.append(" or ");
-                    }
-                    buffer.append('(');
+                    Constraint comp = null;
                     final Iterator<Map.Entry<String, Object>> i = template.entrySet().iterator();
-                    boolean first = true;
                     while ( i.hasNext() ) {
                         final Map.Entry<String, Object> current = i.next();
                         // check prop name first
@@ -1528,32 +1533,39 @@ public class JobEventHandler
                             // check value
                             final Value value = EventHelper.getNodePropertyValue(s.getValueFactory(), current.getValue());
                             if ( value != null ) {
-                                if ( first ) {
-                                    first = false;
-                                    buffer.append('@');
+                                final Comparison newComp = qomf.comparison(qomf.propertyValue(selectorName, propName),
+                                        QueryObjectModelFactory.JCR_OPERATOR_EQUAL_TO,
+                                        qomf.literal(value));
+                                if ( comp == null ) {
+                                    comp = newComp;
                                 } else {
-                                    buffer.append(" and @");
+                                    comp = qomf.and(comp, newComp);
                                 }
-                                buffer.append(propName);
-                                buffer.append(" = '");
-                                buffer.append(current.getValue());
-                                buffer.append("'");
                             }
                         }
                     }
-                    buffer.append(')');
-                    index++;
+                    if ( comp != null ) {
+                        if ( orConstraint == null ) {
+                            orConstraint = comp;
+                        } else {
+                            orConstraint = qomf.or(orConstraint, comp); // accumulate the OR over all templates; the base constraint is ANDed in afterwards
+                        }
+                    }
+                }
+                if ( orConstraint != null ) {
+                    constraint = qomf.and(constraint, orConstraint);
                 }
-                buffer.append(')');
             }
-            buffer.append("]");
-            buffer.append(" order by @");
-            buffer.append(EventHelper.NODE_PROPERTY_CREATED);
-            buffer.append(" ascending");
-            final String queryString = buffer.toString();
-            logger.debug("Executing job query {}.", queryString);
+            final Query q = qomf.createQuery(
+                    qomf.selector(getEventNodeType(), selectorName),
+                    constraint,
+                    new Ordering[] {qomf.ascending(qomf.propertyValue(selectorName, EventHelper.NODE_PROPERTY_CREATED))},
+                    null
+            );
+            if ( logger.isDebugEnabled() ) {
+                logger.debug("Executing job query {}.", q.getStatement());
+            }
 
-            final Query q = qManager.createQuery(queryString, Query.XPATH);
             final NodeIterator iter = q.execute().getNodes();
             while ( iter.hasNext() ) {
                 final Node eventNode = iter.nextNode();

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/TimedJobHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/TimedJobHandler.java?rev=936287&r1=936286&r2=936287&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/TimedJobHandler.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/TimedJobHandler.java Wed Apr 21 12:44:53 2010
@@ -44,6 +44,9 @@ import javax.jcr.lock.LockException;
 import javax.jcr.observation.EventIterator;
 import javax.jcr.query.Query;
 import javax.jcr.query.QueryManager;
+import javax.jcr.query.qom.Comparison;
+import javax.jcr.query.qom.Constraint;
+import javax.jcr.query.qom.QueryObjectModelFactory;
 
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Properties;
@@ -173,7 +176,7 @@ public class TimedJobHandler
                                     // lock node
                                     Lock lock = null;
                                     try {
-                                        lock = eventNode.lock(false, true);
+                                        lock = eventNode.getSession().getWorkspace().getLockManager().lock(info.nodePath, false, true, Long.MAX_VALUE, "TimedJobHandler");
                                     } catch (RepositoryException re) {
                                         // lock failed which means that the node is locked by someone else, so we don't have to requeue
                                     }
@@ -238,7 +241,7 @@ public class TimedJobHandler
 
                     // write event to repository, lock it and schedule the event
                     final Node eventNode = writeEvent(event, nodeName);
-                    lock = eventNode.lock(false, true);
+                    lock = eventNode.getSession().getWorkspace().getLockManager().lock(eventNode.getPath(), false, true, Long.MAX_VALUE, "TimedJobHandler");
                 }
             }
 
@@ -246,7 +249,7 @@ public class TimedJobHandler
                 // if something went wrong, we reschedule
                 if ( !this.processEvent(event, scheduleInfo) ) {
                     final String path = lock.getNode().getPath();
-                    lock.getNode().unlock();
+                    writerSession.getWorkspace().getLockManager().unlock(path);
                     return path;
                 }
             }
@@ -516,12 +519,16 @@ public class TimedJobHandler
     protected void loadEvents() {
         try {
             final QueryManager qManager = this.writerSession.getWorkspace().getQueryManager();
-            final StringBuilder buffer = new StringBuilder("/jcr:root");
-            buffer.append(this.repositoryPath);
-            buffer.append("//element(*, ");
-            buffer.append(this.getEventNodeType());
-            buffer.append(")");
-            final Query q = qManager.createQuery(buffer.toString(), Query.XPATH);
+            final String selectorName = "nodetype";
+
+            final QueryObjectModelFactory qomf = qManager.getQOMFactory();
+
+            final Query q = qomf.createQuery(
+                    qomf.selector(getEventNodeType(), selectorName),
+                    qomf.descendantNode(selectorName, this.repositoryPath),
+                    null,
+                    null
+            );
             final NodeIterator result = q.execute().getNodes();
             while ( result.hasNext() ) {
                 final Node eventNode = result.nextNode();
@@ -585,6 +592,8 @@ public class TimedJobHandler
 
     protected static final class ScheduleInfo implements Serializable {
 
+        private static final long serialVersionUID = 8667701700547811142L;
+
         public final String expression;
         public final Long   period;
         public final Date   date;
@@ -681,25 +690,22 @@ public class TimedJobHandler
         try {
             s = this.createSession();
             final QueryManager qManager = s.getWorkspace().getQueryManager();
-            final StringBuilder buffer = new StringBuilder("/jcr:root");
-            buffer.append(this.repositoryPath);
-            if ( topic != null ) {
-                buffer.append('/');
-                buffer.append(topic.replace('/', '.'));
-            }
-            buffer.append("//element(*, ");
-            buffer.append(this.getEventNodeType());
-            buffer.append(")");
+            final String selectorName = "nodetype";
+
+            final QueryObjectModelFactory qomf = qManager.getQOMFactory();
+
+            final String path;
+            if ( topic == null ) {
+                path = this.repositoryPath;
+            } else {
+                path = this.repositoryPath + '/' + topic.replace('/', '.');
+            }
+            Constraint constraint = qomf.descendantNode(selectorName, path);
             if ( filterProps != null && filterProps.length > 0 ) {
-                buffer.append(" [");
-                int index = 0;
+                Constraint orConstraint = null;
                 for (Map<String,Object> template : filterProps) {
-                    if ( index > 0 ) {
-                        buffer.append(" or ");
-                    }
-                    buffer.append('(');
+                    Constraint comp = null;
                     final Iterator<Map.Entry<String, Object>> i = template.entrySet().iterator();
-                    boolean first = true;
                     while ( i.hasNext() ) {
                         final Map.Entry<String, Object> current = i.next();
                         // check prop name first
@@ -708,28 +714,39 @@ public class TimedJobHandler
                             // check value
                             final Value value = EventHelper.getNodePropertyValue(s.getValueFactory(), current.getValue());
                             if ( value != null ) {
-                                if ( first ) {
-                                    first = false;
-                                    buffer.append('@');
+                                final Comparison newComp = qomf.comparison(qomf.propertyValue(selectorName, propName),
+                                        QueryObjectModelFactory.JCR_OPERATOR_EQUAL_TO,
+                                        qomf.literal(value));
+                                if ( comp == null ) {
+                                    comp = newComp;
                                 } else {
-                                    buffer.append(" and @");
+                                    comp = qomf.and(comp, newComp);
                                 }
-                                buffer.append(propName);
-                                buffer.append(" = '");
-                                buffer.append(current.getValue());
-                                buffer.append("'");
                             }
                         }
                     }
-                    buffer.append(')');
-                    index++;
+                    if ( comp != null ) {
+                        if ( orConstraint == null ) {
+                            orConstraint = comp;
+                        } else {
+                            orConstraint = qomf.or(orConstraint, comp);
+                        }
+                    }
+                }
+                if ( orConstraint != null ) {
+                    constraint = qomf.and(constraint, orConstraint);
                 }
-                buffer.append(']');
             }
-            final String queryString = buffer.toString();
-            logger.debug("Executing job query {}.", queryString);
+            final Query q = qomf.createQuery(
+                    qomf.selector(getEventNodeType(), selectorName),
+                    constraint,
+                    null,
+                    null
+            );
+            if ( logger.isDebugEnabled() ) {
+                logger.debug("Executing job query {}.", q.getStatement());
+            }
 
-            final Query q = qManager.createQuery(queryString, Query.XPATH);
             final NodeIterator iter = q.execute().getNodes();
             while ( iter.hasNext() ) {
                 final Node eventNode = iter.nextNode();

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobBlockingQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobBlockingQueue.java?rev=936287&r1=936286&r2=936287&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobBlockingQueue.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobBlockingQueue.java Wed Apr 21 12:44:53 2010
@@ -33,6 +33,8 @@ import org.slf4j.Logger;
  */
 public final class JobBlockingQueue extends LinkedBlockingQueue<EventInfo> {
 
+    private static final long serialVersionUID = -1874643704782461425L;
+
     private volatile EventInfo eventInfo;
 
     private final Object lock = new Object();

Modified: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/JobEventHandlerTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/JobEventHandlerTest.java?rev=936287&r1=936286&r2=936287&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/JobEventHandlerTest.java (original)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/JobEventHandlerTest.java Wed Apr 21 12:44:53 2010
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertNot
 import static org.junit.Assert.assertTrue;
 
 import java.util.ArrayList;
+import java.util.Calendar;
 import java.util.Collections;
 import java.util.Dictionary;
 import java.util.Hashtable;
@@ -56,6 +57,13 @@ public class JobEventHandlerTest extends
         return this.context;
     }
 
+    @Override
+    protected Dictionary<String, Object> getComponentConfig() { // adjusts the inherited component configuration for JobEventHandlerTest
+        final Dictionary<String, Object> config =  super.getComponentConfig(); // start from the configuration built by the superclass
+        config.put("cleanup.period", 1); // set the cleanup period to 1 minute
+        return config;
+    }
+
     /**
      * Simple setup test which checks if the session and the session listener
      * is registered.
@@ -360,4 +368,42 @@ public class JobEventHandlerTest extends
         assertEquals("Started count", 10, started.size());
         assertEquals("Failed count", 5, failed.size());
     }
+
+    @org.junit.Test public void testCleanup() throws Exception { // verifies that after run() only the nodes with a past finished date are gone
+        final Calendar obsolete = Calendar.getInstance(); // finished date used for nodes "1" to "4"
+        obsolete.add(Calendar.MINUTE, -10); // ten minutes in the past
+        handler.writeEvent(new Event("test", (Dictionary<String, Object>)null), "1").setProperty(EventHelper.NODE_PROPERTY_FINISHED, obsolete);
+        handler.writeEvent(new Event("test", (Dictionary<String, Object>)null), "2").setProperty(EventHelper.NODE_PROPERTY_FINISHED, obsolete);
+        handler.writeEvent(new Event("test", (Dictionary<String, Object>)null), "3").setProperty(EventHelper.NODE_PROPERTY_FINISHED, obsolete);
+        handler.writeEvent(new Event("test", (Dictionary<String, Object>)null), "4").setProperty(EventHelper.NODE_PROPERTY_FINISHED, obsolete);
+
+        final Calendar future = Calendar.getInstance(); // finished date used for nodes "5" and "6"
+        future.add(Calendar.MINUTE, +10); // ten minutes in the future
+        handler.writeEvent(new Event("test", (Dictionary<String, Object>)null), "5").setProperty(EventHelper.NODE_PROPERTY_FINISHED, future);
+        handler.writeEvent(new Event("test", (Dictionary<String, Object>)null), "6").setProperty(EventHelper.NODE_PROPERTY_FINISHED, future);
+        handler.writeEvent(new Event("test", (Dictionary<String, Object>)null), "7"); // the test sets no finished property on nodes "7" and "8"
+        handler.writeEvent(new Event("test", (Dictionary<String, Object>)null), "8");
+
+        handler.writerSession.save(); // save the writer session before checking the nodes
+        assertTrue(handler.getWriterRootNode().hasNode("1")); // precondition: all eight nodes exist under the writer root node
+        assertEquals(obsolete, handler.getWriterRootNode().getNode("1").getProperty(EventHelper.NODE_PROPERTY_FINISHED).getDate()); // stored finished date equals the value set above
+        assertTrue(handler.getWriterRootNode().hasNode("2"));
+        assertTrue(handler.getWriterRootNode().hasNode("3"));
+        assertTrue(handler.getWriterRootNode().hasNode("4"));
+        assertTrue(handler.getWriterRootNode().hasNode("5"));
+        assertTrue(handler.getWriterRootNode().hasNode("6"));
+        assertTrue(handler.getWriterRootNode().hasNode("7"));
+        assertTrue(handler.getWriterRootNode().hasNode("8"));
+
+        ((JobEventHandler)handler).run(); // NOTE(review): presumably performs the cleanup pass - confirm against JobEventHandler.run()
+
+        assertFalse(handler.getWriterRootNode().hasNode("1")); // the four nodes with a past finished date must be gone
+        assertFalse(handler.getWriterRootNode().hasNode("2"));
+        assertFalse(handler.getWriterRootNode().hasNode("3"));
+        assertFalse(handler.getWriterRootNode().hasNode("4"));
+        assertTrue(handler.getWriterRootNode().hasNode("5")); // nodes with a future finished date, or none set by the test, must remain
+        assertTrue(handler.getWriterRootNode().hasNode("6"));
+        assertTrue(handler.getWriterRootNode().hasNode("7"));
+        assertTrue(handler.getWriterRootNode().hasNode("8"));
+    }
 }