You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2010/02/10 14:01:24 UTC

svn commit: r908478 - in /sling/trunk/bundles/extensions/event: ./ src/main/java/org/apache/sling/event/impl/ src/main/java/org/apache/sling/event/impl/job/

Author: cziegeler
Date: Wed Feb 10 13:01:23 2010
New Revision: 908478

URL: http://svn.apache.org/viewvc?rev=908478&view=rev
Log:
SLING-1365 : Limit the number of parallel jobs
Move job related utility classes into own package.
Add limit check to main queue
Restructure queue handling and fix rescheduling if an error occurs 

Added:
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobBlockingQueue.java
      - copied, changed from r908419, sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobBlockingQueue.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/ParallelInfo.java   (with props)
Removed:
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobBlockingQueue.java
Modified:
    sling/trunk/bundles/extensions/event/pom.xml
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/AbstractRepositoryEventHandler.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java

Modified: sling/trunk/bundles/extensions/event/pom.xml
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/pom.xml?rev=908478&r1=908477&r2=908478&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/pom.xml (original)
+++ sling/trunk/bundles/extensions/event/pom.xml Wed Feb 10 13:01:23 2010
@@ -58,7 +58,8 @@
                             org.apache.sling.event;version=2.2.0
                         </Export-Package>
                         <Private-Package>
-                            org.apache.sling.event.impl
+                            org.apache.sling.event.impl,
+                            org.apache.sling.event.impl.job
                         </Private-Package>
                         <Sling-Nodetypes>
                             SLING-INF/nodetypes/event.cnd

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/AbstractRepositoryEventHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/AbstractRepositoryEventHandler.java?rev=908478&r1=908477&r2=908478&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/AbstractRepositoryEventHandler.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/AbstractRepositoryEventHandler.java Wed Feb 10 13:01:23 2010
@@ -327,7 +327,7 @@
         }
     }
 
