You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2008/08/26 18:07:51 UTC

svn commit: r689124 - in /incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event: EventUtil.java impl/JobEventHandler.java

Author: cziegeler
Date: Tue Aug 26 09:07:50 2008
New Revision: 689124

URL: http://svn.apache.org/viewvc?rev=689124&view=rev
Log:
SLING-628 : Provide separate queues if property queue name is provided.

Modified:
    incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java
    incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java

Modified: incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java
URL: http://svn.apache.org/viewvc/incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java?rev=689124&r1=689123&r2=689124&view=diff
==============================================================================
--- incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java (original)
+++ incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java Tue Aug 26 09:07:50 2008
@@ -44,6 +44,7 @@
 
 import org.apache.jackrabbit.util.ISO9075;
 import org.apache.sling.event.EventUtil.JobStatusNotifier.NotifierContext;
+import org.apache.sling.event.impl.JobEventHandler;
 import org.osgi.service.event.Event;
 import org.slf4j.LoggerFactory;
 
@@ -245,12 +246,13 @@
             }
 
         };
-        final JobStatusNotifier.NotifierContext ctx = (NotifierContext) job.getProperty(JobStatusNotifier.CONTEXT_PROPERTY_NAME);
-        if ( ctx != null ) {
-            ctx.notifier.execute(task);
+        // check if the job handler thread pool is available
+        if ( JobEventHandler.JOB_THREAD_POOL != null ) {
+            JobEventHandler.JOB_THREAD_POOL.execute(task);
         } else {
-            // if we don't have a job status notifier, we create the thread directly
-            // (this should never happen but is a safe fallback)
+            // if we don't have a thread pool, we create the thread directly
+            // (this should never happen for jobs, but is a safe fallback and
+            // allows this method to be called for other background processing).
             new Thread(task).start();
         }
     }
@@ -284,11 +286,6 @@
          * @return <code>true</code> if everything went fine, <code>false</code> otherwise.
          */
         boolean finishedJob(Event job, String eventNodePath, boolean reschedule);
-
-        /**
-         * Execute the job in the background
-         */
-        void execute(Runnable job);
     }
 
     /**

Modified: incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
URL: http://svn.apache.org/viewvc/incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java?rev=689124&r1=689123&r2=689124&view=diff
==============================================================================
--- incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java (original)
+++ incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java Tue Aug 26 09:07:50 2008
@@ -31,6 +31,8 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import javax.jcr.ItemExistsException;
 import javax.jcr.Node;
@@ -45,9 +47,9 @@
 import org.apache.jackrabbit.util.ISO8601;
 import org.apache.sling.commons.osgi.OsgiUtil;
 import org.apache.sling.commons.scheduler.Scheduler;
+import org.apache.sling.commons.threads.ThreadPool;
 import org.apache.sling.event.EventUtil;
 import org.apache.sling.event.JobStatusProvider;
-import org.osgi.framework.BundleEvent;
 import org.osgi.service.component.ComponentContext;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventAdmin;
@@ -73,50 +75,52 @@
     extends AbstractRepositoryEventHandler
     implements EventUtil.JobStatusNotifier, JobStatusProvider, Runnable {
 
-    /** The topic prefix for bundle events. */
-    protected static final String BUNDLE_EVENT_PREFIX = BundleEvent.class.getName().replace('.', '/') + '/';
-
     /** A map for keeping track of currently processed job topics. */
-    protected final Map<String, Boolean> processingMap = new HashMap<String, Boolean>();
+    private final Map<String, Boolean> processingMap = new HashMap<String, Boolean>();
+
+    /** A map for the different job queues. */
+    private final Map<String, BlockingQueue<EventInfo>> jobQueues = new HashMap<String, BlockingQueue<EventInfo>>();
 
     /** Default sleep time. */
-    protected static final long DEFAULT_SLEEP_TIME = 30;
+    private static final long DEFAULT_SLEEP_TIME = 30;
 
     /** @scr.property valueRef="DEFAULT_SLEEP_TIME" */
-    protected static final String CONFIG_PROPERTY_SLEEP_TIME = "sleep.time";
+    private static final String CONFIG_PROPERTY_SLEEP_TIME = "sleep.time";
 
     /** Default number of job retries. */
-    protected static final int DEFAULT_MAX_JOB_RETRIES = 10;
+    private static final int DEFAULT_MAX_JOB_RETRIES = 10;
 
     /** @scr.property valueRef="DEFAULT_MAX_JOB_RETRIES" */
-    protected static final String CONFIG_PROPERTY_MAX_JOB_RETRIES = "max.job.retries";
+    private static final String CONFIG_PROPERTY_MAX_JOB_RETRIES = "max.job.retries";
 
     /** We check every 30 secs by default. */
