You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2008/03/10 16:00:57 UTC

svn commit: r635576 - in /incubator/sling/trunk/sling/event/src/main: java/org/apache/sling/event/impl/ resources/SLING-INF/nodetypes/

Author: cziegeler
Date: Mon Mar 10 08:00:54 2008
New Revision: 635576

URL: http://svn.apache.org/viewvc?rev=635576&view=rev
Log:
#0000 - Proper handling of jobs if used in a clustered environment.

Modified:
    incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/DistributingEventHandler.java
    incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/EventHelper.java
    incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
    incubator/sling/trunk/sling/event/src/main/resources/SLING-INF/nodetypes/event.cnd

Modified: incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/DistributingEventHandler.java
URL: http://svn.apache.org/viewvc/incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/DistributingEventHandler.java?rev=635576&r1=635575&r2=635576&view=diff
==============================================================================
--- incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/DistributingEventHandler.java (original)
+++ incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/DistributingEventHandler.java Mon Mar 10 08:00:54 2008
@@ -75,7 +75,7 @@
     }
 
     /**
-     * @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#getCleanUpQueryString()
+     * Return the query string for the clean up.
      */
     protected String getCleanUpQueryString() {
         final Calendar deleteBefore = Calendar.getInstance();
@@ -104,31 +104,29 @@
             this.logger.debug("Cleaning up repository, removing all entries older than {} minutes.", this.cleanupPeriod);
 
             final String queryString = this.getCleanUpQueryString();
-            if ( queryString != null ) {
-                // we create an own session for concurrency issues
-                Session s = null;
-                try {
-                    s = this.createSession();
-                    final Node parentNode = (Node)s.getItem(this.repositoryPath);
-                    logger.debug("Executing query {}", queryString);
-                    final Query q = s.getWorkspace().getQueryManager().createQuery(queryString, Query.XPATH);
-                    final NodeIterator iter = q.execute().getNodes();
-                    int count = 0;
-                    while ( iter.hasNext() ) {
-                        final Node eventNode = iter.nextNode();
-                        eventNode.remove();
-                        count++;
-                    }
-                    parentNode.save();
-                    logger.debug("Removed {} entries from the repository.", count);
+            // we create an own session for concurrency issues
+            Session s = null;
+            try {
+                s = this.createSession();
+                final Node parentNode = (Node)s.getItem(this.repositoryPath);
+                logger.debug("Executing query {}", queryString);
+                final Query q = s.getWorkspace().getQueryManager().createQuery(queryString, Query.XPATH);
+                final NodeIterator iter = q.execute().getNodes();
+                int count = 0;
+                while ( iter.hasNext() ) {
+                    final Node eventNode = iter.nextNode();
+                    eventNode.remove();
+                    count++;
+                }
+                parentNode.save();
+                logger.debug("Removed {} entries from the repository.", count);
 
-                } catch (RepositoryException e) {
-                    // in the case of an error, we just log this as a warning
-                    this.logger.warn("Exception during repository cleanup.", e);
-                } finally {
-                    if ( s != null ) {
-                        s.logout();
-                    }
+            } catch (RepositoryException e) {
+                // in the case of an error, we just log this as a warning
+                this.logger.warn("Exception during repository cleanup.", e);
+            } finally {
+                if ( s != null ) {
+                    s.logout();
                 }
             }
         }

Modified: incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/EventHelper.java
URL: http://svn.apache.org/viewvc/incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/EventHelper.java?rev=635576&r1=635575&r2=635576&view=diff
==============================================================================
--- incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/EventHelper.java (original)
+++ incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/EventHelper.java Mon Mar 10 08:00:54 2008
@@ -32,7 +32,7 @@
     public static final String NODE_PROPERTY_PROPERTIES = "slingevent:properties";
     public static final String NODE_PROPERTY_PROCESSOR = "slingevent:processor";
     public static final String NODE_PROPERTY_JOBID = "slingevent:id";
-    //public static final String NODE_PROPERTY_FINISHED = "slingevent:finished";
+    public static final String NODE_PROPERTY_FINISHED = "slingevent:finished";
     public static final String NODE_PROPERTY_TE_EXPRESSION = "slingevent:expression";
     public static final String NODE_PROPERTY_TE_DATE = "slingevent:date";
     public static final String NODE_PROPERTY_TE_PERIOD = "slingevent:period";

Modified: incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
URL: http://svn.apache.org/viewvc/incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java?rev=635576&r1=635575&r2=635576&view=diff
==============================================================================
--- incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java (original)
+++ incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java Mon Mar 10 08:00:54 2008
@@ -19,6 +19,7 @@
 package org.apache.sling.event.impl;
 
 import java.util.ArrayList;
+import java.util.Calendar;
 import java.util.Collection;
 import java.util.Dictionary;
 import java.util.HashMap;
@@ -38,6 +39,7 @@
 import javax.jcr.query.Query;
 import javax.jcr.query.QueryManager;
 
+import org.apache.jackrabbit.util.ISO8601;
 import org.apache.sling.event.EventUtil;
 import org.apache.sling.event.JobStatusProvider;
 import org.osgi.framework.BundleEvent;
@@ -55,10 +57,15 @@
  *               values.updated="org/osgi/framework/BundleEvent/UPDATED"
  *               values.started="org/osgi/framework/BundleEvent/STARTED"
  * @scr.property name="repository.path" value="/sling/jobs"
+ * We schedule this event handler to run in the background and clean up
+ * obsolete events.
+ * @scr.service interface="java.lang.Runnable"
+ * @scr.property name="scheduler.period" value="600" type="Long"
+ * @scr.property name="scheduler.concurrent" value="false" type="Boolean"
  */
 public class JobEventHandler
     extends AbstractRepositoryEventHandler
-    implements EventUtil.JobStatusNotifier, JobStatusProvider {
+    implements EventUtil.JobStatusNotifier, JobStatusProvider, Runnable {
 
     /** The topic prefix for bundle events. */
     protected static final String BUNDLE_EVENT_PREFIX = BundleEvent.class.getName().replace('.', '/') + '/';
@@ -84,6 +91,15 @@
     /** List of deleted jobs. */
     protected Set<String>deletedJobs = new HashSet<String>();
 
+    /** Default clean up time is 10 minutes. */
+    protected static final int DEFAULT_CLEANUP_PERIOD = 10;
+
+    /** @scr.property valueRef="DEFAULT_CLEANUP_PERIOD" type="Integer" */
+    protected static final String CONFIG_PROPERTY_CLEANUP_PERIOD = "cleanup.period";
+
+    /** We remove everything which is older than 10 min by default. */
+    protected int cleanupPeriod = DEFAULT_CLEANUP_PERIOD;
+
     /**
      * Activate this component.
      * @param context
@@ -91,6 +107,12 @@
      */
     protected void activate(final ComponentContext context)
     throws Exception {
+        final Integer i = (Integer)context.getProperties().get(CONFIG_PROPERTY_CLEANUP_PERIOD);
+        if ( i != null ) {
+            this.cleanupPeriod = i;
+        } else {
+            this.cleanupPeriod = DEFAULT_CLEANUP_PERIOD;
+        }
         if ( context.getProperties().get(CONFIG_PROPERTY_SLEEP_TIME) != null ) {
             this.sleepTime = (Long)context.getProperties().get(CONFIG_PROPERTY_SLEEP_TIME) * 1000;
         } else {
@@ -117,6 +139,63 @@
     }
 
     /**
+     * Return the query string for the clean up.
+     */
+    protected String getCleanUpQueryString() {
+        final Calendar deleteBefore = Calendar.getInstance();
+        deleteBefore.add(Calendar.MINUTE, -this.cleanupPeriod);
+        final String dateString = ISO8601.format(deleteBefore);
+
+        final StringBuffer buffer = new StringBuffer("/jcr:root");
+        buffer.append(this.repositoryPath);
+        buffer.append("//element(*, ");
+        buffer.append(getEventNodeType());
+        buffer.append(")[@");
+        buffer.append(EventHelper.NODE_PROPERTY_FINISHED);
+        buffer.append(" < xs:dateTime('");
+        buffer.append(dateString);
+        buffer.append("')]");
+
+        return buffer.toString();
+    }
+
+    /**
+     * This method is invoked periodically.
+     * @see java.lang.Runnable#run()
+     */
+    public void run() {
+        if ( this.cleanupPeriod > 0 ) {
+            this.logger.debug("Cleaning up repository, removing all finished jobs older than {} minutes.", this.cleanupPeriod);
+
+            final String queryString = this.getCleanUpQueryString();
+            // we create an own session for concurrency issues
+            Session s = null;
+            try {
+                s = this.createSession();
+                final Node parentNode = (Node)s.getItem(this.repositoryPath);
+                logger.debug("Executing query {}", queryString);
+                final Query q = s.getWorkspace().getQueryManager().createQuery(queryString, Query.XPATH);
+                final NodeIterator iter = q.execute().getNodes();
+                int count = 0;
+                while ( iter.hasNext() ) {
+                    final Node eventNode = iter.nextNode();
+                    eventNode.remove();
+                    count++;
+                }
+                parentNode.save();
+                logger.debug("Removed {} entries from the repository.", count);
+
+            } catch (RepositoryException e) {
+                // in the case of an error, we just log this as a warning
+                this.logger.warn("Exception during repository cleanup.", e);
+            } finally {
+                if ( s != null ) {
+                    s.logout();
+                }
+            }
+        }
+    }
+    /**
      * @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#processWriteQueue()
      */
     protected void processWriteQueue() {
@@ -136,13 +215,13 @@
                     // we just ignore this
                     this.ignoreException(re);
                 }
-                final String jobId = (String)event.getProperty(EventUtil.PROPERTY_JOB_ID);
-
                 final EventInfo info = new EventInfo();
                 info.event = event;
+                final String nodeName = this.getNodeName(event);
+
                 // if the job has no job id, we can just write the job to the repo and don't
                 // need locking
-                final String nodeName = this.getNodeName(event);
+                final String jobId = (String)event.getProperty(EventUtil.PROPERTY_JOB_ID);
                 if ( jobId == null ) {
                     try {
                         final Node eventNode = this.writeEvent(event, nodeName);
@@ -163,18 +242,18 @@
                             // if the node is locked, someone else was quicker
                             // and we don't have to process this job
                             if ( !foundNode.isLocked() ) {
-                                // node is already in repository, so we just overwrite it
+                                // node is already in repository, so if not finished we just use it
+                                // otherwise it has already been processed
                                 try {
-                                    foundNode.remove();
-                                    parentNode.save();
-                                    foundNode = null;
+                                    if ( !foundNode.hasProperty(EventHelper.NODE_PROPERTY_FINISHED) ) {
+                                        info.nodePath = foundNode.getPath();
+                                    }
                                 } catch (RepositoryException re) {
                                     // if anything goes wrong, it means that (hopefully) someone
                                     // else is processing this node
                                 }
                             }
-                        }
-                        if ( foundNode == null ) {
+                        } else {
                             // We now write the event into the repository
                             try {
                                 final Node eventNode = this.writeEvent(event, nodeName);
@@ -530,7 +609,7 @@
                             }
                             if ( !doNotProcess ) {
                                 final Node eventNode = (Node) s.getItem(nodePath);
-                                if ( !eventNode.isLocked() ) {
+                                if ( !eventNode.isLocked() && !eventNode.hasProperty(EventHelper.NODE_PROPERTY_FINISHED)) {
                                     try {
                                         final EventInfo info = new EventInfo();
                                         info.event = this.readEvent(eventNode);
@@ -659,10 +738,16 @@
                             this.ignoreException(e);
                         }
                         unlock = false;
-                        // remove node from repository
-                        final Node parentNode = eventNode.getParent();
-                        eventNode.remove();
-                        parentNode.save();
+                        final String jobId = (String)job.getProperty(EventUtil.PROPERTY_JOB_ID);
+                        if ( jobId == null ) {
+                            // remove node from repository if no job id set
+                            final Node parentNode = eventNode.getParent();
+                            eventNode.remove();
+                            parentNode.save();
+                        } else {
+                            eventNode.setProperty(EventHelper.NODE_PROPERTY_FINISHED, Calendar.getInstance());
+                            eventNode.save();
+                        }
                     }
                 } catch (RepositoryException re) {
                     // if an exception occurs, we just log

Modified: incubator/sling/trunk/sling/event/src/main/resources/SLING-INF/nodetypes/event.cnd
URL: http://svn.apache.org/viewvc/incubator/sling/trunk/sling/event/src/main/resources/SLING-INF/nodetypes/event.cnd?rev=635576&r1=635575&r2=635576&view=diff
==============================================================================
--- incubator/sling/trunk/sling/event/src/main/resources/SLING-INF/nodetypes/event.cnd (original)
+++ incubator/sling/trunk/sling/event/src/main/resources/SLING-INF/nodetypes/event.cnd Mon Mar 10 08:00:54 2008
@@ -39,7 +39,7 @@
 [slingevent:Job] > slingevent:Event, mix:lockable
   - slingevent:processor (string)
   - slingevent:id (string)
-
+  - slingevent:finished (date)
  
 [slingevent:TimedEvent] > slingevent:Event, mix:lockable
   - slingevent:processor (string)