-    protected static final class EventInfo {
+    public static final class EventInfo {
         public String nodePath;
         public Event event;
     }

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java?rev=908478&r1=908477&r2=908478&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java Wed Feb 10 13:01:23 2010
@@ -56,6 +56,8 @@
 import org.apache.sling.event.EventPropertiesMap;
 import org.apache.sling.event.EventUtil;
 import org.apache.sling.event.JobStatusProvider;
+import org.apache.sling.event.impl.job.JobBlockingQueue;
+import org.apache.sling.event.impl.job.ParallelInfo;
 import org.osgi.framework.Constants;
 import org.osgi.service.component.ComponentConstants;
 import org.osgi.service.component.ComponentContext;
@@ -94,6 +96,9 @@
     /** A map for keeping track of currently processed job topics. */
     private final Map<String, Boolean> processingMap = new HashMap<String, Boolean>();
 
+    /** A map for keeping track of currently processed parallel job topics. */
+    private final Map<String, Integer> parallelProcessingMap = new HashMap<String, Integer>();
+
     /** A map for the different job queues. */
     private final Map<String, JobBlockingQueue> jobQueues = new HashMap<String, JobBlockingQueue>();
 
@@ -542,7 +547,7 @@
         // load unprocessed jobs from repository
         if ( this.running ) {
             logger.info("Apache Sling Job Event Handler started.");
-            logger.debug("Job Handler Configuration: (sleepTime={}secs,maxJobRetries={},waitForAck={}ms,maximumParallelJobs={},cleanupPeriod={}min)",
+            logger.debug("Job Handler Configuration: (sleepTime={} secs, maxJobRetries={}, waitForAck={} ms, maximumParallelJobs={}, cleanupPeriod={} min)",
                     new Object[] {sleepTime, maxJobRetries,waitForAckMs,maximumParallelJobs,cleanupPeriod});
         } else {
             final ComponentContext ctx = this.componentContext;
@@ -582,7 +587,7 @@
                         logger.debug("Queuing job {} into queue {}.", info.event, queueName);
                         BlockingQueue<EventInfo> jobQueue = this.jobQueues.get(queueName);
                         if ( jobQueue == null ) {
-                            final JobBlockingQueue jq = new JobBlockingQueue();
+                            final JobBlockingQueue jq = new JobBlockingQueue(queueName, this.logger);
                             jobQueue = jq;
                             this.jobQueues.put(queueName, jq);
                             // Start background thread
@@ -617,7 +622,43 @@
 
                 // if we still have a job, process it
                 if ( info != null ) {
-                    this.executeJob(info, null);
+                    if ( this.executeJob(info, null) == Status.RESCHEDULE ) {
+                        logger.debug("Putting job {} back into the queue.", info.event);
+                        final EventInfo eInfo = info;
+                        final Date fireDate = new Date();
+                        fireDate.setTime(System.currentTimeMillis() + this.sleepTime * 1000);
+
+                            // we put it back into the queue after a specific time
+                        final Runnable r = new Runnable() {
+
+                            /**
+                             * @see java.lang.Runnable#run()
+                             */
+                            public void run() {
+                                try {
+                                    queue.put(eInfo);
+                                } catch (InterruptedException e) {
+                                    // ignore
+                                    ignoreException(e);
+                                }
+                            }
+
+                        };
+                        try {
+                            this.scheduler.fireJobAt(null, r, null, fireDate);
+                        } catch (Exception e) {
+                            // we ignore the exception
+                            ignoreException(e);
+                            // then wait for the time and readd the job
+                            try {
+                                Thread.sleep(sleepTime * 1000);
+                            } catch (InterruptedException ie) {
+                                // ignore
+                                ignoreException(ie);
+                            }
+                            r.run();
+                        }
+                    }
                 }
             }
         }
@@ -645,7 +686,8 @@
                 synchronized ( jobQueue.getLock()) {
                     final EventInfo processInfo = info;
                     info = null;
-                    if ( this.executeJob(processInfo, jobQueue) ) {
+                    final Status status = this.executeJob(processInfo, jobQueue);
+                    if ( status == Status.SUCCESS ) {
                         EventInfo newInfo = null;
                         try {
                             newInfo = jobQueue.waitForFinish();
@@ -654,76 +696,26 @@
                         }
                         // if we have an info, this is a reschedule
                         if ( newInfo != null ) {
-                            final EventInfo newEventInfo = newInfo;
-                            final Event job = newInfo.event;
-
-                            // is this an ordered queue?
-                            final boolean orderedQueue = job.getProperty(EventUtil.PROPERTY_JOB_QUEUE_ORDERED) != null;
-
-                            if ( orderedQueue ) {
-                                // we just sleep for the delay time - if none, we continue and retry
-                                // this job again
-                                if ( job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY) != null ) {
-                                    final long delay = (Long)job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY);
-                                    jobQueue.setSleeping(true, Thread.currentThread());
-                                    try {
-                                        Thread.sleep(delay);
-                                    } catch (InterruptedException e) {
-                                        this.ignoreException(e);
-                                    } finally {
-                                        jobQueue.setSleeping(false);
-                                    }
-                                }
-                                info = newInfo;
-                            } else {
-                                // delay rescheduling?
-                                if ( job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY) != null ) {
-                                    final long delay = (Long)job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY);
-                                    final Date fireDate = new Date();
-                                    fireDate.setTime(System.currentTimeMillis() + delay);
-
-                                    final String schedulerJobName = "Waiting:" + queueName;
-                                    final Runnable t = new Runnable() {
-                                        public void run() {
-                                            jobQueue.setSleeping(true, schedulerJobName);
-                                            try {
-                                                jobQueue.put(newEventInfo);
-                                            } catch (InterruptedException e) {
-                                                // this should never happen
-                                                ignoreException(e);
-                                            } finally {
-                                                jobQueue.setSleeping(false);
-                                            }
-                                        }
-                                    };
-                                    try {
-                                        this.scheduler.fireJobAt(schedulerJobName, t, null, fireDate);
-                                    } catch (Exception e) {
-                                        // we ignore the exception and just put back the job in the queue
-                                        ignoreException(e);
-                                        t.run();
-                                    }
-                                } else {
-                                    // put directly into queue
-                                    try {
-                                        jobQueue.put(newInfo);
-                                    } catch (InterruptedException e) {
-                                        // this should never happen
-                                        this.ignoreException(e);
-                                    }
-                                }
-                            }
+                            info = ((JobBlockingQueue)queue).reschedule(newInfo, this.scheduler);
                         }
+                    } else if ( status == Status.RESCHEDULE ) {
+                        info = ((JobBlockingQueue)queue).reschedule(processInfo, this.scheduler);
                     }
                 }
             }
         }
     }
 
