You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2008/08/27 15:52:11 UTC

svn commit: r689479 - /incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java

Author: cziegeler
Date: Wed Aug 27 06:52:11 2008
New Revision: 689479

URL: http://svn.apache.org/viewvc?rev=689479&view=rev
Log:
SLING-628 - Implement separate queues, requeuing and ordered queues.

Modified:
    incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java

Modified: incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
URL: http://svn.apache.org/viewvc/incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java?rev=689479&r1=689478&r2=689479&view=diff
==============================================================================
--- incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java (original)
+++ incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java Wed Aug 27 06:52:11 2008
@@ -79,7 +79,7 @@
     private final Map<String, Boolean> processingMap = new HashMap<String, Boolean>();
 
     /** A map for the different job queues. */
-    private final Map<String, BlockingQueue<EventInfo>> jobQueues = new HashMap<String, BlockingQueue<EventInfo>>();
+    private final Map<String, JobBlockingQueue> jobQueues = new HashMap<String, JobBlockingQueue>();
 
     /** Default sleep time. */
     private static final long DEFAULT_SLEEP_TIME = 30;
@@ -144,7 +144,7 @@
     protected void deactivate(final ComponentContext context) {
         super.deactivate(context);
         synchronized ( this.jobQueues ) {
-            final Iterator<BlockingQueue<EventInfo>> i = this.jobQueues.values().iterator();
+            final Iterator<JobBlockingQueue> i = this.jobQueues.values().iterator();
             while ( i.hasNext() ) {
                 try {
                     i.next().put(new EventInfo());
@@ -349,9 +349,9 @@
                     synchronized ( this.jobQueues ) {
                         BlockingQueue<EventInfo> jobQueue = this.jobQueues.get(queueName);
                         if ( jobQueue == null ) {
-                            jobQueue = new LinkedBlockingQueue<EventInfo>();
-                            final BlockingQueue<EventInfo> jq = jobQueue;
-                            this.jobQueues.put(queueName, jobQueue);
+                            final JobBlockingQueue jq = new JobBlockingQueue();
+                            jobQueue = jq;
+                            this.jobQueues.put(queueName, jq);
                             // Start background thread
                             this.threadPool.execute(new Runnable() {
 
@@ -388,19 +388,84 @@
      * @param queueName The name of the job queue
      * @param jobQueue The job queue
      */
-    private void runJobQueue(final String queueName, final BlockingQueue<EventInfo> jobQueue) {
+    private void runJobQueue(final String queueName, final JobBlockingQueue jobQueue) {
+        EventInfo info = null;
         while ( this.running ) {
-            // so let's wait/get the next job from the queue
-            EventInfo info = null;
-            try {
-                info = jobQueue.take();
-            } catch (InterruptedException e) {
-                // we ignore this
-                this.ignoreException(e);
+            if ( info == null ) {
+                // so let's wait/get the next job from the queue
+                try {
+                    info = jobQueue.take();
+                } catch (InterruptedException e) {
+                    // we ignore this
+                    this.ignoreException(e);
+                }
             }
 
             if ( info != null && this.running ) {
-                this.executeJob(info, jobQueue);
+                final EventInfo processInfo = info;
+                info = null;
+                if ( this.executeJob(processInfo, jobQueue) ) {
+                    try {
+                        jobQueue.wait();
+                    } catch (InterruptedException e) {
+                        this.ignoreException(e);
+                    }
+                    // if we have an info, this is a reschedule
+                    final EventInfo newInfo = jobQueue.get();
+                    if ( newInfo != null ) {
+                        final Event job = newInfo.event;
+
+                        // is this an ordered queue?
+                        final boolean orderedQueue = job.getProperty(EventUtil.PROPERTY_JOB_QUEUE_ORDERED) != null;
+
+                        if ( orderedQueue ) {
+                            // we just sleep for the delay time - if none, we continue and retry
+                            // this job again
+                            if ( job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY) != null ) {
+                                final long delay = (Long)job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY);
+                                try {
+                                    Thread.sleep(delay);
+                                } catch (InterruptedException e) {
+                                    this.ignoreException(e);
+                                }
+                            }
+                            info = newInfo;
+                        } else {
+                            // delay rescheduling?
+                            if ( job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY) != null ) {
+                                final long delay = (Long)job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY);
+                                final Date fireDate = new Date();
+                                fireDate.setTime(System.currentTimeMillis() + delay);
+
+                                final Runnable t = new Runnable() {
+                                    public void run() {
+                                        try {
+                                            jobQueue.put(newInfo);
+                                        } catch (InterruptedException e) {
+                                            // this should never happen
+                                            ignoreException(e);
+                                        }
+                                    }
+                                };
+                                try {
+                                    this.scheduler.fireJobAt(null, t, null, fireDate);
+                                } catch (Exception e) {
+                                    // we ignore the exception and just put back the job in the queue
+                                    ignoreException(e);
+                                    t.run();
+                                }
+                            } else {
+                                // put directly into queue
+                                try {
+                                    jobQueue.put(newInfo);
+                                } catch (InterruptedException e) {
+                                    // this should never happen
+                                    this.ignoreException(e);
+                                }
+                            }
+                        }
+                    }
+                }
             }
         }
     }
@@ -408,7 +473,7 @@
     /**
      * Process a job
      */
-    private void executeJob(final EventInfo info, final BlockingQueue<EventInfo> jobQueue) {
+    private boolean executeJob(final EventInfo info, final BlockingQueue<EventInfo> jobQueue) {
         // check if the node still exists
         synchronized (this.backgroundSession) {
             try {
@@ -424,7 +489,7 @@
                     // if parallel processing is allowed, we can just process
                     // if not we should check if any other job with the same topic is currently running
                     boolean process = parallelProcessing;
-                    if ( !process ) {
+                    if ( !parallelProcessing ) {
                         synchronized ( this.processingMap ) {
                             final Boolean value = this.processingMap.get(jobTopic);
                             if ( value == null || !value.booleanValue() ) {
@@ -449,6 +514,7 @@
                                 if ( process ) {
                                     unlock = false;
                                     this.processJob(info.event, eventNode);
+                                    return true;
                                 }
                             }
                         } catch (RepositoryException e) {
@@ -478,11 +544,7 @@
                                      */
                                     public void run() {
                                         try {
-                                            if ( jobQueue != null ) {
-                                                jobQueue.put(eInfo);
-                                            } else {
-                                                queue.put(eInfo);
-                                            }
+                                            queue.put(eInfo);
                                         } catch (InterruptedException e) {
                                             // ignore
                                             ignoreException(e);
@@ -517,6 +579,7 @@
             }
 
         }
+        return false;
     }
 
     /**
@@ -678,7 +741,10 @@
             eventNode.save();
             final EventAdmin localEA = this.eventAdmin;
             if ( localEA != null ) {
-                localEA.sendEvent(jobEvent);
+                // we need async delivery, otherwise we might create a deadlock
+                // as this method runs inside a synchronized block and the finishedJob
+                // method as well!
+                localEA.postEvent(jobEvent);
                 // do not unlock if sending was successful
                 unlock = false;
             } else {
@@ -862,6 +928,8 @@
     }
 
     /**
+     * This is a notification from the component which processed the job.
+     *
      * @see org.apache.sling.event.EventUtil.JobStatusNotifier#finishedJob(org.osgi.service.event.Event, String, boolean)
      */
     public boolean finishedJob(Event job, String eventNodePath, boolean shouldReschedule) {
@@ -880,7 +948,7 @@
             if ( retries != -1 && retryCount > retries ) {
                 reschedule = false;
             }
-            if ( !reschedule ) {
+            if ( reschedule ) {
                 // update event with retry count and retries
                 final Dictionary<String, Object> newProperties;
                 // create a new dictionary
@@ -917,7 +985,7 @@
                         unlock = false;
                         final String jobId = (String)job.getProperty(EventUtil.PROPERTY_JOB_ID);
                         if ( jobId == null ) {
-                            // remove node from repository if no job id set
+                            // remove node from repository if no job is set
                             final Node parentNode = eventNode.getParent();
                             eventNode.remove();
                             parentNode.save();
@@ -967,37 +1035,62 @@
                         // this should never happen
                         this.ignoreException(e);
                     }
-                    // delay rescheduling?
-                    if ( job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY) != null ) {
-                        final long delay = (Long)job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY);
-                        final Date fireDate = new Date();
-                        fireDate.setTime(System.currentTimeMillis() + delay);
+                    // if this is an own job queue, we simply signal the queue to continue
+                    // it will pick up the event and either reschedule or wait
+                    if ( job.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME) != null ) {
+                        // we know the queue exists
+                        final JobBlockingQueue jobQueue;
+                        synchronized ( this.jobQueues ) {
+                            jobQueue = this.jobQueues.get(job.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME));
+                        }
+                        jobQueue.set(info);
+                        jobQueue.notify();
+                    } else {
 
-                        final Runnable t = new Runnable() {
-                            public void run() {
-                                try {
-                                    queue.put(info);
-                                } catch (InterruptedException e) {
-                                    // this should never happen
-                                    ignoreException(e);
+                        // delay rescheduling?
+                        if ( job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY) != null ) {
+                            final long delay = (Long)job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY);
+                            final Date fireDate = new Date();
+                            fireDate.setTime(System.currentTimeMillis() + delay);
+
+                            final Runnable t = new Runnable() {
+                                public void run() {
+                                    try {
+                                        queue.put(info);
+                                    } catch (InterruptedException e) {
+                                        // this should never happen
+                                        ignoreException(e);
+                                    }
                                 }
+                            };
+                            try {
+                                this.scheduler.fireJobAt(null, t, null, fireDate);
+                            } catch (Exception e) {
+                                // we ignore the exception and just put back the job in the queue
+                                ignoreException(e);
+                                t.run();
+                            }
+                        } else {
+                            // put directly into queue
+                            try {
+                                queue.put(info);
+                            } catch (InterruptedException e) {
+                                // this should never happen
+                                this.ignoreException(e);
                             }
-                        };
-                        try {
-                            this.scheduler.fireJobAt(null, t, null, fireDate);
-                        } catch (Exception e) {
-                            // we ignore the exception and just put back the job in the queue
-                            ignoreException(e);
-                            t.run();
                         }
-                    } else {
-                        // put directly into queue
-                        try {
-                            this.queue.put(info);
-                        } catch (InterruptedException e) {
-                            // this should never happen
-                            this.ignoreException(e);
+                    }
+                } else {
+                    // if this is an own job queue, we simply signal the queue to continue
+                    // it will pick up the event continue with the next event
+                    if ( job.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME) != null ) {
+                        // we know the queue exists
+                        final JobBlockingQueue jobQueue;
+                        synchronized ( this.jobQueues ) {
+                            jobQueue = this.jobQueues.get(job.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME));
                         }
+                        jobQueue.set(null);
+                        jobQueue.notify();
                     }
                 }
                 if ( !shouldReschedule ) {
@@ -1036,15 +1129,16 @@
             buffer.append(EventHelper.NODE_PROPERTY_FINISHED);
             buffer.append(")");
             if ( topic != null ) {
-                buffer.append(" @");
+                buffer.append(" and @");
                 buffer.append(EventHelper.NODE_PROPERTY_TOPIC);
                 buffer.append(" = '");
                 buffer.append(topic);
                 buffer.append("'");
             }
             if ( locked ) {
-                buffer.append(" and ");
-                buffer.append("@jcr:lockOwner");
+                buffer.append(" and @jcr:lockOwner");
+            } else {
+                buffer.append(" and not(@jcr:lockOwner)");
             }
             if ( filterProps != null ) {
                 final Iterator<Map.Entry<String, Object>> i = filterProps.entrySet().iterator();
@@ -1126,4 +1220,19 @@
     public Collection<Event> getScheduledJobs(String topic, Map<String, Object> filterProps) {
         return this.queryCurrentJobs(topic, null, false);
     }
+
+    private static final class JobBlockingQueue extends LinkedBlockingQueue<EventInfo> {
+
+        private EventInfo eventInfo;
+
+        public void set(EventInfo i) {
+            this.eventInfo = i;
+        }
+
+        public EventInfo get() {
+            final EventInfo object = this.eventInfo;
+            this.eventInfo = null;
+            return object;
+        }
+    }
 }