-    protected long sleepTime;
+    private long sleepTime;
 
     /** How often should a job be retried by default. */
-    protected int maxJobRetries;
+    private int maxJobRetries;
 
     /** Background session. */
-    protected Session backgroundSession;
+    private Session backgroundSession;
 
     /** Unloaded jobs. */
-    protected Set<String>unloadedJobs = new HashSet<String>();
+    private Set<String>unloadedJobs = new HashSet<String>();
 
     /** List of deleted jobs. */
-    protected Set<String>deletedJobs = new HashSet<String>();
+    private Set<String>deletedJobs = new HashSet<String>();
 
     /** Default clean up time is 10 minutes. */
-    protected static final int DEFAULT_CLEANUP_PERIOD = 10;
+    private static final int DEFAULT_CLEANUP_PERIOD = 10;
 
     /** @scr.property valueRef="DEFAULT_CLEANUP_PERIOD" type="Integer" */
-    protected static final String CONFIG_PROPERTY_CLEANUP_PERIOD = "cleanup.period";
+    private static final String CONFIG_PROPERTY_CLEANUP_PERIOD = "cleanup.period";
 
     /** We remove everything which is older than 10 min by default. */
-    protected int cleanupPeriod = DEFAULT_CLEANUP_PERIOD;
+    private int cleanupPeriod = DEFAULT_CLEANUP_PERIOD;
 
     /** The scheduler for rescheduling jobs. @scr.reference */
-    protected Scheduler scheduler;
+    private Scheduler scheduler;
+
+    public static ThreadPool JOB_THREAD_POOL;
 
     /**
      * Activate this component.
@@ -131,6 +135,7 @@
         this.sleepTime = OsgiUtil.toLong(props.get(CONFIG_PROPERTY_SLEEP_TIME), DEFAULT_SLEEP_TIME);
         this.maxJobRetries = OsgiUtil.toInteger(props.get(CONFIG_PROPERTY_MAX_JOB_RETRIES), DEFAULT_MAX_JOB_RETRIES);
         super.activate(context);
+        JOB_THREAD_POOL = this.threadPool;
     }
 
     /**
@@ -138,6 +143,16 @@
      */
     protected void deactivate(final ComponentContext context) {
         super.deactivate(context);
+        synchronized ( this.jobQueues ) {
+            final Iterator<BlockingQueue<EventInfo>> i = this.jobQueues.values().iterator();
+            while ( i.hasNext() ) {
+                try {
+                    i.next().put(new EventInfo());
+                } catch (InterruptedException e) {
+                    this.ignoreException(e);
+                }
+            }
+        }
         if ( this.backgroundSession != null ) {
             try {
                 this.backgroundSession.getWorkspace().getObservationManager().removeEventListener(this);
@@ -148,6 +163,7 @@
             this.backgroundSession.logout();
             this.backgroundSession = null;
         }
+        JOB_THREAD_POOL = null;
     }
 
     /**
@@ -314,123 +330,192 @@
             EventInfo info = null;
             try {
                 info = this.queue.take();
+            } catch (InterruptedException e) {
+                // we ignore this
+                this.ignoreException(e);
+            }
+
+            if ( info != null && this.running ) {
                 // check for local only jobs and remove them from the queue if they're meant
                 // for another application node
                 if ( info.event.getProperty(EventUtil.PROPERTY_JOB_RUN_LOCAL) != null
                     && !this.applicationId.equals(EventUtil.PROPERTY_APPLICATION) ) {
                     info = null;
                 }
+
+                // check if we should put this into a separate queue
+                if ( info != null && info.event.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME) != null ) {
+                    final String queueName = EventUtil.PROPERTY_JOB_QUEUE_NAME;
+                    synchronized ( this.jobQueues ) {
+                        BlockingQueue<EventInfo> jobQueue = this.jobQueues.get(queueName);
+                        if ( jobQueue == null ) {
+                            jobQueue = new LinkedBlockingQueue<EventInfo>();
+                            final BlockingQueue<EventInfo> jq = jobQueue;
+                            this.jobQueues.put(queueName, jobQueue);
+                            // Start background thread
+                            this.threadPool.execute(new Runnable() {
+
+                                /**
+                                 * @see java.lang.Runnable#run()
+                                 */
+                                public void run() {
+                                    runJobQueue(queueName, jq);
+                                }
+
+                            });
+                        }
+                        try {
+                            jobQueue.put(info);
+                        } catch (InterruptedException e) {
+                            // this should never happen
+                            this.ignoreException(e);
+                        }
+                    }
+                    // don't process this here
+                    info = null;
+                }
+
+                // if we still have a job, process it
+                if ( info != null ) {
+                    this.executeJob(info, null);
+                }
+            }
+        }
+    }
+
+    /**
+     * Execute a job queue
+     * @param queueName The name of the job queue
+     * @param jobQueue The job queue
+     */
+    private void runJobQueue(final String queueName, final BlockingQueue<EventInfo> jobQueue) {
+        while ( this.running ) {
+            // so let's wait/get the next job from the queue
+            EventInfo info = null;
+            try {
+                info = jobQueue.take();
             } catch (InterruptedException e) {
                 // we ignore this
                 this.ignoreException(e);
             }
 
             if ( info != null && this.running ) {
+                this.executeJob(info, jobQueue);
+            }
+        }
+    }
 
-                // check if the node still exists
-                synchronized (this.backgroundSession) {
-                    try {
-                        this.backgroundSession.refresh(false);
-                        if ( this.backgroundSession.itemExists(info.nodePath)
-                             && !this.backgroundSession.itemExists(info.nodePath + "/" + EventHelper.NODE_PROPERTY_FINISHED)) {
-                            final Event event = info.event;
-                            final String jobTopic = (String)event.getProperty(EventUtil.PROPERTY_JOB_TOPIC);
-                            final boolean parallelProcessing = event.getProperty(EventUtil.PROPERTY_JOB_PARALLEL) != null;
-
-                            // check how we can process this job
-                            // if parallel processing is allowed, we can just process
-                            // if not we should check if any other job with the same topic is currently running
-                            boolean process = parallelProcessing;
-                            if ( !process ) {
+    /**
+     * Process a job
+     */
+    private void executeJob(final EventInfo info, final BlockingQueue<EventInfo> jobQueue) {
+        // check if the node still exists
+        synchronized (this.backgroundSession) {
+            try {
+                this.backgroundSession.refresh(false);
+                if ( this.backgroundSession.itemExists(info.nodePath)
+                     && !this.backgroundSession.itemExists(info.nodePath + "/" + EventHelper.NODE_PROPERTY_FINISHED)) {
+                    final Event event = info.event;
+                    final String jobTopic = (String)event.getProperty(EventUtil.PROPERTY_JOB_TOPIC);
+                    final boolean parallelProcessing = event.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME) != null
+                                                    || event.getProperty(EventUtil.PROPERTY_JOB_PARALLEL) != null;
+
+                    // check how we can process this job
+                    // if parallel processing is allowed, we can just process
+                    // if not we should check if any other job with the same topic is currently running
+                    boolean process = parallelProcessing;
+                    if ( !process ) {
+                        synchronized ( this.processingMap ) {
+                            final Boolean value = this.processingMap.get(jobTopic);
+                            if ( value == null || !value.booleanValue() ) {
+                                this.processingMap.put(jobTopic, Boolean.TRUE);
+                                process = true;
+                            }
+                        }
+
+                    }
+                    if ( process ) {
+                        boolean unlock = true;
+                        try {
+                            final Node eventNode = (Node) this.backgroundSession.getItem(info.nodePath);
+                            if ( !eventNode.isLocked() ) {
+                                // lock node
+                                try {
+                                    eventNode.lock(false, true);
+                                } catch (RepositoryException re) {
+                                    // lock failed which means that the node is locked by someone else, so we don't have to requeue
+                                    process = false;
+                                }
+                                if ( process ) {
+                                    unlock = false;
+                                    this.processJob(info.event, eventNode);
+                                }
+                            }
+                        } catch (RepositoryException e) {
+                            // ignore
+                            this.ignoreException(e);
+                        } finally {
+                            if ( unlock && !parallelProcessing ) {
                                 synchronized ( this.processingMap ) {
-                                    final Boolean value = this.processingMap.get(jobTopic);
-                                    if ( value == null || !value.booleanValue() ) {
-                                        this.processingMap.put(jobTopic, Boolean.TRUE);
-                                        process = true;
-                                    }
+                                    this.processingMap.put(jobTopic, Boolean.FALSE);
                                 }
-
                             }
-                            if ( process ) {
-                                boolean unlock = true;
-                                try {
-                                    final Node eventNode = (Node) this.backgroundSession.getItem(info.nodePath);
-                                    if ( !eventNode.isLocked() ) {
-                                        // lock node
+                        }
+                    } else {
+                        try {
+                            // check if the node is in processing or already finished
+                            final Node eventNode = (Node) this.backgroundSession.getItem(info.nodePath);
+                            if ( !eventNode.isLocked() && !eventNode.hasProperty(EventHelper.NODE_PROPERTY_FINISHED)) {
+                                final EventInfo eInfo = info;
+                                final Date fireDate = new Date();
+                                fireDate.setTime(System.currentTimeMillis() + this.sleepTime * 1000);
+
+                                    // we put it back into the queue after a specific time
+                                final Runnable r = new Runnable() {
+
+                                    /**
+                                     * @see java.lang.Runnable#run()
+                                     */
+                                    public void run() {
                                         try {
-                                            eventNode.lock(false, true);
-                                        } catch (RepositoryException re) {
-                                            // lock failed which means that the node is locked by someone else, so we don't have to requeue
-                                            process = false;
-                                        }
-                                        if ( process ) {
-                                            unlock = false;
-                                            this.processJob(info.event, eventNode);
-                                        }
-                                    }
-                                } catch (RepositoryException e) {
-                                    // ignore
-                                    this.ignoreException(e);
-                                } finally {
-                                    if ( unlock && !parallelProcessing ) {
-                                        synchronized ( this.processingMap ) {
-                                            this.processingMap.put(jobTopic, Boolean.FALSE);
-                                        }
-                                    }
-                                }
-                            } else {
-                                try {
-                                    // check if the node is in processing or already finished
-                                    final Node eventNode = (Node) this.backgroundSession.getItem(info.nodePath);
-                                    if ( !eventNode.isLocked() && !eventNode.hasProperty(EventHelper.NODE_PROPERTY_FINISHED)) {
-                                        final EventInfo eInfo = info;
-                                        final Date fireDate = new Date();
-                                        fireDate.setTime(System.currentTimeMillis() + this.sleepTime * 1000);
-
-                                            // we put it back into the queue after a specific time
-                                        final Runnable r = new Runnable() {
-
-                                            /**
-                                             * @see java.lang.Runnable#run()
-                                             */
-                                            public void run() {
-                                                try {
-                                                    queue.put(eInfo);
-                                                } catch (InterruptedException e) {
-                                                    // ignore
-                                                    ignoreException(e);
-                                                }
+                                            if ( jobQueue != null ) {
+                                                jobQueue.put(eInfo);
+                                            } else {
+                                                queue.put(eInfo);
                                             }
-
-                                        };
-                                        try {
-                                            this.scheduler.fireJobAt(null, r, null, fireDate);
-                                        } catch (Exception e) {
-                                            // we ignore the exception
+                                        } catch (InterruptedException e) {
+                                            // ignore
                                             ignoreException(e);
-                                            // then wait for the time and readd the job
-                                            try {
-                                                Thread.sleep(sleepTime * 1000);
-                                            } catch (InterruptedException ie) {
-                                                // ignore
-                                                ignoreException(ie);
-                                            }
-                                            r.run();
                                         }
+                                    }
 
+                                };
+                                try {
+                                    this.scheduler.fireJobAt(null, r, null, fireDate);
+                                } catch (Exception e) {
+                                    // we ignore the exception
+                                    ignoreException(e);
+                                    // then wait for the time and readd the job
+                                    try {
+                                        Thread.sleep(sleepTime * 1000);
+                                    } catch (InterruptedException ie) {
+                                        // ignore
+                                        ignoreException(ie);
                                     }
-                                } catch (RepositoryException e) {
-                                    // ignore
-                                    this.ignoreException(e);
+                                    r.run();
                                 }
+
                             }
+                        } catch (RepositoryException e) {
+                            // ignore
+                            this.ignoreException(e);
                         }
-                    } catch (RepositoryException re) {
-                        this.ignoreException(re);
                     }
-
                 }
+            } catch (RepositoryException re) {
+                this.ignoreException(re);
             }
+
         }
     }
 
@@ -583,7 +668,8 @@
      * @param eventNode The node in the repository where the job is stored.
      */
     private void processJob(Event event, Node eventNode)  {
-        final boolean parallelProcessing = event.getProperty(EventUtil.PROPERTY_JOB_PARALLEL) != null;
+        final boolean parallelProcessing = event.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME) != null
+                                           || event.getProperty(EventUtil.PROPERTY_JOB_PARALLEL) != null;
         final String jobTopic = (String)event.getProperty(EventUtil.PROPERTY_JOB_TOPIC);
         boolean unlock = true;
         try {
@@ -808,7 +894,8 @@
                 job = new Event(job.getTopic(), newProperties);
             }
         }
-        final boolean parallelProcessing = job.getProperty(EventUtil.PROPERTY_JOB_PARALLEL) != null;
+        final boolean parallelProcessing = job.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME) != null
+                                        || job.getProperty(EventUtil.PROPERTY_JOB_PARALLEL) != null;
         // we have to use the same session for unlocking that we used for locking!
         synchronized ( this.backgroundSession ) {
             try {
@@ -925,13 +1012,6 @@
     }
 
     /**
-     * @see org.apache.sling.event.EventUtil.JobStatusNotifier#execute(java.lang.Runnable)
-     */
-    public void execute(Runnable job) {
-        this.threadPool.execute(job);
-    }
-
-    /**
      * Search for job nodes
      * @param topic The job topic
      * @param filterProps optional filter props