You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2008/08/14 13:25:45 UTC

svn commit: r685856 - /incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java

Author: cziegeler
Date: Thu Aug 14 04:25:44 2008
New Revision: 685856

URL: http://svn.apache.org/viewvc?rev=685856&view=rev
Log:
SLING-612 : Put the job back into the queue after the configured waiting time (which is specified in seconds, not milliseconds). Check whether the job has already been processed by another cluster node.

Modified:
    incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java

Modified: incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
URL: http://svn.apache.org/viewvc/incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java?rev=685856&r1=685855&r2=685856&view=diff
==============================================================================
--- incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java (original)
+++ incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java Thu Aug 14 04:25:44 2008
@@ -76,7 +76,7 @@
     protected final Map<String, Boolean> processingMap = new HashMap<String, Boolean>();
 
     /** Default sleep time. */
-    protected static final long DEFAULT_SLEEP_TIME = 20;
+    protected static final long DEFAULT_SLEEP_TIME = 60;
 
     /** @scr.property valueRef="DEFAULT_SLEEP_TIME" */
     protected static final String CONFIG_PROPERTY_SLEEP_TIME = "sleep.time";
@@ -87,7 +87,7 @@
     /** @scr.property valueRef="DEFAULT_MAX_JOB_RETRIES" */
     protected static final String CONFIG_PROPERTY_MAX_JOB_RETRIES = "max.job.retries";
 
-    /** We check every 20 secs by default. */
+    /** We check every 60 secs by default. */
     protected long sleepTime;
 
     /** How often should a job be retried by default. */
@@ -313,11 +313,13 @@
             }
 
             if ( info != null && this.running ) {
+
                 // check if the node still exists
                 synchronized (this.backgroundSession) {
                     try {
                         this.backgroundSession.refresh(false);
-                        if ( this.backgroundSession.itemExists(info.nodePath) ) {
+                        if ( this.backgroundSession.itemExists(info.nodePath)
+                             && !this.backgroundSession.itemExists(info.nodePath + "/" + EventHelper.NODE_PROPERTY_FINISHED)) {
                             final Event event = info.event;
                             final String jobTopic = (String)event.getProperty(EventUtil.PROPERTY_JOB_TOPIC);
                             final boolean parallelProcessing = event.getProperty(EventUtil.PROPERTY_JOB_PARALLEL) != null;
@@ -367,22 +369,31 @@
                                 try {
                                     // check if the node is in processing or already finished
                                     final Node eventNode = (Node) this.backgroundSession.getItem(info.nodePath);
-                                    if ( !eventNode.isLocked() ) {
-                                        try {
-                                            this.queue.put(info);
-                                        } catch (InterruptedException e) {
-                                            // ignore
-                                            this.ignoreException(e);
-                                        }
-                                        // wait time before we restart the cycle, if there is only one job in the queue!
-                                        if ( this.queue.size() == 1 ) {
-                                            try {
-                                                Thread.sleep(this.sleepTime);
-                                            } catch (InterruptedException e) {
-                                                // ignore
-                                                this.ignoreException(e);
+                                    if ( !eventNode.isLocked() && !eventNode.hasProperty(EventHelper.NODE_PROPERTY_FINISHED)) {
+                                        final EventInfo eInfo = info;
+                                        // we put it back into the queue after a specific time
+                                        this.threadPool.execute(new Runnable() {
+
+                                            /**
+                                             * @see java.lang.Runnable#run()
+                                             */
+                                            public void run() {
+                                                // wait time before we put it back into the pool
+                                                try {
+                                                    Thread.sleep(sleepTime * 1000);
+                                                } catch (InterruptedException e) {
+                                                    // ignore
+                                                    ignoreException(e);
+                                                }
+                                                try {
+                                                    queue.put(eInfo);
+                                                } catch (InterruptedException e) {
+                                                    // ignore
+                                                    ignoreException(e);
+                                                }
                                             }
-                                        }
+
+                                        });
                                     }
                                 } catch (RepositoryException e) {
                                     // ignore