You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2014/10/15 19:55:53 UTC

svn commit: r1632141 [2/3] - in /sling/trunk/bundles/extensions/event: ./ src/main/java/org/apache/sling/event/impl/jobs/ src/main/java/org/apache/sling/event/impl/jobs/config/ src/main/java/org/apache/sling/event/impl/jobs/queues/ src/main/java/org/ap...

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java?rev=1632141&r1=1632140&r2=1632141&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java Wed Oct 15 17:55:52 2014
@@ -19,7 +19,6 @@
 package org.apache.sling.event.impl.jobs.queues;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Dictionary;
 import java.util.HashMap;
 import java.util.Hashtable;
@@ -30,18 +29,16 @@ import java.util.concurrent.atomic.Atomi
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.sling.commons.threads.ThreadPool;
-import org.apache.sling.commons.threads.ThreadPoolManager;
 import org.apache.sling.event.EventUtil;
 import org.apache.sling.event.impl.EventingThreadPool;
 import org.apache.sling.event.impl.jobs.InternalJobState;
-import org.apache.sling.event.impl.jobs.JobConsumerManager;
 import org.apache.sling.event.impl.jobs.JobExecutionResultImpl;
 import org.apache.sling.event.impl.jobs.JobHandler;
 import org.apache.sling.event.impl.jobs.JobImpl;
+import org.apache.sling.event.impl.jobs.TestLogger;
 import org.apache.sling.event.impl.jobs.Utility;
 import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
 import org.apache.sling.event.impl.jobs.deprecated.JobStatusNotifier;
-import org.apache.sling.event.impl.jobs.stats.StatisticsImpl;
 import org.apache.sling.event.impl.support.Environment;
 import org.apache.sling.event.impl.support.ResourceHelper;
 import org.apache.sling.event.jobs.Job;
@@ -52,7 +49,6 @@ import org.apache.sling.event.jobs.consu
 import org.apache.sling.event.jobs.consumer.JobExecutionResult;
 import org.apache.sling.event.jobs.consumer.JobExecutor;
 import org.osgi.service.event.Event;
-import org.osgi.service.event.EventAdmin;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -61,97 +57,97 @@ import org.slf4j.LoggerFactory;
  * functionality for the job event handling.
  */
 public abstract class AbstractJobQueue
-    extends StatisticsImpl
-    implements JobStatusNotifier, Queue {
-
-    /** Default number of seconds to wait for an ack. */
-    private static final long DEFAULT_WAIT_FOR_ACK_IN_MS = 60 * 1000; // by default we wait 60 secs
+    implements Queue, JobStatusNotifier {
 
     /** Default timeout for suspend. */
     private static final long MAX_SUSPEND_TIME = 1000 * 60 * 60; // 60 mins
 
+    /** Default number of seconds to wait for an ack. */
+    private static final long DEFAULT_WAIT_FOR_ACK_IN_MS = 60 * 1000; // by default we wait 60 secs
+
     /** The logger. */
     protected final Logger logger;
 
     /** Configuration. */
     protected final InternalQueueConfiguration configuration;
 
-    /** The event admin. */
-    private final EventAdmin eventAdmin;
-
-    /** The job consumer manager. */
-    private final JobConsumerManager jobConsumerManager;
-
     /** The queue name. */
     protected volatile String queueName;
 
     /** Are we still running? */
     protected volatile boolean running;
 
-    /** Is the queue currently waiting(sleeping) */
-    protected volatile boolean isWaiting = false;
-
-    /** The map of events we're have started (send). */
-    private final Map<String, JobHandler> startedJobsLists = new HashMap<String, JobHandler>();
-
-    /** The map of events we're processing. */
-    private final Map<String, JobHandler> processsingJobsLists = new HashMap<String, JobHandler>();
-
     /** Suspended since. */
     private volatile long suspendedSince = -1L;
 
     /** Suspend lock. */
     private final Object suspendLock = new Object();
 
+    /** Services used by the queues. */
+    protected final QueueServices services;
+
+    /** The map of events we're processing. */
+    private final Map<String, JobHandler> processingJobsLists = new HashMap<String, JobHandler>();
+
+    private final ThreadPool threadPool;
+
+    /** The map of events we're have started (send). */
+    private final Map<String, JobHandler> startedJobsLists = new HashMap<String, JobHandler>();
+
     /** Async counter. */
     private final AtomicInteger asyncCounter = new AtomicInteger();
 
+    /** Is the queue currently waiting(sleeping) */
+    protected volatile boolean isWaiting = false;
+
     /** Flag for outdated. */
     private final AtomicBoolean isOutdated = new AtomicBoolean(false);
 
-    /** Marker flag if the queue is waiting for another element (= empty) */
-    protected boolean isWaitingForNext = false;
-
     /** A marker for closing the queue. */
     private final AtomicBoolean closeMarker = new AtomicBoolean(false);
 
-    private final ThreadPool threadPool;
-
     /**
-     * Start this queue
+     * Create a new queue
      * @param name The queue name
      * @param config The queue configuration
-     * @param environment The environment component
      */
     public AbstractJobQueue(final String name,
-                    final InternalQueueConfiguration config,
-                    final JobConsumerManager jobConsumerManager,
-                    final ThreadPoolManager threadPoolManager,
-                    final EventAdmin eventAdmin) {
+                            final InternalQueueConfiguration config,
+                            final QueueServices services) {
         if ( config.getOwnThreadPoolSize() > 0 ) {
-            this.threadPool = new EventingThreadPool(threadPoolManager, config.getOwnThreadPoolSize());
+            this.threadPool = new EventingThreadPool(services.threadPoolManager, config.getOwnThreadPoolSize());
         } else {
             this.threadPool = Environment.THREAD_POOL;
         }
         this.queueName = name;
         this.configuration = config;
-        this.logger = LoggerFactory.getLogger(this.getClass().getName() + '.' + name);
+        this.services = services;
+        this.logger = new TestLogger(LoggerFactory.getLogger(this.getClass().getName() + '.' + name));
         this.running = true;
-        this.eventAdmin = eventAdmin;
-        this.jobConsumerManager = jobConsumerManager;
     }
 
     /**
-     * @see org.apache.sling.event.jobs.Queue#getStateInfo()
+     * Return the queue configuration
      */
     @Override
-    public String getStateInfo() {
-        synchronized ( this.suspendLock ) {
-            return "isWaiting=" + this.isWaiting +
-                    ", suspendedSince=" + this.suspendedSince +
-                    ", isWaitingForNext=" + this.isWaitingForNext +
-                    ", asyncJobs=" + this.asyncCounter.get();
-        }
+    public InternalQueueConfiguration getConfiguration() {
+        return this.configuration;
+    }
+
+    /**
+     * Get the name of the job queue.
+     */
+    @Override
+    public String getName() {
+        return this.queueName;
+    }
+
+    /**
+     * @see org.apache.sling.event.jobs.Queue#getStatistics()
+     */
+    @Override
+    public Statistics getStatistics() {
+        return this.services.statisticsManager.getQueueStatistics(this.queueName);
     }
 
     /**
@@ -180,37 +176,22 @@ public abstract class AbstractJobQueue
     }
 
     /**
-     * Return the queue configuration
+     * Is the queue outdated?
      */
-    @Override
-    public InternalQueueConfiguration getConfiguration() {
-        return this.configuration;
+    protected boolean isOutdated() {
+        return this.isOutdated.get();
     }
 
     /**
-     * Close this queue.
+     * Outdate this queue.
      */
-    public void close() {
-        this.running = false;
-        this.logger.debug("Shutting down job queue {}", queueName);
-        this.resume();
-        if ( this.isWaiting ) {
-            this.logger.debug("Waking up waiting queue {}", this.queueName);
-            this.notifyFinished(null);
-        }
-        // continue queue processing to stop the queue
-        this.put(new JobHandler(null, null));
-
-        synchronized ( this.processsingJobsLists ) {
-            this.processsingJobsLists.clear();
-        }
-        synchronized ( this.startedJobsLists ) {
-            this.startedJobsLists.clear();
-        }
-        if ( this.configuration.getOwnThreadPoolSize() > 0 ) {
-            ((EventingThreadPool)this.threadPool).release();
+    public void outdate() {
+        if ( !this.isOutdated() ) {
+            this.isOutdated.set(true);
+            final String name = this.getName() + "<outdated>(" + this.hashCode() + ")";
+            this.logger.info("Outdating queue {}, renaming to {}.", this.queueName, name);
+            this.queueName = name;
         }
-        this.logger.info("Stopped job queue {}", this.queueName);
     }
 
     /**
@@ -234,7 +215,34 @@ public abstract class AbstractJobQueue
      * Check whether this queue can be closed
      */
     protected boolean canBeClosed() {
-        return this.isEmpty() && !this.isWaiting && !this.isSuspended() && this.asyncCounter.get() == 0 && this.isWaitingForNext;
+        return !this.isWaiting && !this.isSuspended() && this.asyncCounter.get() == 0;
+    }
+
+    /**
+     * Close this queue.
+     */
+    public void close() {
+        this.running = false;
+        this.logger.debug("Shutting down job queue {}", queueName);
+        this.resume();
+        if ( this.isWaiting ) {
+            this.logger.debug("Waking up waiting queue {}", this.queueName);
+            this.notifyFinished(false);
+        }
+        // continue queue processing to stop the queue
+        this.services.topicManager.stop(this.getName());
+
+        synchronized ( this.processingJobsLists ) {
+            this.processingJobsLists.clear();
+        }
+        synchronized ( this.startedJobsLists ) {
+            this.startedJobsLists.clear();
+        }
+        if ( this.configuration.getOwnThreadPoolSize() > 0 ) {
+            ((EventingThreadPool)this.threadPool).release();
+        }
+
+        this.logger.info("Stopped job queue {}", this.queueName);
     }
 
     /**
@@ -275,12 +283,10 @@ public abstract class AbstractJobQueue
                     process = this.startedJobsLists.remove(info.getJob().getId()) != null;
                 }
                 if ( process ) {
-                    if ( !info.reschedule() ) {
-                        this.decQueued();
-                        checkForNotify(null);
-                    } else {
+                    if ( info.reschedule() ) {
                         this.logger.info("No acknowledge received for job {} stored at {}. Requeueing job.", Utility.toString(info.getJob()), info.getJob().getId());
-                        checkForNotify(info);
+                        this.reschedule(info);
+                        this.notifyFinished(true);
                     }
                 }
             }
@@ -288,30 +294,288 @@ public abstract class AbstractJobQueue
     }
 
     /**
-     * @see org.apache.sling.event.impl.jobs.deprecated.JobStatusNotifier#sendAcknowledge(org.osgi.service.event.Event)
+     * Execute the queue
      */
-    @Override
-    public boolean sendAcknowledge(final Event job) {
-        final String jobId = (String)job.getProperty(ResourceHelper.PROPERTY_JOB_ID);
-        final JobHandler ack;
-        synchronized ( this.startedJobsLists ) {
-            ack = this.startedJobsLists.remove(jobId);
+    private void runJobQueue() {
+        while ( this.running ) {
+            JobHandler info = null;
+            if ( info == null ) {
+                // so let's wait/get the next job from the queue
+                info = this.take();
+            }
+
+            // if we're suspended we drop the current item
+            if ( this.running && info != null && !checkSuspended() ) {
+                // if we still have a job and are running, let's go
+                this.start(info);
+            }
         }
-        // if the event is still in the processing list, we confirm the ack
-        if ( ack != null ) {
+    }
+
+    private JobHandler take() {
+        return this.services.topicManager.take(this.getName());
+    }
+
+    /**
+     * Check if the queue is suspended and go into suspend mode
+     */
+    private boolean checkSuspended() {
+        boolean wasSuspended = false;
+        synchronized ( this.suspendLock ) {
+            while ( this.suspendedSince != -1 ) {
+                logger.debug("Sleeping as queue {} is suspended.", this.getName());
+                wasSuspended = true;
+                final long diff = System.currentTimeMillis() - this.suspendedSince;
+                try {
+                    this.suspendLock.wait(MAX_SUSPEND_TIME - diff);
+                } catch (final InterruptedException ignore) {
+                    this.ignoreException(ignore);
+                    Thread.currentThread().interrupt();
+                }
+                logger.debug("Waking up queue {}.", this.getName());
+                if ( System.currentTimeMillis() > this.suspendedSince + MAX_SUSPEND_TIME ) {
+                    this.resume();
+                }
+            }
+        }
+        return wasSuspended;
+    }
+
+    /**
+     * Execute a job
+     */
+    protected boolean executeJob(final JobHandler handler) {
+        final JobImpl job = handler.getJob();
+        final JobExecutor consumer = this.services.jobConsumerManager.getExecutor(job.getTopic());
+
+        if ( (consumer != null || (job.isBridgedEvent() && this.services.jobConsumerManager.supportsBridgedEvents())) ) {
+            final boolean success = this.startJobExecution(handler, consumer);
+            return success;
+        } else {
+            // no consumer on this instance, assign to another instance
+            handler.reassign();
+            return false;
+        }
+    }
+
+    private boolean startJobExecution(final JobHandler handler, final JobExecutor consumer) {
+        final JobImpl job = handler.getJob();
+        if ( handler.startProcessing(this) ) {
             if ( logger.isDebugEnabled() ) {
-                logger.debug("Received ack for job {}", Utility.toString(ack.getJob()));
+                logger.debug("Starting job {}", Utility.toString(job));
             }
-            final long queueTime = ack.started - ack.queued;
-            this.addActive(queueTime);
-            Utility.sendNotification(this.eventAdmin, NotificationConstants.TOPIC_JOB_STARTED, ack.getJob(), queueTime);
-            synchronized ( this.processsingJobsLists ) {
-                this.processsingJobsLists.put(jobId, ack);
+            try {
+                handler.started = System.currentTimeMillis();
+
+                if ( consumer != null ) {
+                    final long queueTime = handler.started - handler.queued;
+                    Utility.sendNotification(this.services.eventAdmin, NotificationConstants.TOPIC_JOB_STARTED, job, queueTime);
+                    synchronized ( this.processingJobsLists ) {
+                        this.processingJobsLists.put(job.getId(), handler);
+                    }
+
+                    final Runnable task = new Runnable() {
+
+                        /**
+                         * @see java.lang.Runnable#run()
+                         */
+                        @Override
+                        public void run() {
+                            final Object lock = new Object();
+                            final Thread currentThread = Thread.currentThread();
+                            // update priority and name
+                            final String oldName = currentThread.getName();
+                            final int oldPriority = currentThread.getPriority();
+
+                            currentThread.setName(oldName + "-" + job.getQueueName() + "(" + job.getTopic() + ")");
+                            if ( configuration.getThreadPriority() != null ) {
+                                switch ( configuration.getThreadPriority() ) {
+                                    case NORM : currentThread.setPriority(Thread.NORM_PRIORITY);
+                                                break;
+                                    case MIN  : currentThread.setPriority(Thread.MIN_PRIORITY);
+                                                break;
+                                    case MAX  : currentThread.setPriority(Thread.MAX_PRIORITY);
+                                                break;
+                                }
+                            }
+                            JobExecutionResultImpl result = JobExecutionResultImpl.CANCELLED;
+                            Job.JobState resultState = Job.JobState.ERROR;
+                            final AtomicBoolean isAsync = new AtomicBoolean(false);
+
+                            try {
+                                synchronized ( lock ) {
+                                    final JobExecutionContext ctx = new JobExecutionContext() {
+
+                                        private boolean hasInit = false;
+
+                                        @Override
+                                        public void initProgress(final int steps,
+                                                final long eta) {
+                                            if ( !hasInit ) {
+                                                handler.persistJobProperties(job.startProgress(steps, eta));
+                                                hasInit = true;
+                                            }
+                                        }
+
+                                        @Override
+                                        public void incrementProgressCount(final int steps) {
+                                            if ( hasInit ) {
+                                                handler.persistJobProperties(job.setProgress(steps));
+                                            }
+                                        }
+
+                                        @Override
+                                        public void updateProgress(final long eta) {
+                                            if ( hasInit ) {
+                                                handler.persistJobProperties(job.update(eta));
+                                            }
+                                        }
+
+                                        @Override
+                                        public void log(final String message, Object... args) {
+                                            handler.persistJobProperties(job.log(message, args));
+                                        }
+
+                                        @Override
+                                        public boolean isStopped() {
+                                            return handler.isStopped();
+                                        }
+
+                                        @Override
+                                        public void asyncProcessingFinished(final JobExecutionResult result) {
+                                            synchronized ( lock ) {
+                                                if ( isAsync.compareAndSet(true, false) ) {
+                                                    services.jobConsumerManager.unregisterListener(job.getId());
+                                                    Job.JobState state = null;
+                                                    if ( result.succeeded() ) {
+                                                        state = Job.JobState.SUCCEEDED;
+                                                    } else if ( result.failed() ) {
+                                                        state = Job.JobState.QUEUED;
+                                                    } else if ( result.cancelled() ) {
+                                                        if ( handler.isStopped() ) {
+                                                            state = Job.JobState.STOPPED;
+                                                        } else {
+                                                            state = Job.JobState.ERROR;
+                                                        }
+                                                    }
+                                                    finishedJob(job.getId(), state, true);
+                                                    asyncCounter.decrementAndGet();
+                                                } else {
+                                                    throw new IllegalStateException("Job is not processed async " + job.getId());
+                                                }
+                                            }
+                                        }
+
+                                        @Override
+                                        public ResultBuilder result() {
+                                            return new ResultBuilder() {
+
+                                                private String message;
+
+                                                private Long retryDelayInMs;
+
+                                                @Override
+                                                public JobExecutionResult failed(final long retryDelayInMs) {
+                                                    this.retryDelayInMs = retryDelayInMs;
+                                                    return new JobExecutionResultImpl(InternalJobState.FAILED, message, retryDelayInMs);
+                                                }
+
+                                                @Override
+                                                public ResultBuilder message(final String message) {
+                                                    this.message = message;
+                                                    return this;
+                                                }
+
+                                                @Override
+                                                public JobExecutionResult succeeded() {
+                                                    return new JobExecutionResultImpl(InternalJobState.SUCCEEDED, message, retryDelayInMs);
+                                                }
+
+                                                @Override
+                                                public JobExecutionResult failed() {
+                                                    return new JobExecutionResultImpl(InternalJobState.FAILED, message, retryDelayInMs);
+                                                }
+
+                                                @Override
+                                                public JobExecutionResult cancelled() {
+                                                    return new JobExecutionResultImpl(InternalJobState.CANCELLED, message, retryDelayInMs);
+                                                }
+                                            };
+                                        }
+                                    };
+                                    result = (JobExecutionResultImpl)consumer.process(job, ctx);
+                                    if ( result == null ) { // ASYNC processing
+                                        services.jobConsumerManager.registerListener(job.getId(), consumer, ctx);
+                                        asyncCounter.incrementAndGet();
+                                        isAsync.set(true);
+                                    } else {
+                                        if ( result.succeeded() ) {
+                                            resultState = Job.JobState.SUCCEEDED;
+                                        } else if ( result.failed() ) {
+                                            resultState = Job.JobState.QUEUED;
+                                        } else if ( result.cancelled() ) {
+                                            if ( handler.isStopped() ) {
+                                                resultState = Job.JobState.STOPPED;
+                                            } else {
+                                                resultState = Job.JobState.ERROR;
+                                            }
+                                        }
+                                    }
+                                }
+                            } catch (final Throwable t) { //NOSONAR
+                                logger.error("Unhandled error occured in job processor " + t.getMessage() + " while processing job " + Utility.toString(job), t);
+                                // we don't reschedule if an exception occurs
+                                result = JobExecutionResultImpl.CANCELLED;
+                                resultState = Job.JobState.ERROR;
+                            } finally {
+                                currentThread.setPriority(oldPriority);
+                                currentThread.setName(oldName);
+                                if ( result != null ) {
+                                    if ( result.getRetryDelayInMs() != null ) {
+                                        job.setProperty(JobImpl.PROPERTY_DELAY_OVERRIDE, result.getRetryDelayInMs());
+                                    }
+                                    if ( result.getMessage() != null ) {
+                                       job.setProperty(Job.PROPERTY_RESULT_MESSAGE, result.getMessage());
+                                    }
+                                    finishedJob(job.getId(), resultState, false);
+                                }
+                            }
+                        }
+
+                    };
+                    // check if the thread pool is available
+                    final ThreadPool pool = this.threadPool;
+                    if ( pool != null ) {
+                        pool.execute(task);
+                    } else {
+                        // if we don't have a thread pool, we create the thread directly
+                        // (this should never happen for jobs, but is a safe fall back)
+                        new Thread(task).start();
+                    }
+
+                } else {
+                    // let's add the event to our started jobs list
+                    synchronized ( this.startedJobsLists ) {
+                        this.startedJobsLists.put(job.getId(), handler);
+                    }
+                    final Event jobEvent = this.getJobEvent(handler);
+                    // we need async delivery, otherwise we might create a deadlock
+                    // as this method runs inside a synchronized block and the finishedJob
+                    // method as well!
+                    this.services.eventAdmin.postEvent(jobEvent);
+                }
+                return true;
+
+            } catch (final Exception re) {
+                // if an exception occurs, we just log
+                this.logger.error("Exception during job processing.", re);
             }
         } else {
-            this.decQueued();
+            if ( logger.isDebugEnabled() ) {
+                logger.debug("Discarding removed job {}", Utility.toString(job));
+            }
         }
-        return ack != null;
+        return false;
     }
 
     private static final class RescheduleInfo {
@@ -319,42 +583,41 @@ public abstract class AbstractJobQueue
         public long    processingTime;
     }
 
-    private RescheduleInfo handleReschedule(final JobHandler jobEvent, final Job.JobState resultState) {
+    private RescheduleInfo handleReschedule(final JobHandler handler, final Job.JobState resultState) {
         final RescheduleInfo info = new RescheduleInfo();
         switch ( resultState ) {
             case SUCCEEDED : // job is finished
                 if ( this.logger.isDebugEnabled() ) {
-                    this.logger.debug("Finished job {}", Utility.toString(jobEvent.getJob()));
+                    this.logger.debug("Finished job {}", Utility.toString(handler.getJob()));
                 }
-                info.processingTime = System.currentTimeMillis() - jobEvent.started;
-                this.finishedJob(info.processingTime);
+                info.processingTime = System.currentTimeMillis() - handler.started;
+                Utility.sendNotification(this.services.eventAdmin, NotificationConstants.TOPIC_JOB_FINISHED, handler.getJob(), info.processingTime);
                 break;
             case QUEUED : // check if we exceeded the number of retries
-                int retries = (Integer) jobEvent.getJob().getProperty(Job.PROPERTY_JOB_RETRIES);
-                int retryCount = (Integer)jobEvent.getJob().getProperty(Job.PROPERTY_JOB_RETRY_COUNT);
+                final int retries = (Integer) handler.getJob().getProperty(Job.PROPERTY_JOB_RETRIES);
+                int retryCount = (Integer)handler.getJob().getProperty(Job.PROPERTY_JOB_RETRY_COUNT);
 
                 retryCount++;
                 if ( retries != -1 && retryCount > retries ) {
                     if ( this.logger.isDebugEnabled() ) {
-                        this.logger.debug("Cancelled job {}", Utility.toString(jobEvent.getJob()));
+                        this.logger.debug("Cancelled job {}", Utility.toString(handler.getJob()));
                     }
-                    this.cancelledJob();
+                    Utility.sendNotification(this.services.eventAdmin, NotificationConstants.TOPIC_JOB_CANCELLED, handler.getJob(), null);
                 } else {
                     info.reschedule = true;
-                    // update event with retry count and retries
-                    jobEvent.getJob().retry();
+                    this.reschedule(handler);
                     if ( this.logger.isDebugEnabled() ) {
-                        this.logger.debug("Failed job {}", Utility.toString(jobEvent.getJob()));
+                        this.logger.debug("Failed job {}", Utility.toString(handler.getJob()));
                     }
-                    this.failedJob();
-                    jobEvent.queued = System.currentTimeMillis();
+                    handler.queued = System.currentTimeMillis();
+                    Utility.sendNotification(this.services.eventAdmin, NotificationConstants.TOPIC_JOB_FAILED, handler.getJob(), null);
                 }
                 break;
             default : // consumer cancelled the job (STOPPED, GIVEN_UP, ERROR)
                 if ( this.logger.isDebugEnabled() ) {
-                    this.logger.debug("Cancelled job {}", Utility.toString(jobEvent.getJob()));
+                    this.logger.debug("Cancelled job {}", Utility.toString(handler.getJob()));
                 }
-                this.cancelledJob();
+                Utility.sendNotification(this.services.eventAdmin, NotificationConstants.TOPIC_JOB_CANCELLED, handler.getJob(), null);
                 break;
         }
 
@@ -379,7 +642,6 @@ public abstract class AbstractJobQueue
         if ( this.logger.isDebugEnabled() ) {
             this.logger.debug("Received finish for job {}, resultState={}", jobId, resultState);
         }
-        // let's remove the event from our processing list
         // this is just a sanity check, as usually the job should have been
         // removed during sendAcknowledge.
         synchronized ( this.startedJobsLists ) {
@@ -388,8 +650,9 @@ public abstract class AbstractJobQueue
 
         // get job handler
         final JobHandler handler;
-        synchronized ( this.processsingJobsLists ) {
-            handler = this.processsingJobsLists.remove(jobId);
+        // let's remove the event from our processing list
+        synchronized ( this.processingJobsLists ) {
+            handler = this.processingJobsLists.remove(jobId);
         }
 
         if ( !this.running ) {
@@ -409,338 +672,15 @@ public abstract class AbstractJobQueue
         if ( resultState == Job.JobState.QUEUED && !rescheduleInfo.reschedule ) {
             resultState = Job.JobState.GIVEN_UP;
         }
-        // if this is set after the synchronized block we have an error
-        final boolean finishSuccessful;
 
         if ( !rescheduleInfo.reschedule ) {
             // we keep cancelled jobs and succeeded jobs if the queue is configured like this.
             final boolean keepJobs = resultState != Job.JobState.SUCCEEDED || this.configuration.isKeepJobs();
             handler.finished(resultState, keepJobs, rescheduleInfo.processingTime);
-            finishSuccessful = true;
-            if ( resultState == Job.JobState.SUCCEEDED ) {
-                Utility.sendNotification(this.eventAdmin, NotificationConstants.TOPIC_JOB_FINISHED, handler.getJob(), rescheduleInfo.processingTime);
-            } else {
-                Utility.sendNotification(this.eventAdmin, NotificationConstants.TOPIC_JOB_CANCELLED, handler.getJob(), null);
-            }
-        } else {
-            finishSuccessful = handler.reschedule();
-            Utility.sendNotification(this.eventAdmin, NotificationConstants.TOPIC_JOB_FAILED, handler.getJob(), null);
-        }
-
-        if ( !isAsync ) {
-            if ( !finishSuccessful || !rescheduleInfo.reschedule ) {
-                checkForNotify(null);
-                return false;
-            }
-            checkForNotify(handler);
-        } else {
-            // async result
-            if ( finishSuccessful && rescheduleInfo.reschedule ) {
-                final JobHandler reprocessHandler = this.reschedule(handler);
-                if ( reprocessHandler != null ) {
-                    this.put(reprocessHandler);
-                }
-            }
-        }
-        return true;
-    }
-
-    private void checkForNotify(final JobHandler info) {
-        JobHandler reprocessInfo = null;
-        if ( info != null ) {
-            reprocessInfo = this.reschedule(info);
-        }
-        notifyFinished(reprocessInfo);
-    }
-
-    /**
-     * Get the name of the job queue.
-     */
-    @Override
-    public String getName() {
-        return this.queueName;
-    }
-
-
-    /**
-     * Add a new job to the queue.
-     */
-    public void process(final JobHandler handler) {
-        this.closeMarker.set(false);
-        handler.queued = System.currentTimeMillis();
-        this.incQueued();
-        this.put(handler);
-    }
-
-    /**
-     * Check if the queue is suspended and go into suspend mode
-     */
-    private void checkSuspended() {
-        synchronized ( this.suspendLock ) {
-            while ( this.suspendedSince != -1 ) {
-                try {
-                    this.suspendLock.wait(MAX_SUSPEND_TIME);
-                } catch (final InterruptedException ignore) {
-                    this.ignoreException(ignore);
-                    Thread.currentThread().interrupt();
-                }
-                if ( System.currentTimeMillis() > this.suspendedSince + MAX_SUSPEND_TIME ) {
-                    this.resume();
-                }
-            }
         }
-    }
-
-    /**
-     * Execute the queue
-     */
-    private void runJobQueue() {
-        JobHandler info = null;
-        while ( this.running ) {
-            if ( info == null ) {
-                // so let's wait/get the next job from the queue
-                info = this.take();
-            }
+        this.notifyFinished(rescheduleInfo.reschedule);
 
-            if ( this.running ) {
-                checkSuspended();
-            }
-            if ( info != null && this.running ) {
-                info = this.start(info);
-            }
-        }
-    }
-
-    /**
-     * Execute a job
-     */
-    protected boolean executeJob(final JobHandler handler) {
-        final JobImpl job = handler.getJob();
-        final JobExecutor consumer = this.jobConsumerManager.getExecutor(job.getTopic());
-
-        if ( (consumer != null || (job.isBridgedEvent() && this.jobConsumerManager.supportsBridgedEvents())) ) {
-            if ( handler.startProcessing(this) ) {
-                if ( logger.isDebugEnabled() ) {
-                    logger.debug("Starting job {}", Utility.toString(job));
-                }
-                try {
-                    handler.started = System.currentTimeMillis();
-
-                    if ( consumer != null ) {
-                        final long queueTime = handler.started - handler.queued;
-                        this.addActive(queueTime);
-                        Utility.sendNotification(this.eventAdmin, NotificationConstants.TOPIC_JOB_STARTED, job, queueTime);
-                        synchronized ( this.processsingJobsLists ) {
-                            this.processsingJobsLists.put(job.getId(), handler);
-                        }
-
-                        final Runnable task = new Runnable() {
-
-                            /**
-                             * @see java.lang.Runnable#run()
-                             */
-                            @Override
-                            public void run() {
-                                final Object lock = new Object();
-                                final Thread currentThread = Thread.currentThread();
-                                // update priority and name
-                                final String oldName = currentThread.getName();
-                                final int oldPriority = currentThread.getPriority();
-
-                                currentThread.setName(oldName + "-" + job.getQueueName() + "(" + job.getTopic() + ")");
-                                if ( configuration.getThreadPriority() != null ) {
-                                    switch ( configuration.getThreadPriority() ) {
-                                        case NORM : currentThread.setPriority(Thread.NORM_PRIORITY);
-                                                    break;
-                                        case MIN  : currentThread.setPriority(Thread.MIN_PRIORITY);
-                                                    break;
-                                        case MAX  : currentThread.setPriority(Thread.MAX_PRIORITY);
-                                                    break;
-                                    }
-                                }
-                                JobExecutionResultImpl result = JobExecutionResultImpl.CANCELLED;
-                                Job.JobState resultState = Job.JobState.ERROR;
-                                final AtomicBoolean isAsync = new AtomicBoolean(false);
-
-                                try {
-                                    synchronized ( lock ) {
-                                        final JobExecutionContext ctx = new JobExecutionContext() {
-
-                                            private boolean hasInit = false;
-
-                                            @Override
-                                            public void initProgress(final int steps,
-                                                    final long eta) {
-                                                if ( !hasInit ) {
-                                                    handler.persistJobProperties(job.startProgress(steps, eta));
-                                                    hasInit = true;
-                                                }
-                                            }
-
-                                            @Override
-                                            public void incrementProgressCount(final int steps) {
-                                                if ( hasInit ) {
-                                                    handler.persistJobProperties(job.setProgress(steps));
-                                                }
-                                            }
-
-                                            @Override
-                                            public void updateProgress(final long eta) {
-                                                if ( hasInit ) {
-                                                    handler.persistJobProperties(job.update(eta));
-                                                }
-                                            }
-
-                                            @Override
-                                            public void log(final String message, Object... args) {
-                                                handler.persistJobProperties(job.log(message, args));
-                                            }
-
-                                            @Override
-                                            public boolean isStopped() {
-                                                return handler.isStopped();
-                                            }
-
-                                            @Override
-                                            public void asyncProcessingFinished(final JobExecutionResult result) {
-                                                synchronized ( lock ) {
-                                                    if ( isAsync.compareAndSet(true, false) ) {
-                                                        jobConsumerManager.unregisterListener(job.getId());
-                                                        Job.JobState state = null;
-                                                        if ( result.succeeded() ) {
-                                                            state = Job.JobState.SUCCEEDED;
-                                                        } else if ( result.failed() ) {
-                                                            state = Job.JobState.QUEUED;
-                                                        } else if ( result.cancelled() ) {
-                                                            if ( handler.isStopped() ) {
-                                                                state = Job.JobState.STOPPED;
-                                                            } else {
-                                                                state = Job.JobState.ERROR;
-                                                            }
-                                                        }
-                                                        finishedJob(job.getId(), state, true);
-                                                        asyncCounter.decrementAndGet();
-                                                    } else {
-                                                        throw new IllegalStateException("Job is not processed async " + job.getId());
-                                                    }
-                                                }
-                                            }
-
-                                            @Override
-                                            public ResultBuilder result() {
-                                                return new ResultBuilder() {
-
-                                                    private String message;
-
-                                                    private Long retryDelayInMs;
-
-                                                    @Override
-                                                    public JobExecutionResult failed(final long retryDelayInMs) {
-                                                        this.retryDelayInMs = retryDelayInMs;
-                                                        return new JobExecutionResultImpl(InternalJobState.FAILED, message, retryDelayInMs);
-                                                    }
-
-                                                    @Override
-                                                    public ResultBuilder message(final String message) {
-                                                        this.message = message;
-                                                        return this;
-                                                    }
-
-                                                    @Override
-                                                    public JobExecutionResult succeeded() {
-                                                        return new JobExecutionResultImpl(InternalJobState.SUCCEEDED, message, retryDelayInMs);
-                                                    }
-
-                                                    @Override
-                                                    public JobExecutionResult failed() {
-                                                        return new JobExecutionResultImpl(InternalJobState.FAILED, message, retryDelayInMs);
-                                                    }
-
-                                                    @Override
-                                                    public JobExecutionResult cancelled() {
-                                                        return new JobExecutionResultImpl(InternalJobState.CANCELLED, message, retryDelayInMs);
-                                                    }
-                                                };
-                                            }
-                                        };
-                                        result = (JobExecutionResultImpl)consumer.process(job, ctx);
-                                        if ( result == null ) { // ASYNC processing
-                                            jobConsumerManager.registerListener(job.getId(), consumer, ctx);
-                                            asyncCounter.incrementAndGet();
-                                            notifyFinished(null);
-                                            isAsync.set(true);
-                                        } else {
-                                            if ( result.succeeded() ) {
-                                                resultState = Job.JobState.SUCCEEDED;
-                                            } else if ( result.failed() ) {
-                                                resultState = Job.JobState.QUEUED;
-                                            } else if ( result.cancelled() ) {
-                                                if ( handler.isStopped() ) {
-                                                    resultState = Job.JobState.STOPPED;
-                                                } else {
-                                                    resultState = Job.JobState.ERROR;
-                                                }
-                                            }
-                                        }
-                                    }
-                                } catch (final Throwable t) { //NOSONAR
-                                    logger.error("Unhandled error occured in job processor " + t.getMessage() + " while processing job " + Utility.toString(job), t);
-                                    // we don't reschedule if an exception occurs
-                                    result = JobExecutionResultImpl.CANCELLED;
-                                    resultState = Job.JobState.ERROR;
-                                } finally {
-                                    currentThread.setPriority(oldPriority);
-                                    currentThread.setName(oldName);
-                                    if ( result != null ) {
-                                        if ( result.getRetryDelayInMs() != null ) {
-                                            job.setProperty(JobImpl.PROPERTY_DELAY_OVERRIDE, result.getRetryDelayInMs());
-                                        }
-                                        if ( result.getMessage() != null ) {
-                                           job.setProperty(Job.PROPERTY_RESULT_MESSAGE, result.getMessage());
-                                        }
-                                        finishedJob(job.getId(), resultState, false);
-                                    }
-                                }
-                            }
-
-                        };
-                        // check if the thread pool is available
-                        final ThreadPool pool = this.threadPool;
-                        if ( pool != null ) {
-                            pool.execute(task);
-                        } else {
-                            // if we don't have a thread pool, we create the thread directly
-                            // (this should never happen for jobs, but is a safe fallback)
-                            new Thread(task).start();
-                        }
-
-                    } else {
-                        // let's add the event to our processing list
-                        synchronized ( this.startedJobsLists ) {
-                            this.startedJobsLists.put(job.getId(), handler);
-                        }
-                        final Event jobEvent = this.getJobEvent(handler);
-                        // we need async delivery, otherwise we might create a deadlock
-                        // as this method runs inside a synchronized block and the finishedJob
-                        // method as well!
-                        this.eventAdmin.postEvent(jobEvent);
-                    }
-                    return true;
-
-                } catch (final Exception re) {
-                    // if an exception occurs, we just log
-                    this.logger.error("Exception during job processing.", re);
-                }
-            } else {
-                if ( logger.isDebugEnabled() ) {
-                    logger.debug("Discarding removed job {}", Utility.toString(job));
-                }
-            }
-        } else {
-            handler.reassign();
-        }
-        this.decQueued();
-        return false;
+        return rescheduleInfo.reschedule;
     }
 
     /**
@@ -768,40 +708,27 @@ public abstract class AbstractJobQueue
     }
 
     /**
-     * Helper method which just logs the exception in debug mode.
-     * @param e
+     * @see org.apache.sling.event.impl.jobs.deprecated.JobStatusNotifier#sendAcknowledge(org.osgi.service.event.Event)
      */
-    protected void ignoreException(Exception e) {
-        if ( this.logger.isDebugEnabled() ) {
-            this.logger.debug("Ignored exception " + e.getMessage(), e);
+    @Override
+    public boolean sendAcknowledge(final Event job) {
+        final String jobId = (String)job.getProperty(ResourceHelper.PROPERTY_JOB_ID);
+        final JobHandler ack;
+        synchronized ( this.startedJobsLists ) {
+            ack = this.startedJobsLists.remove(jobId);
         }
-    }
-
-    /**
-     * Is the queue outdated?
-     */
-    protected boolean isOutdated() {
-        return this.isOutdated.get();
-    }
-
-    /**
-     * Outdate this queue.
-     */
-    public void outdate() {
-        if ( !this.isOutdated() ) {
-            this.isOutdated.set(true);
-            final String name = this.getName() + "<outdated>(" + this.hashCode() + ")";
-            this.logger.info("Outdating queue {}, renaming to {}.", this.queueName, name);
-            this.queueName = name;
+        // if the event is still in the started jobs list, we confirm the ack
+        if ( ack != null ) {
+            if ( logger.isDebugEnabled() ) {
+                logger.debug("Received ack for job {}", Utility.toString(ack.getJob()));
+            }
+            final long queueTime = ack.started - ack.queued;
+            Utility.sendNotification(this.services.eventAdmin, NotificationConstants.TOPIC_JOB_STARTED, ack.getJob(), queueTime);
+            synchronized ( this.processingJobsLists ) {
+                this.processingJobsLists.put(jobId, ack);
+            }
         }
-    }
-
-    /**
-     * @see org.apache.sling.event.jobs.Queue#getStatistics()
-     */
-    @Override
-    public Statistics getStatistics() {
-        return this;
+        return ack != null;
     }
 
     /**
@@ -841,37 +768,12 @@ public abstract class AbstractJobQueue
         }
     }
 
-
     /**
      * @see org.apache.sling.event.jobs.Queue#removeAll()
      */
     @Override
     public synchronized void removeAll() {
-        // we suspend the queue
-        final boolean wasSuspended = this.isSuspended();
-        this.suspend();
-        // we copy all events and remove them in the background
-        final Collection<JobHandler> events = this.removeAllJobs();
-        this.clearQueued();
-        final Thread t = new Thread(new Runnable() {
-
-                /**
-                 * @see java.lang.Runnable#run()
-                 */
-                @Override
-                public void run() {
-                    for(final JobHandler job : events) {
-                        job.cancel();
-                        Utility.sendNotification(eventAdmin, NotificationConstants.TOPIC_JOB_CANCELLED, job.getJob(), null);
-                    }
-                }
-            }, "Apache Sling Queue RemoveAll Thread for " + this.queueName);
-        t.setDaemon(true);
-        t.start();
-        // start queue again
-        if ( !wasSuspended ) {
-            this.resume();
-        }
+        this.services.topicManager.removeAll(this.getName());
     }
 
     /**
@@ -879,7 +781,7 @@ public abstract class AbstractJobQueue
      */
     @Override
     public void clear() {
-        this.clearQueued();
+        // this is a noop
     }
 
     /**
@@ -891,6 +793,18 @@ public abstract class AbstractJobQueue
         return null;
     }
 
+    /**
+     * @see org.apache.sling.event.jobs.Queue#getStateInfo()
+     */
+    @Override
+    public String getStateInfo() {
+        synchronized ( this.suspendLock ) {
+            return "isWaiting=" + this.isWaiting +
+                    ", suspendedSince=" + this.suspendedSince +
+                    ", asyncJobs=" + this.asyncCounter.get();
+        }
+    }
+
     protected long getRetryDelay(final JobHandler handler) {
         long delay = this.configuration.getRetryDelayInMs();
         if ( handler.getJob().getProperty(JobImpl.PROPERTY_DELAY_OVERRIDE) != null ) {
@@ -901,44 +815,38 @@ public abstract class AbstractJobQueue
         return delay;
     }
 
-    /**
-     * Reschedule a job.
-     */
-    protected abstract JobHandler reschedule(final JobHandler info);
-
-    /**
-     * Put another job into the queue.
-     */
-    protected abstract void put(final JobHandler event);
-
-    /**
-     * Get another job from the queue.
-     */
-    protected abstract JobHandler take();
-
-    /**
-     * Is the queue empty?
-     */
-    protected abstract boolean isEmpty();
+    protected void reschedule(final JobHandler handler) {
+        // update event with retry count and retries
+        handler.reschedule();
+    }
 
     /**
-     * Remove all events from the queue and return them.
+     * Helper method which just logs the exception in debug mode.
+     * @param e
      */
-    protected abstract Collection<JobHandler> removeAllJobs();
-
-    protected abstract JobHandler start(final JobHandler event);
-
-    protected abstract void notifyFinished(final JobHandler rescheduleInfo);
+    protected void ignoreException(Exception e) {
+        if ( this.logger.isDebugEnabled() ) {
+            this.logger.debug("Ignored exception " + e.getMessage(), e);
+        }
+    }
 
     public boolean stopJob(final JobImpl job) {
         final JobHandler handler;
-        synchronized ( this.processsingJobsLists ) {
-            handler = this.processsingJobsLists.get(job.getId());
+        synchronized ( this.processingJobsLists ) {
+            handler = this.processingJobsLists.get(job.getId());
         }
         if ( handler != null ) {
             handler.stop();
         }
         return handler != null;
     }
+
+    /**
+     * Start processing of a new job.
+     * @param handler The new job handler
+     */
+    protected abstract void start(final JobHandler handler);
+
+    protected abstract void notifyFinished(boolean reschedule);
 }
 

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java?rev=1632141&r1=1632140&r2=1632141&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java Wed Oct 15 17:55:52 2014
@@ -18,19 +18,8 @@
  */
 package org.apache.sling.event.impl.jobs.queues;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-import java.util.TreeSet;
-
-import org.apache.sling.commons.threads.ThreadPoolManager;
-import org.apache.sling.event.impl.jobs.JobConsumerManager;
 import org.apache.sling.event.impl.jobs.JobHandler;
 import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
-import org.osgi.service.event.EventAdmin;
 
 /**
  * An ordered job queue is processing the queue FIFO in a serialized
@@ -40,54 +29,20 @@ import org.osgi.service.event.EventAdmin
  */
 public final class OrderedJobQueue extends AbstractJobQueue {
 
-    /** The job handler for rescheduling. */
-    private volatile JobHandler jobHandler;
-
-    /** Lock and status object for handling the sleep phase. */
-    private final SleepLock sleepLock = new SleepLock();
-
-    /** The queue - we use a set which is sorted by job creation date. */
-    private final Set<JobHandler> queue = new TreeSet<JobHandler>(new Comparator<JobHandler>() {
-
-        @Override
-        public int compare(final JobHandler o1, final JobHandler o2) {
-            if ( o1.getJob() == null ) {
-                if ( o2.getJob() == null ) {
-                    return 0;
-                }
-                return -1;
-            }
-            if ( o2.getJob() == null ) {
-                return 1;
-            }
-            int result = o1.getJob().getCreated().compareTo(o2.getJob().getCreated());
-            if (result == 0 ) {
-                result = o1.getJob().getId().compareTo(o2.getJob().getId());
-            }
-            return result;
-        }
-    });
-
+    /** Object to sync operations within this queue. */
     private final Object syncLock = new Object();
 
+    /** Sleep delay if job needs rescheduling. */
+    private long sleepDelay = -1;
+
     public OrderedJobQueue(final String name,
                            final InternalQueueConfiguration config,
-                           final JobConsumerManager jobConsumerManager,
-                           final ThreadPoolManager threadPoolManager,
-                           final EventAdmin eventAdmin) {
-        super(name, config, jobConsumerManager, threadPoolManager, eventAdmin);
-    }
-
-    @Override
-    public String getStateInfo() {
-        return super.getStateInfo() + ", isSleepingUntil=" + this.sleepLock.sleepingSince;
+                           final QueueServices services) {
+        super(name, config, services);
     }
 
     @Override
-    protected JobHandler start(final JobHandler handler) {
-        JobHandler rescheduleHandler = null;
-
-        // if we are ordered we simply wait for the finish
+    protected void start(final JobHandler handler) {
         synchronized ( this.syncLock ) {
             if ( this.executeJob(handler) ) {
                 this.isWaiting = true;
@@ -100,151 +55,45 @@ public final class OrderedJobQueue exten
                         Thread.currentThread().interrupt();
                     }
                 }
-                this.logger.debug("Job queue {} is continuing.", this.queueName);
-                rescheduleHandler = this.jobHandler;
-                this.jobHandler = null;
-            }
-        }
-        return rescheduleHandler;
-    }
-
-    private void wakeUp(final boolean discardJob) {
-        synchronized ( this.sleepLock ) {
-            if ( this.sleepLock.sleepingSince != -1 ) {
-                if ( discardJob ) {
-                    this.sleepLock.jobHandler = null;
-                }
-                this.sleepLock.notify();
-            }
-        }
-    }
-
-    @Override
-    public void resume() {
-        this.wakeUp(false);
-        super.resume();
-    }
-
-    @Override
-    protected void put(final JobHandler handler) {
-        synchronized ( this.queue ) {
-            this.queue.add(handler);
-            this.queue.notify();
-            this.isWaitingForNext = false;
-        }
-    }
-
-    @Override
-    protected JobHandler take() {
-        synchronized ( this.queue ) {
-            while ( this.queue.isEmpty() ) {
-                this.isWaitingForNext = true;
-                try {
-                    this.queue.wait();
-                } catch (final InterruptedException e) {
-                    this.ignoreException(e);
-                    Thread.currentThread().interrupt();
+                if ( this.sleepDelay > 0 ) {
+                    final long waitingTime = this.sleepDelay;
+                    this.sleepDelay = -1;
+                    final long startTime = System.currentTimeMillis();
+                    this.logger.debug("Job queue {} is sleeping {}ms for retry.", this.queueName, waitingTime);
+                    this.isWaiting = true;
+                    while ( this.isWaiting ) {
+                        try {
+                            this.syncLock.wait(waitingTime);
+                            if ( System.currentTimeMillis() >= startTime + waitingTime ) {
+                                this.isWaiting = false;
+                            }
+                        } catch (final InterruptedException e) {
+                            this.ignoreException(e);
+                            Thread.currentThread().interrupt();
+                        }
+                    }
                 }
-                this.isWaitingForNext = false;
+                this.logger.debug("Job queue {} is continuing.", this.queueName);
             }
-            // get the first element and remove it
-            final Iterator<JobHandler> i = this.queue.iterator();
-            final JobHandler result = i.next();
-            i.remove();
-            return result;
         }
     }
 
     @Override
-    protected boolean isEmpty() {
-        synchronized ( this.queue ) {
-            return this.queue.isEmpty();
-        }
+    protected void reschedule(final JobHandler handler) {
+        super.reschedule(handler);
+        this.sleepDelay = this.getRetryDelay(handler);
     }
 
     @Override
-    protected void notifyFinished(final JobHandler rescheduleHandler) {
-        this.jobHandler = rescheduleHandler;
+    protected void notifyFinished(final boolean reschedule) {
         this.logger.debug("Notifying job queue {} to continue processing.", this.queueName);
         synchronized ( this.syncLock ) {
             this.isWaiting = false;
-            this.syncLock.notify();
-        }
-    }
-
-    @Override
-    protected JobHandler reschedule(final JobHandler handler) {
-        // we just sleep for the delay time - if none, we continue and retry
-        // this job again
-        final long delay = this.getRetryDelay(handler);
-        if ( delay > 0 ) {
-            synchronized ( this.sleepLock ) {
-                this.sleepLock.sleepingSince = System.currentTimeMillis();
-                this.sleepLock.jobHandler = handler;
-                this.logger.debug("Job queue {} is sleeping for {}ms.", this.queueName, delay);
-                try {
-                    this.sleepLock.wait(delay);
-                } catch (final InterruptedException e) {
-                    this.ignoreException(e);
-                    Thread.currentThread().interrupt();
-                }
-                this.sleepLock.sleepingSince = -1;
-                final JobHandler result = this.sleepLock.jobHandler;
-                this.sleepLock.jobHandler = null;
-
-                if ( result == null ) {
-                    handler.cancel();
-                }
-                return result;
+            if ( !reschedule ) {
+                this.sleepDelay = -1;
             }
+            this.syncLock.notify();
         }
-        return handler;
-    }
-
-    /**
-     * @see org.apache.sling.event.jobs.Queue#clear()
-     */
-    @Override
-    public void clear() {
-        synchronized ( this.queue ) {
-            this.queue.clear();
-        }
-        super.clear();
-    }
-
-    @Override
-    public synchronized void removeAll() {
-        // remove all remaining jobs first
-        super.removeAll();
-        this.jobHandler = null;
-        this.wakeUp(true);
-    }
-
-    @Override
-    protected Collection<JobHandler> removeAllJobs() {
-        final List<JobHandler> events = new ArrayList<JobHandler>();
-        synchronized ( this.queue ) {
-            events.addAll(this.queue);
-            this.queue.clear();
-        }
-        return events;
-    }
-
-    @Override
-    public Object getState(final String key) {
-        if ( "isSleepingUntil".equals(key) ) {
-            return this.sleepLock.sleepingSince;
-        }
-        return super.getState(key);
-    }
-
-    private static final class SleepLock {
-
-        /** Marker indicating that this queue is currently sleeping. */
-        public volatile long sleepingSince = -1;
-
-        /** The job event to be returned after sleeping. */
-        public volatile JobHandler jobHandler;
     }
 }
 

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java?rev=1632141&r1=1632140&r2=1632141&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java Wed Oct 15 17:55:52 2014
@@ -18,81 +18,119 @@
  */
 package org.apache.sling.event.impl.jobs.queues;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.apache.sling.commons.scheduler.Scheduler;
-import org.apache.sling.commons.threads.ThreadPoolManager;
-import org.apache.sling.event.impl.jobs.JobConsumerManager;
+import java.util.Date;
+
 import org.apache.sling.event.impl.jobs.JobHandler;
 import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
-import org.osgi.service.event.EventAdmin;
 
 /**
  * The default parallel job queue processing the entries FIFO.
  * Failing jobs are rescheduled and put at the end of the queue.
  */
-public final class ParallelJobQueue extends AbstractParallelJobQueue {
+public final class ParallelJobQueue extends AbstractJobQueue {
+
+    private volatile int jobCount;
 
-    /** The queue. */
-    private final BlockingQueue<JobHandler> queue = new LinkedBlockingQueue<JobHandler>();
+    private final Object syncLock = new Object();
 
     public ParallelJobQueue(final String name,
                            final InternalQueueConfiguration config,
-                           final JobConsumerManager jobConsumerManager,
-                           final ThreadPoolManager threadPoolManager,
-                           final EventAdmin eventAdmin,
-                           final Scheduler scheduler) {
-        super(name, config, jobConsumerManager, threadPoolManager, eventAdmin, scheduler);
+                           final QueueServices services) {
+        super(name, config, services);
     }
 
     @Override
-    protected void put(final JobHandler event) {
-        try {
-            this.isWaitingForNext = false;
-            this.queue.put(event);
-        } catch (final InterruptedException e) {
-            this.ignoreException(e);
-            Thread.currentThread().interrupt();
-        }
+    public String getStateInfo() {
+        return super.getStateInfo() + ", jobCount=" + this.jobCount;
     }
 
     @Override
-    protected JobHandler take() {
-        try {
-            this.isWaitingForNext = true;
-            return this.queue.take();
-        } catch (final InterruptedException e) {
-            this.ignoreException(e);
-            Thread.currentThread().interrupt();
-        } finally {
-            this.isWaitingForNext = false;
+    protected void start(final JobHandler processInfo) {
+        // acquire a slot
+        this.acquireSlot();
+
+        // check if we got outdated in the meantime
+        if ( this.isOutdated() ) {
+            this.freeSlot();
+            return;
+        }
+        if ( !this.executeJob(processInfo) ) {
+            this.freeSlot();
         }
-        return null;
     }
 
-    @Override
-    protected boolean isEmpty() {
-        return this.queue.isEmpty();
+    /**
+     * Acquire a processing slot.
+     * This method is called if the queue is not ordered.
+     */
+    private void acquireSlot() {
+        synchronized ( this.syncLock ) {
+            if ( jobCount >= this.configuration.getMaxParallel() ) {
+                this.isWaiting = true;
+                this.logger.debug("Job queue {} is processing {} jobs - waiting for a free slot.", this.queueName, jobCount);
+                while ( this.isWaiting ) {
+                    try {
+                        this.syncLock.wait();
+                    } catch (final InterruptedException e) {
+                        this.ignoreException(e);
+                        Thread.currentThread().interrupt();
+                    }
+                }
+                this.logger.debug("Job queue {} is continuing.", this.queueName);
+            }
+            jobCount++;
+        }
     }
 
     /**
-     * @see org.apache.sling.event.jobs.Queue#clear()
+     * Free a slot when a job processing is finished.
      */
+    private void freeSlot() {
+        synchronized ( this.syncLock ) {
+            jobCount--;
+            if ( this.isWaiting ) {
+                this.logger.debug("Notifying job queue {} to continue processing.", this.queueName);
+                this.isWaiting = false;
+                this.syncLock.notify();
+            }
+        }
+    }
+
+    @Override
+    protected boolean canBeClosed() {
+        boolean result = super.canBeClosed();
+        if ( result ) {
+            result = this.jobCount == 0;
+        }
+        return result;
+    }
+
     @Override
-    public void clear() {
-        this.queue.clear();
-        super.clear();
+    protected void notifyFinished(final boolean reschedule) {
+        this.freeSlot();
     }
 
     @Override
-    protected Collection<JobHandler> removeAllJobs() {
-        final List<JobHandler> events = new ArrayList<JobHandler>();
-        this.queue.drainTo(events);
-        return events;
+    protected void reschedule(final JobHandler handler) {
+        // we just sleep for the delay time - if none, we continue and retry
+        // this job again
+        final long delay = this.getRetryDelay(handler);
+        if ( delay > 0 ) {
+            final Date fireDate = new Date();
+            fireDate.setTime(System.currentTimeMillis() + delay);
+
+            final String jobName = "Waiting:" + queueName + ":" + handler.hashCode();
+            final Runnable t = new Runnable() {
+                @Override
+                public void run() {
+                    ParallelJobQueue.super.reschedule(handler);
+                }
+            };
+            services.scheduler.schedule(t, services.scheduler.AT(fireDate).name(jobName));
+        } else {
+            // put directly into queue
+            super.reschedule(handler);
+        }
     }
 }
 

Added: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java?rev=1632141&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java (added)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java Wed Oct 15 17:55:52 2014
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.queues;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Properties;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.sling.commons.scheduler.Scheduler;
+import org.apache.sling.commons.threads.ThreadPoolManager;
+import org.apache.sling.event.impl.jobs.JobConsumerManager;
+import org.apache.sling.event.impl.jobs.JobManagerConfiguration;
+import org.apache.sling.event.impl.jobs.TestLogger;
+import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
+import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager;
+import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueInfo;
+import org.apache.sling.event.impl.jobs.jmx.QueueStatusEvent;
+import org.apache.sling.event.impl.jobs.jmx.QueuesMBeanImpl;
+import org.apache.sling.event.impl.jobs.stats.StatisticsManager;
+import org.apache.sling.event.impl.jobs.topics.TopicManager;
+import org.apache.sling.event.impl.support.Environment;
+import org.apache.sling.event.impl.support.ResourceHelper;
+import org.apache.sling.event.jobs.Queue;
+import org.apache.sling.event.jobs.QueueConfiguration;
+import org.apache.sling.event.jobs.jmx.QueuesMBean;
+import org.osgi.service.event.EventAdmin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Implementation of the job manager.
+ */
+@Component(immediate=true)
+@Service(value={Runnable.class, QueueManager.class})
+@Properties({
+    @Property(name="scheduler.period", longValue=60, propertyPrivate=true),
+    @Property(name="scheduler.concurrent", boolValue=false, propertyPrivate=true)
+})
+public class QueueManager
+    implements Runnable {
+
+    /** Default logger. */
+    private final Logger logger = new TestLogger(LoggerFactory.getLogger(this.getClass()));
+
+    @Reference
+    private EventAdmin eventAdmin;
+
+    @Reference
+    private Scheduler scheduler;
+
+    @Reference
+    private JobConsumerManager jobConsumerManager;
+
+    @Reference
+    private QueuesMBean queuesMBean;
+
+    @Reference
+    private ThreadPoolManager threadPoolManager;
+
+    /** The job manager configuration. */
+    @Reference
+    private JobManagerConfiguration configuration;
+
+    @Reference
+    private StatisticsManager statisticsManager;
+
+    @Reference
+    private QueueConfigurationManager queueManager;
+
+    /** Lock object for the queues map - we don't want to sync directly on the concurrent map. */
+    private final Object queuesLock = new Object();
+
+    /** All active queues. */
+    private final Map<String, AbstractJobQueue> queues = new ConcurrentHashMap<String, AbstractJobQueue>();
+
+    /** We count the scheduler runs. */
+    private volatile long schedulerRuns;
+
+    /**
+     * Activate this component.
+     * @param props Configuration properties
+     */
+    @Activate
+    protected void activate(final Map<String, Object> props) {
+        logger.info("Apache Sling Queue Manager started on instance {}", Environment.APPLICATION_ID);
+    }
+
+    /**
+     * Deactivate this component.
+     */
+    @Deactivate
+    protected void deactivate() {
+        logger.debug("Apache Sling Queue Manager stopping on instance {}", Environment.APPLICATION_ID);
+
+        final Iterator<AbstractJobQueue> i = this.queues.values().iterator();
+        while ( i.hasNext() ) {
+            final AbstractJobQueue jbq = i.next();
+            jbq.close();
+            // update mbeans
+            ((QueuesMBeanImpl)queuesMBean).sendEvent(new QueueStatusEvent(null, jbq));
+        }
+        this.queues.clear();
+        logger.info("Apache Sling Queue Manager stopped on instance {}", Environment.APPLICATION_ID);
+    }
+
+    /**
+     * This method is invoked periodically by the scheduler.
+     * It searches for idle queues and stops them after a timeout. If a queue
+     * is idle for two consecutive clean up calls, it is removed.
+     * @see java.lang.Runnable#run()
+     */
+    private void maintain() {
+        this.schedulerRuns++;
+        logger.debug("Job manager maintenance: Starting #{}", this.schedulerRuns);
+
+        // check for unprocessed jobs first
+        logger.debug("Checking for unprocessed jobs...");
+        for(final AbstractJobQueue jbq : this.queues.values() ) {
+            jbq.checkForUnprocessedJobs();
+        }
+
+        // we only do a full clean up on every fifth run
+        final boolean doFullCleanUp = (schedulerRuns % 5 == 0);
+
+        if ( doFullCleanUp ) {
+            // check for idle queue
+            logger.debug("Checking for idle queues...");
+
+           // we synchronize to avoid creating a queue which is about to be removed during cleanup
+            synchronized ( queuesLock ) {
+                final Iterator<Map.Entry<String, AbstractJobQueue>> i = this.queues.entrySet().iterator();
+                while ( i.hasNext() ) {
+                    final Map.Entry<String, AbstractJobQueue> current = i.next();
+                    final AbstractJobQueue jbq = current.getValue();
+                    if ( jbq.tryToClose() ) {
+                        logger.debug("Removing idle job queue {}", jbq);
+                        // remove
+                        i.remove();
+                        // update mbeans
+                        ((QueuesMBeanImpl)queuesMBean).sendEvent(new QueueStatusEvent(null, jbq));
+                    }
+                }
+            }
+        }
+        logger.debug("Job manager maintenance: Finished #{}", this.schedulerRuns);
+    }
+
+    /**
+     * Start a new queue
+     * This method first searches the corresponding queue - if such a queue
+     * does not exist yet, it is created and started.
+     *
+     * @param topic The topic
+     */
+    public void start(final TopicManager topicManager, final QueueInfo queueInfo) {
+        final InternalQueueConfiguration config = queueInfo.queueConfiguration;
+        // get or create queue
+        AbstractJobQueue queue = null;
+        // we synchronize to avoid creating a queue which is about to be removed during cleanup
+        synchronized ( queuesLock ) {
+            queue = this.queues.get(queueInfo.queueName);
+            // check for reconfiguration, we really do an identity check here(!)
+            if ( queue != null && queue.getConfiguration() != config ) {
+                this.outdateQueue(queue);
+                // we use a new queue with the configuration
+                queue = null;
+            }
+            if ( queue == null ) {
+                final QueueServices services = new QueueServices();
+                services.eventAdmin = this.eventAdmin;
+                services.jobConsumerManager = this.jobConsumerManager;
+                services.scheduler = this.scheduler;
+                services.threadPoolManager = this.threadPoolManager;
+                services.topicManager = topicManager;
+                services.statisticsManager = statisticsManager;
+                if ( config.getType() == QueueConfiguration.Type.ORDERED ) {
+                    queue = new OrderedJobQueue(queueInfo.queueName, config, services);
+                } else if ( config.getType() == QueueConfiguration.Type.UNORDERED ) {
+                    queue = new ParallelJobQueue(queueInfo.queueName, config, services);
+                } else if ( config.getType() == QueueConfiguration.Type.TOPIC_ROUND_ROBIN ) {
+                    queue = new ParallelJobQueue(queueInfo.queueName, config, services);
+                }
+                // this is just a sanity check, actually we always have a queue instance here
+                if ( queue != null ) {
+                    queues.put(queueInfo.queueName, queue);
+                    ((QueuesMBeanImpl)queuesMBean).sendEvent(new QueueStatusEvent(queue, null));
+                    queue.start();
+                }
+            }
+        }
+    }
+
+    /**
+     * This method is invoked periodically by the scheduler.
+     * In the default configuration every minute
+     * @see java.lang.Runnable#run()
+     */
+    @Override
+    public void run() {
+        this.maintain();
+    }
+
+    private void outdateQueue(final AbstractJobQueue queue) {
+        // remove the queue with the old name
+        // check for main queue
+        final String oldName = ResourceHelper.filterQueueName(queue.getName());
+        this.queues.remove(oldName);
+        // check if we can close or have to rename
+        if ( queue.tryToClose() ) {
+            // copy statistics
+            // update mbeans
+            ((QueuesMBeanImpl)queuesMBean).sendEvent(new QueueStatusEvent(null, queue));
+        } else {
+            queue.outdate();
+            // readd with new name
+            String newName = ResourceHelper.filterName(queue.getName());
+            int index = 0;
+            while ( this.queues.containsKey(newName) ) {
+                newName = ResourceHelper.filterName(queue.getName()) + '$' + String.valueOf(index++);
+            }
+            this.queues.put(newName, queue);
+            // update mbeans
+            ((QueuesMBeanImpl)queuesMBean).sendEvent(new QueueStatusEvent(queue, queue));
+        }
+    }
+
+    /**
+     * @see org.apache.sling.event.jobs.JobManager#restart()
+     */
+    public void restart() {
+        // let's rename/close all queues and clear them
+        synchronized ( queuesLock ) {
+            final List<AbstractJobQueue> queues = new ArrayList<AbstractJobQueue>(this.queues.values());
+            for(final AbstractJobQueue queue : queues ) {
+                queue.clear();
+                this.outdateQueue(queue);
+            }
+        }
+    }
+
+    private void stopProcessing() {
+        // let's rename/close all queues and clear them
+        synchronized ( queuesLock ) {
+            final List<AbstractJobQueue> queues = new ArrayList<AbstractJobQueue>(this.queues.values());
+            for(final AbstractJobQueue queue : queues ) {
+                queue.clear();
+                this.outdateQueue(queue);
+            }
+        }
+    }
+
+    /**
+     * @see org.apache.sling.event.jobs.JobManager#getQueue(java.lang.String)
+     */
+    public Queue getQueue(final String name) {
+        return this.queues.get(name);
+    }
+
+    /**
+     * @see org.apache.sling.event.jobs.JobManager#getQueues()
+     */
+    public Iterable<Queue> getQueues() {
+        final Iterator<AbstractJobQueue> jqI = this.queues.values().iterator();
+        return new Iterable<Queue>() {
+
+            @Override
+            public Iterator<Queue> iterator() {
+                return new Iterator<Queue>() {
+
+                    @Override
+                    public boolean hasNext() {
+                        return jqI.hasNext();
+                    }
+
+                    @Override
+                    public Queue next() {
+                        return jqI.next();
+                    }
+
+                    @Override
+                    public void remove() {
+                        throw new UnsupportedOperationException();
+                    }
+                };
+            }
+        };
+    }
+}

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueServices.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueServices.java?rev=1632141&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueServices.java (added)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueServices.java Wed Oct 15 17:55:52 2014
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.queues;
+
+import org.apache.sling.commons.scheduler.Scheduler;
+import org.apache.sling.commons.threads.ThreadPoolManager;
+import org.apache.sling.event.impl.jobs.JobConsumerManager;
+import org.apache.sling.event.impl.jobs.stats.StatisticsManager;
+import org.apache.sling.event.impl.jobs.topics.TopicManager;
+import org.osgi.service.event.EventAdmin;
+
+public class QueueServices {
+
+    public JobConsumerManager jobConsumerManager;
+
+    public EventAdmin eventAdmin;
+
+    public ThreadPoolManager threadPoolManager;
+
+    public Scheduler scheduler;
+
+    public TopicManager topicManager;
+
+    public StatisticsManager statisticsManager;
+}

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueServices.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueServices.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueServices.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain