You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2008/10/01 11:51:04 UTC

svn commit: r700723 - in /incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl: EventHelper.java JobEventHandler.java

Author: cziegeler
Date: Wed Oct  1 02:51:03 2008
New Revision: 700723

URL: http://svn.apache.org/viewvc?rev=700723&view=rev
Log:
Clean up eventing code

Modified:
    incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/EventHelper.java
    incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java

Modified: incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/EventHelper.java
URL: http://svn.apache.org/viewvc/incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/EventHelper.java?rev=700723&r1=700722&r2=700723&view=diff
==============================================================================
--- incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/EventHelper.java (original)
+++ incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/EventHelper.java Wed Oct  1 02:51:03 2008
@@ -41,11 +41,8 @@
     public static final String NODE_PROPERTY_TE_DATE = "slingevent:date";
     public static final String NODE_PROPERTY_TE_PERIOD = "slingevent:period";
 
-    public static final String EVENTS_NODE_TYPE = "slingevent:Events";
     public static final String EVENT_NODE_TYPE = "slingevent:Event";
-    public static final String JOBS_NODE_TYPE = "slingevent:Jobs";
     public static final String JOB_NODE_TYPE = "slingevent:Job";
-    public static final String TIMED_EVENTS_NODE_TYPE = "slingevent:TimedEvents";
     public static final String TIMED_EVENT_NODE_TYPE = "slingevent:TimedEvent";
 
     /** The nodetype for newly created folders */

Modified: incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
URL: http://svn.apache.org/viewvc/incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java?rev=700723&r1=700722&r2=700723&view=diff
==============================================================================
--- incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java (original)
+++ incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java Wed Oct  1 02:51:03 2008
@@ -51,6 +51,7 @@
 import org.apache.sling.event.EventPropertiesMap;
 import org.apache.sling.event.EventUtil;
 import org.apache.sling.event.JobStatusProvider;
+import org.osgi.service.component.ComponentConstants;
 import org.osgi.service.component.ComponentContext;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventAdmin;
@@ -59,7 +60,7 @@
 /**
  * An event handler for special job events.
  *
- * @scr.component label="%job.events.name" description="%job.events.description"
+ * @scr.component label="%job.events.name" description="%job.events.description" immediate="true"
  * @scr.service interface="org.apache.sling.event.JobStatusProvider"
  * @scr.property name="event.topics" refValues="EventUtil.TOPIC_JOB"
  *               values.updated="org/osgi/framework/BundleEvent/UPDATED"
@@ -121,6 +122,9 @@
     /** The scheduler for rescheduling jobs. @scr.reference */
     private Scheduler scheduler;
 
+    /** Our component context. */
+    private ComponentContext componentContext;
+
     public static ThreadPool JOB_THREAD_POOL;
 
     /**
@@ -135,6 +139,7 @@
         this.cleanupPeriod = OsgiUtil.toInteger(props.get(CONFIG_PROPERTY_CLEANUP_PERIOD), DEFAULT_CLEANUP_PERIOD);
         this.sleepTime = OsgiUtil.toLong(props.get(CONFIG_PROPERTY_SLEEP_TIME), DEFAULT_SLEEP_TIME);
         this.maxJobRetries = OsgiUtil.toInteger(props.get(CONFIG_PROPERTY_MAX_JOB_RETRIES), DEFAULT_MAX_JOB_RETRIES);
+        this.componentContext = context;
         super.activate(context);
         JOB_THREAD_POOL = this.threadPool;
     }
@@ -164,6 +169,7 @@
             this.backgroundSession.logout();
             this.backgroundSession = null;
         }
+        this.componentContext = null;
         JOB_THREAD_POOL = null;
     }
 
@@ -238,6 +244,7 @@
                 this.ignoreException(e);
             }
             if ( event != null && this.running ) {
+                logger.debug("Persisting job {}", event);
                 try {
                     this.writerSession.refresh(false);
                 } catch (RepositoryException re) {
@@ -335,6 +342,15 @@
         // load unprocessed jobs from repository
         if ( this.running ) {
             this.loadJobs();
+        } else {
+            logger.info("Deactivating component due to errors.");
+            // deactivate
+            final ComponentContext ctx = this.componentContext;
+            if ( ctx != null ) {
+                final String name = (String) componentContext.getProperties().get(
+                    ComponentConstants.COMPONENT_NAME);
+                ctx.disableComponent(name);
+            }
         }
         while ( this.running ) {
             // so let's wait/get the next job from the queue
@@ -489,10 +505,11 @@
      * Process a job
      */
     private boolean executeJob(final EventInfo info, final BlockingQueue<EventInfo> jobQueue) {
-        // check if the node still exists
+        boolean putback = false;
         synchronized (this.backgroundSession) {
             try {
                 this.backgroundSession.refresh(false);
+                // check if the node still exists
                 if ( this.backgroundSession.itemExists(info.nodePath)
                      && !this.backgroundSession.itemExists(info.nodePath + "/" + EventHelper.NODE_PROPERTY_FINISHED)) {
                     final Event event = info.event;
@@ -547,41 +564,7 @@
                             // check if the node is in processing or already finished
                             final Node eventNode = (Node) this.backgroundSession.getItem(info.nodePath);
                             if ( !eventNode.isLocked() && !eventNode.hasProperty(EventHelper.NODE_PROPERTY_FINISHED)) {
-                                final EventInfo eInfo = info;
-                                final Date fireDate = new Date();
-                                fireDate.setTime(System.currentTimeMillis() + this.sleepTime * 1000);
-
-                                    // we put it back into the queue after a specific time
-                                final Runnable r = new Runnable() {
-
-                                    /**
-                                     * @see java.lang.Runnable#run()
-                                     */
-                                    public void run() {
-                                        try {
-                                            queue.put(eInfo);
-                                        } catch (InterruptedException e) {
-                                            // ignore
-                                            ignoreException(e);
-                                        }
-                                    }
-
-                                };
-                                try {
-                                    this.scheduler.fireJobAt(null, r, null, fireDate);
-                                } catch (Exception e) {
-                                    // we ignore the exception
-                                    ignoreException(e);
-                                    // then wait for the time and readd the job
-                                    try {
-                                        Thread.sleep(sleepTime * 1000);
-                                    } catch (InterruptedException ie) {
-                                        // ignore
-                                        ignoreException(ie);
-                                    }
-                                    r.run();
-                                }
-
+                                putback = true;
                             }
                         } catch (RepositoryException e) {
                             // ignore
@@ -594,6 +577,42 @@
             }
 
         }
+        if ( putback ) {
+            final EventInfo eInfo = info;
+            final Date fireDate = new Date();
+            fireDate.setTime(System.currentTimeMillis() + this.sleepTime * 1000);
+
+                // we put it back into the queue after a specific time
+            final Runnable r = new Runnable() {
+
+                /**
+                 * @see java.lang.Runnable#run()
+                 */
+                public void run() {
+                    try {
+                        queue.put(eInfo);
+                    } catch (InterruptedException e) {
+                        // ignore
+                        ignoreException(e);
+                    }
+                }
+
+            };
+            try {
+                this.scheduler.fireJobAt(null, r, null, fireDate);
+            } catch (Exception e) {
+                // we ignore the exception
+                ignoreException(e);
+                // then wait for the time and readd the job
+                try {
+                    Thread.sleep(sleepTime * 1000);
+                } catch (InterruptedException ie) {
+                    // ignore
+                    ignoreException(ie);
+                }
+                r.run();
+            }
+        }
         return false;
     }
 
@@ -608,10 +627,12 @@
      * @see org.osgi.service.event.EventHandler#handleEvent(org.osgi.service.event.Event)
      */
     public void handleEvent(final Event event) {
+        logger.debug("Receiving event {}", event);
         // we ignore remote job events
         if ( EventUtil.isLocal(event) ) {
             // check for bundle event
             if ( event.getTopic().equals(EventUtil.TOPIC_JOB)) {
+                logger.debug("Handling local job {}", event);
                 // job event
                 final String jobTopic = (String)event.getProperty(EventUtil.PROPERTY_JOB_TOPIC);
 
@@ -873,7 +894,9 @@
             buffer.append(this.repositoryPath);
             buffer.append("//element(*, ");
             buffer.append(this.getEventNodeType());
-            buffer.append(")");
+            buffer.append(") order by @");
+            buffer.append(EventHelper.NODE_PROPERTY_CREATED);
+            buffer.append(" ascending");
             final Query q = qManager.createQuery(buffer.toString(), Query.XPATH);
             final NodeIterator result = q.execute().getNodes();
             while ( result.hasNext() ) {
@@ -938,6 +961,7 @@
         }
         final boolean parallelProcessing = job.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME) != null
                                         || job.getProperty(EventUtil.PROPERTY_JOB_PARALLEL) != null;
+        EventInfo putback = null;
         // we have to use the same session for unlocking that we used for locking!
         synchronized ( this.backgroundSession ) {
             try {
@@ -1028,27 +1052,7 @@
 
                         // delay rescheduling?
                         if ( job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY) != null ) {
-                            final long delay = (Long)job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY);
-                            final Date fireDate = new Date();
-                            fireDate.setTime(System.currentTimeMillis() + delay);
-
-                            final Runnable t = new Runnable() {
-                                public void run() {
-                                    try {
-                                        queue.put(info);
-                                    } catch (InterruptedException e) {
-                                        // this should never happen
-                                        ignoreException(e);
-                                    }
-                                }
-                            };
-                            try {
-                                this.scheduler.fireJobAt(null, t, null, fireDate);
-                            } catch (Exception e) {
-                                // we ignore the exception and just put back the job in the queue
-                                ignoreException(e);
-                                t.run();
-                            }
+                            putback = info;
                         } else {
                             // put directly into queue
                             try {
@@ -1073,15 +1077,39 @@
                         }
                     }
                 }
-                if ( !shouldReschedule ) {
-                    return true;
-                }
-                return reschedule;
             } catch (RepositoryException re) {
                 this.logger.error("Unable to create new session.", re);
                 return false;
             }
         }
+        if ( putback != null ) {
+            final EventInfo info = putback;
+            final long delay = (Long)job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY);
+            final Date fireDate = new Date();
+            fireDate.setTime(System.currentTimeMillis() + delay);
+
+            final Runnable t = new Runnable() {
+                public void run() {
+                    try {
+                        queue.put(info);
+                    } catch (InterruptedException e) {
+                        // this should never happen
+                        ignoreException(e);
+                    }
+                }
+            };
+            try {
+                this.scheduler.fireJobAt(null, t, null, fireDate);
+            } catch (Exception e) {
+                // we ignore the exception and just put back the job in the queue
+                ignoreException(e);
+                t.run();
+            }
+        }
+        if ( !shouldReschedule ) {
+            return true;
+        }
+        return reschedule;
     }
 
     /**