+    public enum Status {
+        FAILED,
+        RESCHEDULE,
+        SUCCESS
+    };
+
     /**
      * Process a job
      */
-    private boolean executeJob(final EventInfo info, final BlockingQueue<EventInfo> jobQueue) {
+    private Status executeJob(final EventInfo info, final BlockingQueue<EventInfo> jobQueue) {
         boolean putback = false;
         boolean wait = false;
         synchronized (this.backgroundLock) {
@@ -735,13 +727,15 @@
                      && !this.backgroundSession.itemExists(info.nodePath + "/" + EventHelper.NODE_PROPERTY_FINISHED)) {
                     final Event event = info.event;
                     final String jobTopic = (String)event.getProperty(EventUtil.PROPERTY_JOB_TOPIC);
-                    final boolean parallelProcessing = this.useParallelProcessing(event);
+                    final ParallelInfo parInfo = ParallelInfo.getParallelInfo(event);
 
-                    // check how we can process this job
-                    // if parallel processing is allowed, we can just process
-                    // if not we should check if any other job with the same topic is currently running
-                    boolean process = parallelProcessing;
-                    if ( !parallelProcessing ) {
+                    // check how we can process this job:
+                    // if the job should not be processed in parallel, we have to check
+                    //     if another job with the same topic is currently running
+                    // if parallel processing is allowed, we have to check for the number
+                    //     of max allowed parallel jobs for this topic
+                    boolean process = parInfo.processParallel;
+                    if ( !parInfo.processParallel ) {
                         synchronized ( this.processingMap ) {
                             final Boolean value = this.processingMap.get(jobTopic);
                             if ( value == null || !value.booleanValue() ) {
@@ -749,6 +743,18 @@
                                 process = true;
                             }
                         }
+                    } else {
+                        if ( parInfo.maxParallelJob > 1 ) {
+                            synchronized ( this.parallelProcessingMap ) {
+                                final Integer value = this.parallelProcessingMap.get(jobTopic);
+                                final int currentValue = (value == null ? 0 : value.intValue());
+                                if ( currentValue < parInfo.maxParallelJob ) {
+                                    this.parallelProcessingMap.put(jobTopic, currentValue + 1);
+                                } else {
+                                    process = false;
+                                }
+                            }
+                        }
                     }
                     // check number of parallel jobs for main queue
                     if ( process && jobQueue == null && this.parallelJobCount >= this.maximumParallelJobs ) {
@@ -771,16 +777,25 @@
                                 if ( process ) {
                                     unlock = false;
                                     this.processJob(info.event, eventNode, jobQueue == null);
-                                    return true;
+                                    return Status.SUCCESS;
                                 }
                             }
                         } catch (RepositoryException e) {
                             // ignore
                             this.ignoreException(e);
                         } finally {
-                            if ( unlock && !parallelProcessing ) {
-                                synchronized ( this.processingMap ) {
-                                    this.processingMap.put(jobTopic, Boolean.FALSE);
+                            if ( unlock ) {
+                                if ( !parInfo.processParallel ) {
+                                    synchronized ( this.processingMap ) {
+                                        this.processingMap.put(jobTopic, Boolean.FALSE);
+                                    }
+                                } else {
+                                    if ( parInfo.maxParallelJob > 1 ) {
+                                        synchronized ( this.parallelProcessingMap ) {
+                                            final Integer value = this.parallelProcessingMap.get(jobTopic);
+                                            this.parallelProcessingMap.put(jobTopic, value.intValue() - 1);
+                                        }
+                                    }
                                 }
                             }
                         }
@@ -813,45 +828,11 @@
                 ignoreException(ie);
             }
         }
-        // if we have to put back the job, we do it now
+        // if we have to put back the job, we return this status
         if ( putback ) {
-            logger.debug("Putting job {} back into the queue.", info.event);
-            final EventInfo eInfo = info;
-            final Date fireDate = new Date();
-            fireDate.setTime(System.currentTimeMillis() + this.sleepTime * 1000);
-
-                // we put it back into the queue after a specific time
-            final Runnable r = new Runnable() {
-
-                /**
-                 * @see java.lang.Runnable#run()
-                 */
-                public void run() {
-                    try {
-                        queue.put(eInfo);
-                    } catch (InterruptedException e) {
-                        // ignore
-                        ignoreException(e);
-                    }
-                }
-
-            };
-            try {
-                this.scheduler.fireJobAt(null, r, null, fireDate);
-            } catch (Exception e) {
-                // we ignore the exception
-                ignoreException(e);
-                // then wait for the time and readd the job
-                try {
-                    Thread.sleep(sleepTime * 1000);
-                } catch (InterruptedException ie) {
-                    // ignore
-                    ignoreException(ie);
-                }
-                r.run();
-            }
+            return Status.RESCHEDULE;
         }
-        return false;
+        return Status.FAILED;
     }
 
     /**
@@ -992,7 +973,8 @@
      * @param isMainQueue Is this the main queue?
      */
     private void processJob(Event event, Node eventNode, boolean isMainQueue)  {
-        final boolean parallelProcessing = this.useParallelProcessing(event);
+        final ParallelInfo parInfo = ParallelInfo.getParallelInfo(event);
+        final boolean parallelProcessing = parInfo.processParallel;
         final String jobTopic = (String)event.getProperty(EventUtil.PROPERTY_JOB_TOPIC);
         logger.debug("Starting job {}", event);
         boolean unlock = true;
@@ -1033,7 +1015,15 @@
                     synchronized ( this.processingMap ) {
                         this.processingMap.put(jobTopic, Boolean.FALSE);
                     }
+                } else {
+                    if ( parInfo.maxParallelJob > 1 ) {
+                        synchronized ( this.parallelProcessingMap ) {
+                            final Integer value = this.parallelProcessingMap.get(jobTopic);
+                            this.parallelProcessingMap.put(jobTopic, value.intValue() - 1);
+                        }
+                    }
                 }
+
                 // unlock node
                 try {
                     eventNode.unlock();
@@ -1296,7 +1286,8 @@
         } else {
             this.sendNotification(EventUtil.TOPIC_JOB_FINISHED, job);
         }
-        final boolean parallelProcessing = this.useParallelProcessing(job);
+        final ParallelInfo parInfo = ParallelInfo.getParallelInfo(job);
+        final boolean parallelProcessing = parInfo.processParallel;
         EventInfo putback = null;
         // we have to use the same session for unlocking that we used for locking!
         synchronized ( this.backgroundLock ) {
@@ -1343,11 +1334,18 @@
                     // if an exception occurs, we just log
                     this.logger.error("Exception during job finishing.", re);
                 } finally {
+                    final String jobTopic = (String)job.getProperty(EventUtil.PROPERTY_JOB_TOPIC);
                     if ( !parallelProcessing) {
-                        final String jobTopic = (String)job.getProperty(EventUtil.PROPERTY_JOB_TOPIC);
                         synchronized ( this.processingMap ) {
                             this.processingMap.put(jobTopic, Boolean.FALSE);
                         }
+                    } else {
+                        if ( parInfo.maxParallelJob > 1 ) {
+                            synchronized ( this.parallelProcessingMap ) {
+                                final Integer value = this.parallelProcessingMap.get(jobTopic);
+                                this.parallelProcessingMap.put(jobTopic, value.intValue() - 1);
+                            }
+                        }
                     }
                     if ( job.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME) == null ) {
                         this.parallelJobCount--;
@@ -1670,34 +1668,6 @@
         }
     }
 
-    /**
-     * Check if the job should be handled in parallel.
-     * For improved processing we first check if a job queue name is set
-     * and always return true. This is a pure implementation thing!
-     * If the parallel property is set and is a boolean object, we return
-     * its value; if it is any other object we use its string value
-     * and return false if the string value is either "no" or "false".
-     * If any other value is set we return true, if the property is not
-     * set we return false.
-     */
-    private boolean useParallelProcessing(final Event job) {
-        if ( job.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME) != null ) {
-            return true;
-        }
-        final Object value = job.getProperty(EventUtil.PROPERTY_JOB_PARALLEL);
-        if ( value != null ) {
-            if ( value instanceof Boolean ) {
-                return ((Boolean)value).booleanValue();
-            }
-            final String strValue = value.toString();
-            if ( "no".equalsIgnoreCase(strValue) || "false".equalsIgnoreCase(strValue) ) {
-                return false;
-            }
-            return true;
-        }
-        return false;
-    }
-
     private static final class StartedJobInfo {
         public final Event event;
         public final String nodePath;

Copied: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobBlockingQueue.java (from r908419, sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobBlockingQueue.java)
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobBlockingQueue.java?p2=sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobBlockingQueue.java&p1=sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobBlockingQueue.java&r1=908419&r2=908478&rev=908478&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobBlockingQueue.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobBlockingQueue.java Wed Feb 10 13:01:23 2010
@@ -16,11 +16,16 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.event.impl;
+package org.apache.sling.event.impl.job;
 
+import java.util.Date;
 import java.util.concurrent.LinkedBlockingQueue;
 
+import org.apache.sling.commons.scheduler.Scheduler;
+import org.apache.sling.event.EventUtil;
 import org.apache.sling.event.impl.AbstractRepositoryEventHandler.EventInfo;
+import org.osgi.service.event.Event;
+import org.slf4j.Logger;
 
 /**
  * The job blocking queue extends the blocking queue by some
@@ -43,6 +48,17 @@
     private volatile String schedulerJobName;
     private volatile Thread sleepingThread;
 
+    /** The queue name. */
+    private final String queueName;
+
+    /** The logger. */
+    private final Logger logger;
+
+    public JobBlockingQueue(final String name, final Logger logger) {
+        this.queueName = name;
+        this.logger = logger;
+    }
+
     public EventInfo waitForFinish() throws InterruptedException {
         this.isWaiting = true;
         this.markForCleanUp = false;
@@ -113,5 +129,83 @@
     public boolean isSleeping() {
         return this.isSleeping;
     }
+
+    /**
+     * Reschedule a job.
+     * If this is an ordered queue, this method will return the event info
+     * which should be processed next. Otherwise null is returned.
+     */
+    public EventInfo reschedule(final EventInfo info, final Scheduler scheduler) {
+        final Event job = info.event;
+        // is this an ordered queue?
+        final boolean orderedQueue = job.getProperty(EventUtil.PROPERTY_JOB_QUEUE_ORDERED) != null;
+        if ( orderedQueue ) {
+            // we just sleep for the delay time - if none, we continue and retry
+            // this job again
+            if ( job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY) != null ) {
+                final long delay = (Long)job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY);
+                setSleeping(true, Thread.currentThread());
+                try {
+                    Thread.sleep(delay);
+                } catch (InterruptedException e) {
+                    this.ignoreException(e);
+                } finally {
+                    setSleeping(false);
+                }
+            }
+            return info;
+        }
+        if ( job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY) != null ) {
+            final long delay = (Long)job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY);
+            final Date fireDate = new Date();
+            fireDate.setTime(System.currentTimeMillis() + delay);
+
+            final String schedulerJobName = "Waiting:" + queueName;
+            final Runnable t = new Runnable() {
+                public void run() {
+                    setSleeping(true, schedulerJobName);
+                    try {
+                        put(info);
+                    } catch (InterruptedException e) {
+                        // this should never happen
+                        ignoreException(e);
+                    } finally {
+                        setSleeping(false);
+                    }
+                }
+            };
+            if ( scheduler != null ) {
+                try {
+                    scheduler.fireJobAt(schedulerJobName, t, null, fireDate);
+                } catch (Exception e) {
+                    // we ignore the exception and just put back the job in the queue
+                    ignoreException(e);
+                    t.run();
+                }
+            } else {
+                t.run();
+            }
+        } else {
+            // put directly into queue
+            try {
+                put(info);
+            } catch (InterruptedException e) {
+                // this should never happen
+                this.ignoreException(e);
+            }
+        }
+
+        return null;
+    }
+
+    /**
+     * Helper method which just logs the exception in debug mode.
+     * @param e
+     */
+    protected void ignoreException(Exception e) {
+        if ( this.logger.isDebugEnabled() ) {
+            this.logger.debug("Ignored exception " + e.getMessage(), e);
+        }
+    }
 }
 

Added: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/ParallelInfo.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/ParallelInfo.java?rev=908478&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/ParallelInfo.java (added)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/ParallelInfo.java Wed Feb 10 13:01:23 2010
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.job;
+
+import org.apache.sling.event.EventUtil;
+import org.osgi.service.event.Event;
+
+/**
+ * Helper class for the job processing.
+ * This class is used to get the parallel processing information about
+ * a job.
+ */
+public class ParallelInfo {
+
+    public static final ParallelInfo UNLIMITED = new ParallelInfo(true, -1);
+    public static final ParallelInfo SERIAL = new ParallelInfo(false, -1);
+
+    public final boolean processParallel;
+    public final int maxParallelJob;
+
+    private ParallelInfo(final boolean processParallel, final int maxParallelJob) {
+        this.processParallel = processParallel;
+        this.maxParallelJob = maxParallelJob;
+    }
+
+    /**
+     * Return the parallel processing information.
+     * For improved processing we first check if a job queue name is set
+     * and, if so, always return {@link #UNLIMITED}. This is a pure implementation thing!
+     * If the parallel property is set, the following checks are performed:
+     * - boolean object: if false, no parallel processing, if true
+     *                   full parallel processing
+     * - number object: if higher than 1, parallel processing with the
+     *                  specified value, else no parallel processing
+     * - string value: if "no" or "false", no parallel processing
+     *                 if it is a string representation of a number,
+     *                 it's treated like a number object (see above)
+     *                 in all other cases unlimited parallel processing
+     *                 is returned.
+     * If the property is not set we return no parallel processing.
+     */
+    public static ParallelInfo getParallelInfo(final Event job) {
+        if ( job.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME) != null ) {
+            return ParallelInfo.UNLIMITED;
+        }
+        final Object value = job.getProperty(EventUtil.PROPERTY_JOB_PARALLEL);
+        if ( value != null ) {
+            if ( value instanceof Boolean ) {
+                final boolean result = ((Boolean)value).booleanValue();
+                if ( result ) {
+                    return ParallelInfo.UNLIMITED;
+                }
+                return ParallelInfo.SERIAL;
+            } else if ( value instanceof Number ) {
+                final int result = ((Number)value).intValue();
+                if ( result > 1 ) {
+                    return new ParallelInfo(true, result);
+                }
+                return ParallelInfo.SERIAL;
+            }
+            final String strValue = value.toString();
+            if ( "no".equalsIgnoreCase(strValue) || "false".equalsIgnoreCase(strValue) ) {
+                return ParallelInfo.SERIAL;
+            }
+            // check if this is a number
+            try {
+                final int result = Integer.valueOf(strValue).intValue();
+                if ( result > 1 ) {
+                    return new ParallelInfo(true, result);
+                }
+                return ParallelInfo.SERIAL;
+            } catch (NumberFormatException ne) {
+                // we ignore this
+            }
+            return ParallelInfo.UNLIMITED;
+        }
+        return ParallelInfo.SERIAL;
+    }
+}
\ No newline at end of file

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/ParallelInfo.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/ParallelInfo.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/ParallelInfo.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain