You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2015/03/16 17:07:08 UTC

svn commit: r1667052 - in /sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event: impl/jobs/ impl/jobs/deprecated/ impl/jobs/queues/ jobs/

Author: cziegeler
Date: Mon Mar 16 16:07:08 2015
New Revision: 1667052

URL: http://svn.apache.org/r1667052
Log:
SLING-4481 : Reduce the number of controller threads for queue

Added:
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/JobStatusNotifierImpl.java   (with props)
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobExecutionContextImpl.java   (with props)
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java   (with props)
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ResultBuilderImpl.java   (with props)
Removed:
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobRunner.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java
Modified:
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/JobStatusNotifier.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueServices.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobUtil.java

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java?rev=1667052&r1=1667051&r2=1667052&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java Mon Mar 16 16:07:08 2015
@@ -35,6 +35,7 @@ import org.apache.sling.event.impl.jobs.
 import org.apache.sling.event.impl.support.ResourceHelper;
 import org.apache.sling.event.jobs.Job;
 import org.apache.sling.event.jobs.Queue;
+import org.apache.sling.event.jobs.consumer.JobExecutor;
 
 
 /**
@@ -44,15 +45,19 @@ public class JobHandler {
 
     private final JobImpl job;
 
-    public long started = -1;
+    public volatile long started = -1;
 
     private volatile boolean isStopped = false;
 
     private final JobManagerConfiguration configuration;
 
+    private final JobExecutor consumer;
+
     public JobHandler(final JobImpl job,
+            final JobExecutor consumer,
             final JobManagerConfiguration configuration) {
         this.job = job;
+        this.consumer = consumer;
         this.configuration = configuration;
     }
 
@@ -60,6 +65,10 @@ public class JobHandler {
         return this.job;
     }
 
+    public JobExecutor getConsumer() {
+        return this.consumer;
+    }
+
     public boolean startProcessing(final Queue queue) {
         this.isStopped = false;
         return this.persistJobProperties(this.job.prepare(queue));

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java?rev=1667052&r1=1667051&r2=1667052&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java Mon Mar 16 16:07:08 2015
@@ -47,7 +47,7 @@ import org.apache.sling.event.impl.jobs.
 import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueInfo;
 import org.apache.sling.event.impl.jobs.config.TopologyCapabilities;
 import org.apache.sling.event.impl.jobs.notifications.NotificationUtility;
-import org.apache.sling.event.impl.jobs.queues.AbstractJobQueue;
+import org.apache.sling.event.impl.jobs.queues.JobQueueImpl;
 import org.apache.sling.event.impl.jobs.queues.QueueManager;
 import org.apache.sling.event.impl.jobs.stats.StatisticsManager;
 import org.apache.sling.event.impl.jobs.tasks.CleanUpTask;
@@ -347,7 +347,7 @@ public class JobManagerImpl
                         resolver.close();
                     }
                 } else {
-                    final JobHandler jh = new JobHandler(job, this.configuration);
+                    final JobHandler jh = new JobHandler(job, null, this.configuration);
                     jh.finished(Job.JobState.DROPPED, true, -1);
                 }
             }
@@ -855,7 +855,7 @@ public class JobManagerImpl
         if ( job != null && !this.configuration.isStoragePath(job.getResourcePath()) ) {
             // get the queue configuration
             final QueueInfo queueInfo = this.configuration.getQueueConfigurationManager().getQueueInfo(job.getTopic());
-            final AbstractJobQueue queue = (AbstractJobQueue)this.qManager.getQueue(queueInfo.queueName);
+            final JobQueueImpl queue = (JobQueueImpl)this.qManager.getQueue(queueInfo.queueName);
 
             boolean stopped = false;
             if ( queue != null ) {
@@ -863,7 +863,7 @@ public class JobManagerImpl
             }
             if ( forward && !stopped ) {
                 // mark the job as stopped
-                final JobHandler jh = new JobHandler(job,this.configuration);
+                final JobHandler jh = new JobHandler(job, null, this.configuration);
                 jh.finished(JobState.STOPPED, true, -1);
             }
         }

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/JobStatusNotifier.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/JobStatusNotifier.java?rev=1667052&r1=1667051&r2=1667052&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/JobStatusNotifier.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/JobStatusNotifier.java Mon Mar 16 16:07:08 2015
@@ -18,31 +18,22 @@
  */
 package org.apache.sling.event.impl.jobs.deprecated;
 
-import org.osgi.service.event.Event;
+import org.apache.sling.event.jobs.JobProcessor;
 
 public interface JobStatusNotifier {
 
     String CONTEXT_PROPERTY_NAME = JobStatusNotifier.class.getName();
 
-    class NotifierContext {
-        private final JobStatusNotifier notifier;
-
-        public NotifierContext(final JobStatusNotifier n) {
-            this.notifier = n;
-        }
-
-        public JobStatusNotifier getJobStatusNotifier() {
-            return this.notifier;
-        }
-    }
-
     /**
-     * Send an acknowledge message that someone is processing the job.
-     * @param job The job.
+     * Notify the job handling that the job has been ack'ed.
+     * If a processor is set, the job queue will use that processor to execute the job.
+     * If it is not set, async processing is enabled and {@link #finishedJob(boolean)}
+     * needs to be called by the caller of this method.
+     * @param processor The job processor.
      * @return <code>true</code> if the ack is ok, <code>false</code> otherwise (e.g. if
      *   someone else already sent an ack for this job).
      */
-    boolean sendAcknowledge(Event job);
+    boolean getAcknowledge(final JobProcessor processor);
 
     /**
      * Notify that the job is finished.
@@ -50,9 +41,8 @@ public interface JobStatusNotifier {
      * during the processing. If the job should be rescheduled, <code>true</code> indicates
      * that the job could be rescheduled. If an error occurs or the number of retries is
      * exceeded, <code>false</code> will be returned.
-     * @param job The job.
      * @param reschedule Should the event be rescheduled?
      * @return <code>true</code> if everything went fine, <code>false</code> otherwise.
      */
-    boolean finishedJob(Event job, boolean reschedule);
+    boolean finishedJob(boolean reschedule);
 }

Added: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/JobStatusNotifierImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/JobStatusNotifierImpl.java?rev=1667052&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/JobStatusNotifierImpl.java (added)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/JobStatusNotifierImpl.java Mon Mar 16 16:07:08 2015
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.deprecated;
+
+import org.apache.sling.event.jobs.JobProcessor;
+import org.apache.sling.event.jobs.consumer.JobExecutionContext;
+
+/**
+ * Default {@link JobStatusNotifier} implementation.
+ * It records whether a job has been acknowledged (and with which processor)
+ * and, once a {@link JobExecutionContext} has been set, reports the end of
+ * the processing to that context.
+ */
+public class JobStatusNotifierImpl implements JobStatusNotifier {
+
+    /** Set once {@link #getAcknowledge(JobProcessor)} has been invoked. */
+    private volatile boolean isCalled = false;
+
+    /** Set by {@link #markDone()}; an acknowledge arriving afterwards is answered with <code>false</code>. */
+    private volatile boolean isMarked = false;
+
+    /** The processor passed in with the acknowledge, might be <code>null</code>. */
+    private volatile JobProcessor processor;
+
+    /** The context used to report the end of the processing, might be <code>null</code>. */
+    private volatile JobExecutionContext context;
+
+    /**
+     * Record the acknowledge together with the (optional) processor and wake up
+     * every thread waiting on this object's monitor for the acknowledge.
+     * @param processor The job processor, might be <code>null</code>.
+     * @return <code>true</code> if {@link #markDone()} has not been called before,
+     *         <code>false</code> otherwise.
+     */
+    @Override
+    public boolean getAcknowledge(final JobProcessor processor) {
+        synchronized ( this ) {
+            this.isCalled = true;
+            this.processor = processor;
+            // without this notification a thread blocked in wait() on this object
+            // would only continue after its timeout has expired
+            this.notifyAll();
+            return !isMarked;
+        }
+    }
+
+    /**
+     * Report the end of the processing to the execution context, if one has been set.
+     * A reschedule request is reported as a failed result, everything else as succeeded.
+     * NOTE(review): the interface documents <code>true</code> for "everything went fine",
+     * this implementation has always returned <code>false</code> - confirm before changing.
+     * @param reschedule Should the job be rescheduled?
+     * @return Always <code>false</code>.
+     */
+    @Override
+    public boolean finishedJob(final boolean reschedule) {
+        // read the volatile field once, so the null check and the calls use the same instance
+        final JobExecutionContext ctx = this.context;
+        if ( ctx != null ) {
+            ctx.asyncProcessingFinished(reschedule ? ctx.result().failed() : ctx.result().succeeded());
+        }
+        return false;
+    }
+
+    /**
+     * Mark this notifier as done; acknowledges received afterwards return <code>false</code>.
+     */
+    public void markDone() {
+        this.isMarked = true;
+    }
+
+    /**
+     * @return <code>true</code> if an acknowledge has been received.
+     */
+    public boolean isCalled() {
+        return this.isCalled;
+    }
+
+    /**
+     * @return The processor passed in with the acknowledge or <code>null</code>.
+     */
+    public JobProcessor getProcessor() {
+        return this.processor;
+    }
+
+    /**
+     * Set the context that {@link #finishedJob(boolean)} reports to.
+     * @param context The job execution context.
+     */
+    public void setJobExecutionContext(final JobExecutionContext context) {
+        this.context = context;
+    }
+}

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/JobStatusNotifierImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/JobStatusNotifierImpl.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/JobStatusNotifierImpl.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobExecutionContextImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobExecutionContextImpl.java?rev=1667052&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobExecutionContextImpl.java (added)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobExecutionContextImpl.java Mon Mar 16 16:07:08 2015
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.queues;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.sling.event.impl.jobs.JobHandler;
+import org.apache.sling.event.jobs.Job;
+import org.apache.sling.event.jobs.consumer.JobExecutionContext;
+import org.apache.sling.event.jobs.consumer.JobExecutionResult;
+
+public class JobExecutionContextImpl implements JobExecutionContext {
+
+    public interface ASyncHandler {
+        void finished(Job.JobState state);
+    }
+
+    private volatile boolean hasInit = false;
+
+    private final JobHandler handler;
+
+    private final Object lock;
+
+    private final AtomicBoolean isAsync;
+
+    private final ASyncHandler asyncHandler;
+
+    public JobExecutionContextImpl(final JobHandler handler,
+            final Object syncLock,
+            final AtomicBoolean isAsync,
+            final ASyncHandler asyncHandler) {
+        this.handler = handler;
+        this.lock = syncLock;
+        this.isAsync = isAsync;
+        this.asyncHandler = asyncHandler;
+    }
+
+    @Override
+    public void initProgress(final int steps,
+            final long eta) {
+        if ( !hasInit ) {
+            handler.persistJobProperties(handler.getJob().startProgress(steps, eta));
+            hasInit = true;
+        }
+    }
+
+    @Override
+    public void incrementProgressCount(final int steps) {
+        if ( hasInit ) {
+            handler.persistJobProperties(handler.getJob().setProgress(steps));
+        }
+    }
+
+    @Override
+    public void updateProgress(final long eta) {
+        if ( hasInit ) {
+            handler.persistJobProperties(handler.getJob().update(eta));
+        }
+    }
+
+    @Override
+    public void log(final String message, Object... args) {
+        handler.persistJobProperties(handler.getJob().log(message, args));
+    }
+
+    @Override
+    public boolean isStopped() {
+        return handler.isStopped();
+    }
+
+    @Override
+    public void asyncProcessingFinished(final JobExecutionResult result) {
+        synchronized ( lock ) {
+            if ( isAsync.compareAndSet(true, false) ) {
+                Job.JobState state = null;
+                if ( result.succeeded() ) {
+                    state = Job.JobState.SUCCEEDED;
+                } else if ( result.failed() ) {
+                    state = Job.JobState.QUEUED;
+                } else if ( result.cancelled() ) {
+                    if ( handler.isStopped() ) {
+                        state = Job.JobState.STOPPED;
+                    } else {
+                        state = Job.JobState.ERROR;
+                    }
+                }
+                asyncHandler.finished(state);
+            } else {
+                throw new IllegalStateException("Job is not processed async " + handler.getJob().getId());
+            }
+        }
+    }
+
+    @Override
+    public ResultBuilder result() {
+        return new ResultBuilderImpl();
+    }
+}

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobExecutionContextImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobExecutionContextImpl.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobExecutionContextImpl.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java?rev=1667052&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java (added)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java Mon Mar 16 16:07:08 2015
@@ -0,0 +1,754 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.queues;
+
+import java.util.Calendar;
+import java.util.Date;
+import java.util.Dictionary;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.commons.threads.ThreadPool;
+import org.apache.sling.event.EventUtil;
+import org.apache.sling.event.impl.EventingThreadPool;
+import org.apache.sling.event.impl.jobs.JobExecutionResultImpl;
+import org.apache.sling.event.impl.jobs.JobHandler;
+import org.apache.sling.event.impl.jobs.JobImpl;
+import org.apache.sling.event.impl.jobs.JobTopicTraverser;
+import org.apache.sling.event.impl.jobs.Utility;
+import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
+import org.apache.sling.event.impl.jobs.deprecated.JobStatusNotifier;
+import org.apache.sling.event.impl.jobs.deprecated.JobStatusNotifierImpl;
+import org.apache.sling.event.impl.jobs.notifications.NotificationUtility;
+import org.apache.sling.event.impl.support.BatchResourceRemover;
+import org.apache.sling.event.jobs.Job;
+import org.apache.sling.event.jobs.Job.JobState;
+import org.apache.sling.event.jobs.JobProcessor;
+import org.apache.sling.event.jobs.JobUtil;
+import org.apache.sling.event.jobs.NotificationConstants;
+import org.apache.sling.event.jobs.Queue;
+import org.apache.sling.event.jobs.QueueConfiguration.Type;
+import org.apache.sling.event.jobs.Statistics;
+import org.apache.sling.event.jobs.consumer.JobExecutionContext;
+import org.osgi.service.event.Event;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The job blocking queue extends the blocking queue by some
+ * functionality for the job event handling.
+ */
+public class JobQueueImpl
+    implements Queue {
+
+    /** Default timeout for suspend. */
+    private static final long MAX_SUSPEND_TIME = 1000 * 60 * 60; // 60 mins
+
+    /** Default number of milliseconds to wait for an ack. */
+    private static final long DEFAULT_WAIT_FOR_ACK_IN_MS = 60 * 1000; // by default we wait 60 secs
+
+    /** The logger. */
+    private final Logger logger;
+
+    /** Configuration. */
+    private final InternalQueueConfiguration configuration;
+
+    /** The queue name. */
+    private volatile String queueName;
+
+    /** Are we still running? */
+    private volatile boolean running;
+
+    /** Suspended since. */
+    private final AtomicLong suspendedSince = new AtomicLong(-1);
+
+    /** Services used by the queues. */
+    private final QueueServices services;
+
+    /** The map of events we're processing. */
+    private final Map<String, JobHandler> processingJobsLists = new HashMap<String, JobHandler>();
+
+    private final ThreadPool threadPool;
+
+    /** Async counter. */
+    private final AtomicInteger asyncCounter = new AtomicInteger();
+
+    /** Flag for outdated. */
+    private final AtomicBoolean isOutdated = new AtomicBoolean(false);
+
+    /** A marker for closing the queue. */
+    private final AtomicBoolean closeMarker = new AtomicBoolean(false);
+
+    /** The job cache. */
+    private final QueueJobCache cache;
+
+    /** Semaphore for handling the max number of jobs. */
+    private final Semaphore available;
+
+    /**
+     * Create a new queue
+     * @param name The queue name
+     * @param config The queue configuration
+     */
+    public JobQueueImpl(final String name,
+                            final InternalQueueConfiguration config,
+                            final QueueServices services,
+                            final Set<String> topics) {
+        if ( config.getOwnThreadPoolSize() > 0 ) {
+            this.threadPool = new EventingThreadPool(services.threadPoolManager, config.getOwnThreadPoolSize());
+        } else {
+            this.threadPool = services.eventingThreadPool;
+        }
+        this.queueName = name;
+        this.configuration = config;
+        this.services = services;
+        this.logger = LoggerFactory.getLogger(this.getClass().getName() + '.' + name);
+        this.running = true;
+        this.cache = new QueueJobCache(services.configuration, config.getType(), topics);
+        this.cache.fillCache();
+        this.available = new Semaphore(config.getMaxParallel(), true);
+        logger.info("Starting job queue {}", queueName);
+        logger.debug("Configuration for job queue={}", configuration);
+    }
+
+    /**
+     * Return the queue configuration
+     * @return The configuration this queue has been created with.
+     */
+    @Override
+    public InternalQueueConfiguration getConfiguration() {
+        return this.configuration;
+    }
+
+    /**
+     * Get the name of the job queue.
+     * @return The current queue name.
+     */
+    @Override
+    public String getName() {
+        return this.queueName;
+    }
+
+    /**
+     * Return the statistics the statistics manager keeps under the name of this queue.
+     * @see org.apache.sling.event.jobs.Queue#getStatistics()
+     */
+    @Override
+    public Statistics getStatistics() {
+        return this.services.statisticsManager.getQueueStatistics(this.queueName);
+    }
+
+    /**
+     * Start the job queue.
+     * Delegates to {@link #start(boolean)} with <code>false</code>, i.e. without
+     * limiting the invocation to a single job.
+     */
+    public void start() {
+        start(false);
+    }
+    /**
+     * Start the job queue.
+     * This method might be called concurrently, therefore we synchronize.
+     * As long as the queue is running, not outdated and not suspended, and a
+     * permit can be acquired, the next job from the cache is handed to the
+     * thread pool. Each launched job tries to launch one follow-up job
+     * once it has been processed.
+     * @param justOne If <code>true</code>, at most one job is launched by this invocation.
+     */
+    public synchronized void start(boolean justOne) {
+        // we start as many jobs in parallel as possible
+        while ( this.running && !this.isOutdated.get() && !this.isSuspended() && this.available.tryAcquire() ) {
+            // becomes true once the acquired permit has been handed over to a scheduled job
+            boolean started = false;
+            try {
+                final JobHandler handler = this.cache.getNextJob(this.services.jobConsumerManager, this, false);
+                if ( handler != null ) {
+                    this.threadPool.execute(new Runnable() {
+
+                        @Override
+                        public void run() {
+                            // update thread priority and name
+                            final Thread currentThread = Thread.currentThread();
+                            final String oldName = currentThread.getName();
+                            final int oldPriority = currentThread.getPriority();
+
+                            currentThread.setName(oldName + "-" + handler.getJob().getQueueName() + "(" + handler.getJob().getTopic() + ")");
+                            if ( configuration.getThreadPriority() != null ) {
+                                switch ( configuration.getThreadPriority() ) {
+                                    case NORM : currentThread.setPriority(Thread.NORM_PRIORITY);
+                                                break;
+                                    case MIN  : currentThread.setPriority(Thread.MIN_PRIORITY);
+                                                break;
+                                    case MAX  : currentThread.setPriority(Thread.MAX_PRIORITY);
+                                                break;
+                                }
+                            }
+
+                            try {
+                                startJob(handler);
+                            } finally {
+                                // restore the pooled thread for its next user
+                                currentThread.setPriority(oldPriority);
+                                currentThread.setName(oldName);
+                            }
+                            // and try to launch another job
+                            start(true);
+                        }
+                    });
+                    // only a successfully scheduled job owns the permit; if execute()
+                    // throws, the finally block below gives the permit back
+                    // NOTE(review): the job taken from the cache is presumably lost in that case - confirm
+                    started = true;
+                    if ( justOne ) {
+                        break;
+                    }
+                } else {
+                    // no job available, stop loop
+                    break;
+                }
+
+            } finally {
+                if ( !started ) {
+                    this.available.release();
+                }
+            }
+        }
+    }
+
+    private void startJob(final JobHandler handler) {
+        try {
+            this.closeMarker.set(false);
+            try {
+                final JobImpl job = handler.getJob();
+                handler.started = System.currentTimeMillis();
+
+                if ( handler.getConsumer() != null ) {
+                    final long queueTime = handler.started - job.getProperty(JobImpl.PROPERTY_JOB_QUEUED, Calendar.class).getTime().getTime();
+                    NotificationUtility.sendNotification(this.services.eventAdmin, NotificationConstants.TOPIC_JOB_STARTED, job, queueTime);
+                    synchronized ( this.processingJobsLists ) {
+                        this.processingJobsLists.put(job.getId(), handler);
+                    }
+
+                    final Object lock = new Object();
+
+                    JobExecutionResultImpl result = JobExecutionResultImpl.CANCELLED;
+                    Job.JobState resultState = Job.JobState.ERROR;
+                    final AtomicBoolean isAsync = new AtomicBoolean(false);
+
+                    try {
+                        synchronized ( lock ) {
+                            final JobExecutionContext ctx = new JobExecutionContextImpl(handler, lock, isAsync, new JobExecutionContextImpl.ASyncHandler() {
+
+                                @Override
+                                public void finished(final JobState state) {
+                                    services.jobConsumerManager.unregisterListener(job.getId());
+                                    finishedJob(job.getId(), state, true);
+                                    asyncCounter.decrementAndGet();
+                                }
+                            });
+
+                            result = (JobExecutionResultImpl)handler.getConsumer().process(job, ctx);
+                            if ( result == null ) { // ASYNC processing
+                                services.jobConsumerManager.registerListener(job.getId(), handler.getConsumer(), ctx);
+                                asyncCounter.incrementAndGet();
+                                isAsync.set(true);
+                            } else {
+                                if ( result.succeeded() ) {
+                                    resultState = Job.JobState.SUCCEEDED;
+                                } else if ( result.failed() ) {
+                                    resultState = Job.JobState.QUEUED;
+                                } else if ( result.cancelled() ) {
+                                    if ( handler.isStopped() ) {
+                                        resultState = Job.JobState.STOPPED;
+                                    } else {
+                                        resultState = Job.JobState.ERROR;
+                                    }
+                                }
+                            }
+                        }
+                    } catch (final Throwable t) { //NOSONAR
+                        logger.error("Unhandled error occured in job processor " + t.getMessage() + " while processing job " + Utility.toString(job), t);
+                        // we don't reschedule if an exception occurs
+                        result = JobExecutionResultImpl.CANCELLED;
+                        resultState = Job.JobState.ERROR;
+                    } finally {
+                        if ( result != null ) {
+                            if ( result.getRetryDelayInMs() != null ) {
+                                job.setProperty(JobImpl.PROPERTY_DELAY_OVERRIDE, result.getRetryDelayInMs());
+                            }
+                            if ( result.getMessage() != null ) {
+                               job.setProperty(Job.PROPERTY_RESULT_MESSAGE, result.getMessage());
+                            }
+                            this.finishedJob(job.getId(), resultState, false);
+                        }
+                    }
+
+                } else {
+                    final Event jobEvent = this.getJobEvent(handler);
+                    final JobStatusNotifierImpl notifier = (JobStatusNotifierImpl) jobEvent.getProperty(JobStatusNotifier.CONTEXT_PROPERTY_NAME);
+                    // we need async delivery, otherwise we might create a deadlock
+                    // as this method runs inside a synchronized block and the finishedJob
+                    // method as well!
+                    final long endOfAck = System.currentTimeMillis() + DEFAULT_WAIT_FOR_ACK_IN_MS;
+                    this.services.eventAdmin.postEvent(jobEvent);
+
+                    // wait for the ack
+                    synchronized ( notifier ) {
+                        while ( System.currentTimeMillis() < endOfAck && !notifier.isCalled() ) {
+                            try {
+                                notifier.wait(endOfAck - System.currentTimeMillis());
+                            } catch ( final InterruptedException ie) {
+                                Thread.currentThread().interrupt();
+                                ignoreException(ie);
+                            }
+                        }
+                        if ( !notifier.isCalled() ) {
+                            notifier.markDone();
+                        }
+                    }
+                    if ( !notifier.isCalled() ) {
+                        if ( handler.reschedule() ) {
+                            this.logger.info("No acknowledge received for job {} stored at {}. Requeueing job.", Utility.toString(handler.getJob()), handler.getJob().getId());
+                            handler.getJob().retry();
+                            this.requeue(handler);
+                        }
+                    } else {
+                        if ( logger.isDebugEnabled() ) {
+                            logger.debug("Received ack for job {}", Utility.toString(job));
+                        }
+                        // check for processor
+                        final JobProcessor processor = notifier.getProcessor();
+                        if ( processor != null ) {
+                            boolean result = false;
+                            try {
+                                result = processor.process(jobEvent);
+                            } catch (Throwable t) { //NOSONAR
+                                LoggerFactory.getLogger(JobUtil.class).error("Unhandled error occured in job processor " + t.getMessage() + " while processing job " + job, t);
+                                // we don't reschedule if an exception occurs
+                                result = true;
+                            }
+                            if ( result ) {
+                                this.finishedJob(job.getId(), Job.JobState.SUCCEEDED, false);
+                            } else {
+                                this.finishedJob(job.getId(), Job.JobState.QUEUED, false);
+                            }
+                        } else {
+                            // async processing
+                            final AtomicBoolean isAsync = new AtomicBoolean(true);
+                            final JobExecutionContext ctx = new JobExecutionContextImpl(handler, new Object(), isAsync, new JobExecutionContextImpl.ASyncHandler() {
+
+                                @Override
+                                public void finished(final JobState state) {
+                                    services.jobConsumerManager.unregisterListener(job.getId());
+                                    finishedJob(job.getId(), state, true);
+                                    asyncCounter.decrementAndGet();
+                                }
+                            });
+                            services.jobConsumerManager.registerListener(job.getId(), handler.getConsumer(), ctx);
+                            asyncCounter.incrementAndGet();
+
+                            notifier.setJobExecutionContext(ctx);
+                        }
+                    }
+                }
+            } catch (final Exception re) {
+                // if an exception occurs, we just log
+                this.logger.error("Exception during job processing.", re);
+            }
+        } finally {
+            this.available.release();
+        }
+    }
+
+    /**
+     * Flag this queue as outdated and rename it.
+     * The new name is the current name followed by an outdated marker and
+     * this object's hash code. Only the first invocation has an effect.
+     */
+    public void outdate() {
+        if ( !this.isOutdated.compareAndSet(false, true) ) {
+            // already marked as outdated by an earlier call
+            return;
+        }
+        final String outdatedName = this.getName() + "<outdated>(" + this.hashCode() + ")";
+        this.logger.info("Outdating queue {}, renaming to {}.", this.queueName, outdatedName);
+        this.queueName = outdatedName;
+    }
+
+    /**
+     * Try to close this queue.
+     * The queue is resumed first. If it can be closed and the close marker
+     * is already set, the queue is closed. If it can be closed but the marker
+     * is not set yet, the marker is set and the queue stays open.
+     * @return {@code true} if the queue has been closed by this call
+     */
+    public boolean tryToClose() {
+        // resume the queue as we want to close it!
+        this.resume();
+
+        if ( !this.canBeClosed() ) {
+            return false;
+        }
+        if ( !this.closeMarker.get() ) {
+            // first time we see the queue idle: only remember it
+            this.closeMarker.set(true);
+            return false;
+        }
+        this.close();
+        return true;
+    }
+
+    /**
+     * Check whether this queue can be closed.
+     * This is the case if the queue is not suspended, the cache is empty,
+     * no async job is counted and all permits are available.
+     * @return {@code true} if all of the above conditions hold
+     */
+    public boolean canBeClosed() {
+        if ( this.isSuspended() ) {
+            return false;
+        }
+        if ( !this.cache.isEmpty() ) {
+            return false;
+        }
+        if ( this.asyncCounter.get() != 0 ) {
+            return false;
+        }
+        // every permit is available, so no job currently holds one
+        return this.available.availablePermits() == this.configuration.getMaxParallel();
+    }
+
+    /**
+     * Close this queue.
+     * Marks the queue as not running, resumes it, forgets all jobs in the
+     * processing list and releases the thread pool if the configured own
+     * thread pool size is greater than zero.
+     */
+    public void close() {
+        // from now on finishedJob() discards incoming results
+        this.running = false;
+        this.logger.debug("Shutting down job queue {}", queueName);
+        this.resume();
+
+        synchronized ( this.processingJobsLists ) {
+            this.processingJobsLists.clear();
+        }
+
+        final boolean usesOwnThreadPool = this.configuration.getOwnThreadPoolSize() > 0;
+        if ( usesOwnThreadPool ) {
+            final EventingThreadPool ownPool = (EventingThreadPool)this.threadPool;
+            ownPool.release();
+        }
+
+        this.logger.info("Stopped job queue {}", this.queueName);
+    }
+
+    /**
+     * Periodic maintenance.
+     * Resumes the queue if it has been suspended for longer than
+     * {@code MAX_SUSPEND_TIME} and then starts the queue.
+     */
+    public void maintain() {
+        // check suspended
+        final long suspendedAt = this.suspendedSince.get();
+        if ( suspendedAt != -1 ) {
+            if ( suspendedAt + MAX_SUSPEND_TIME < System.currentTimeMillis() ) {
+                logger.info("Waking up suspended queue. It has been suspended for more than {}ms", MAX_SUSPEND_TIME);
+                this.resume();
+            }
+        }
+
+        // TODO - set full cache search
+
+        this.start();
+    }
+
+    /**
+     * Notify the queue that new jobs exist for the given topics.
+     * The topics are handed to the cache and the queue is started.
+     * @param topics the topics with new jobs
+     */
+    public void wakeUpQueue(final Set<String> topics) {
+        cache.handleNewTopics(topics);
+        start();
+    }
+
+    /**
+     * Hand a job back to the cache for rescheduling and start the queue.
+     * @param handler The job handler
+     */
+    private void requeue(final JobHandler handler) {
+        cache.reschedule(handler);
+        start();
+    }
+
+    /**
+     * Simple holder for the outcome of the reschedule handling.
+     */
+    private static final class RescheduleInfo {
+        /** Whether the job should be put back into the queue, initially false. */
+        public boolean reschedule;
+        /** Processing time in milliseconds, only filled for succeeded jobs. */
+        public long processingTime;
+    }
+
+    /**
+     * Evaluate the result state of a job, send the matching notification and
+     * decide whether the job needs to be rescheduled.
+     * <ul>
+     * <li>SUCCEEDED: the processing time is recorded and a finished notification is sent.</li>
+     * <li>QUEUED: the job is retried (failed notification) unless the retry
+     *     limit is exceeded, in which case a cancelled notification is sent.
+     *     A retry limit of -1 means the job is always retried.</li>
+     * <li>Any other state: a cancelled notification is sent.</li>
+     * </ul>
+     * @param handler The job handler
+     * @param resultState The state reported for the job
+     * @return The reschedule info
+     */
+    private RescheduleInfo handleReschedule(final JobHandler handler, final Job.JobState resultState) {
+        final RescheduleInfo info = new RescheduleInfo();
+        final String notificationTopic;
+        Long notificationTime = null;
+
+        switch ( resultState ) {
+            case SUCCEEDED : // job is finished
+                if ( this.logger.isDebugEnabled() ) {
+                    this.logger.debug("Finished job {}", Utility.toString(handler.getJob()));
+                }
+                info.processingTime = System.currentTimeMillis() - handler.started;
+                notificationTime = info.processingTime;
+                notificationTopic = NotificationConstants.TOPIC_JOB_FINISHED;
+                break;
+            case QUEUED : // check if we exceeded the number of retries
+                final int maxRetries = (Integer) handler.getJob().getProperty(Job.PROPERTY_JOB_RETRIES);
+                final int previousTries = (Integer) handler.getJob().getProperty(Job.PROPERTY_JOB_RETRY_COUNT);
+                final int attempts = previousTries + 1;
+
+                if ( maxRetries == -1 || attempts <= maxRetries ) {
+                    // still allowed to retry
+                    info.reschedule = true;
+                    handler.getJob().retry();
+                    if ( this.logger.isDebugEnabled() ) {
+                        this.logger.debug("Failed job {}", Utility.toString(handler.getJob()));
+                    }
+                    notificationTopic = NotificationConstants.TOPIC_JOB_FAILED;
+                } else {
+                    if ( this.logger.isDebugEnabled() ) {
+                        this.logger.debug("Cancelled job {}", Utility.toString(handler.getJob()));
+                    }
+                    notificationTopic = NotificationConstants.TOPIC_JOB_CANCELLED;
+                }
+                break;
+            default : // consumer cancelled the job (STOPPED, GIVEN_UP, ERROR)
+                if ( this.logger.isDebugEnabled() ) {
+                    this.logger.debug("Cancelled job {}", Utility.toString(handler.getJob()));
+                }
+                notificationTopic = NotificationConstants.TOPIC_JOB_CANCELLED;
+                break;
+        }
+
+        NotificationUtility.sendNotification(this.services.eventAdmin, notificationTopic, handler.getJob(), notificationTime);
+        return info;
+    }
+
+    /**
+     * Handle the end of a job execution.
+     * The job is removed from the processing list. If the queue is no longer
+     * running or the job is unknown to this queue, nothing else happens.
+     * Otherwise the job is either rescheduled or finished with its final state;
+     * a QUEUED result which is not rescheduled is turned into GIVEN_UP.
+     * @param jobId The id of the job
+     * @param resultState The state reported for the job
+     * @param isAsync Whether the job finished asynchronously (not evaluated here)
+     * @return {@code true} if the job has been rescheduled
+     */
+    private boolean finishedJob(final String jobId,
+                                Job.JobState resultState,
+                                final boolean isAsync) {
+        if ( this.logger.isDebugEnabled() ) {
+            this.logger.debug("Received finish for job {}, resultState={}", jobId, resultState);
+        }
+
+        // take the job out of the processing list, regardless of the queue state
+        final JobHandler handler;
+        synchronized ( this.processingJobsLists ) {
+            handler = this.processingJobsLists.remove(jobId);
+        }
+
+        if ( !this.running ) {
+            this.logger.warn("Queue is not running anymore. Discarding finish for {}", jobId);
+            return false;
+        }
+
+        if ( handler == null ) {
+            if ( this.logger.isDebugEnabled() ) {
+                this.logger.debug("This job has never been started by this queue: {}", jobId);
+            }
+            return false;
+        }
+
+        final RescheduleInfo rescheduleInfo = this.handleReschedule(handler, resultState);
+        if ( rescheduleInfo.reschedule ) {
+            this.reschedule(handler);
+            return true;
+        }
+
+        // a failed job which is not retried anymore has been given up
+        final Job.JobState finalState = (resultState == Job.JobState.QUEUED ? Job.JobState.GIVEN_UP : resultState);
+        // we keep cancelled jobs and succeeded jobs if the queue is configured like this.
+        final boolean keepJobs = finalState != Job.JobState.SUCCEEDED || this.configuration.isKeepJobs();
+        handler.finished(finalState, keepJobs, rescheduleInfo.processingTime);
+        return false;
+    }
+
+    /**
+     * Create the event used to deliver a job.
+     * The event uses the job topic and carries a copy of all job properties
+     * plus a new status notifier object. The distribute and application
+     * properties are not part of the event.
+     * @param info The job handler.
+     * @return The job event.
+     */
+    private Event getJobEvent(final JobHandler info) {
+        final Job job = info.getJob();
+
+        final Dictionary<String, Object> eventProps = new Hashtable<String, Object>();
+        for(final String propName : job.getPropertyNames()) {
+            eventProps.put(propName, job.getProperty(propName));
+        }
+
+        // put properties for finished job callback
+        eventProps.put(JobStatusNotifier.CONTEXT_PROPERTY_NAME, new JobStatusNotifierImpl());
+
+        // remove app id and distributable flag
+        eventProps.remove(EventUtil.PROPERTY_DISTRIBUTE);
+        eventProps.remove(EventUtil.PROPERTY_APPLICATION);
+
+        return new Event(job.getTopic(), eventProps);
+    }
+
+    /**
+     * Clear the suspended marker and start the queue if it was suspended.
+     * @see org.apache.sling.event.jobs.Queue#resume()
+     */
+    @Override
+    public void resume() {
+        final long previous = this.suspendedSince.getAndSet(-1);
+        if ( previous == -1 ) {
+            // queue was not suspended
+            return;
+        }
+        this.logger.debug("Waking up suspended queue {}", queueName);
+        this.start();
+    }
+
+    /**
+     * Record the current time as suspension start, unless the queue is
+     * already suspended.
+     * @see org.apache.sling.event.jobs.Queue#suspend()
+     */
+    @Override
+    public void suspend() {
+        final boolean newlySuspended = this.suspendedSince.compareAndSet(-1, System.currentTimeMillis());
+        if ( newlySuspended ) {
+            this.logger.debug("Suspending queue {}", queueName);
+        }
+    }
+
+    /**
+     * A queue is suspended while a suspension start time is recorded.
+     * @see org.apache.sling.event.jobs.Queue#isSuspended()
+     */
+    @Override
+    public boolean isSuspended() {
+        final long since = this.suspendedSince.get();
+        return since != -1;
+    }
+
+    /**
+     * Delete the job resources of all topics reported by the cache.
+     * Each topic is looked up below the local jobs path (slashes in the topic
+     * are replaced by dots) and every job found there is deleted. A failure
+     * to delete a single job is logged and the resolver is reverted and
+     * refreshed; a failure of the final commit is logged.
+     * @see org.apache.sling.event.jobs.Queue#removeAll()
+     */
+    @Override
+    public synchronized void removeAll() {
+        final Set<String> topics = this.cache.getTopics();
+        logger.debug("Removing all jobs for queue {} : {}", queueName, topics);
+
+        if ( topics.isEmpty() ) {
+            return;
+        }
+
+        final ResourceResolver resolver = this.services.configuration.createResourceResolver();
+        try {
+            final Resource baseResource = resolver.getResource(this.services.configuration.getLocalJobsPath());
+            // sanity check - should never be null
+            if ( baseResource == null ) {
+                return;
+            }
+
+            final BatchResourceRemover remover = new BatchResourceRemover();
+            for(final String topic : topics) {
+                final Resource topicResource = baseResource.getChild(topic.replace('/', '.'));
+                if ( topicResource == null ) {
+                    continue;
+                }
+                JobTopicTraverser.traverse(logger, topicResource, new JobTopicTraverser.JobCallback() {
+
+                    @Override
+                    public boolean handle(final JobImpl job) {
+                        final ResourceResolver topicResolver = topicResource.getResourceResolver();
+                        final Resource jobResource = topicResolver.getResource(job.getResourcePath());
+                        // sanity check
+                        if ( jobResource != null ) {
+                            try {
+                                remover.delete(jobResource);
+                            } catch ( final PersistenceException pe) {
+                                logger.error("Unable to remove job " + job, pe);
+                                topicResolver.revert();
+                                topicResolver.refresh();
+                            }
+                        }
+                        return true;
+                    }
+                });
+            }
+
+            try {
+                resolver.commit();
+            } catch ( final PersistenceException pe) {
+                logger.error("Unable to remove jobs", pe);
+            }
+        } finally {
+            // the resolver is closed on every path, including the early return
+            resolver.close();
+        }
+    }
+
+    /**
+     * This implementation does nothing.
+     * @see org.apache.sling.event.jobs.Queue#clear()
+     */
+    @Override
+    public void clear() {
+        // intentionally empty
+    }
+
+    /**
+     * State lookup is not supported, this implementation always returns {@code null}.
+     * @param key The state key (ignored)
+     * @return Always {@code null}
+     * @see org.apache.sling.event.jobs.Queue#getState(java.lang.String)
+     */
+    @Override
+    public Object getState(final String key) {
+        final Object unsupported = null;
+        return unsupported;
+    }
+
+    /**
+     * Build a one line summary of the queue state: the outdated flag, the
+     * suspension start, the async job counter and the job count, where the
+     * job count is the configured maximum of parallel jobs minus the
+     * currently available permits.
+     * @see org.apache.sling.event.jobs.Queue#getStateInfo()
+     */
+    @Override
+    public String getStateInfo() {
+        final StringBuilder sb = new StringBuilder();
+        sb.append("outdated=").append(this.isOutdated.get());
+        sb.append(", suspendedSince=").append(this.suspendedSince.get());
+        sb.append(", asyncJobs=").append(this.asyncCounter.get());
+        sb.append(", jobCount=").append(this.configuration.getMaxParallel() - this.available.availablePermits());
+        return sb.toString();
+    }
+
+    /**
+     * Get the retry delay for a job.
+     * The first available value of the following is used: the delay override
+     * property of the job, the retry delay property of the job, the retry
+     * delay of the queue configuration.
+     * A property which is set but cannot be converted to a {@code Long} is
+     * skipped instead of failing with a {@code NullPointerException} on
+     * unboxing.
+     * @param handler The job handler.
+     * @return The retry delay
+     */
+    private long getRetryDelay(final JobHandler handler) {
+        final Long override = handler.getJob().getProperty(JobImpl.PROPERTY_DELAY_OVERRIDE, Long.class);
+        if ( override != null ) {
+            return override.longValue();
+        }
+        final Long jobDelay = handler.getJob().getProperty(Job.PROPERTY_JOB_RETRY_DELAY, Long.class);
+        if ( jobDelay != null ) {
+            return jobDelay.longValue();
+        }
+        // neither property is usable, fall back to the queue configuration
+        return this.configuration.getRetryDelayInMs();
+    }
+
+    /**
+     * Log an exception which is deliberately ignored.
+     * Nothing is logged unless debug logging is enabled.
+     * @param e The ignored exception
+     */
+    private void ignoreException(Exception e) {
+        if ( !this.logger.isDebugEnabled() ) {
+            return;
+        }
+        final String message = "Ignored exception " + e.getMessage();
+        this.logger.debug(message, e);
+    }
+
+    /**
+     * Stop a job which is in the processing list of this queue.
+     * @param job The job to stop
+     * @return {@code true} if the job was found in the processing list and
+     *         its handler has been asked to stop, {@code false} otherwise
+     */
+    public boolean stopJob(final JobImpl job) {
+        final JobHandler processing;
+        synchronized ( this.processingJobsLists ) {
+            processing = this.processingJobsLists.get(job.getId());
+        }
+        if ( processing == null ) {
+            return false;
+        }
+        processing.stop();
+        return true;
+    }
+
+    /**
+     * Put a job back into the queue, honouring the retry delay.
+     * Without a delay the job is requeued directly. With a delay the job is
+     * added to the retry list and a scheduler task requeues it at the fire
+     * date. An ordered queue is suspended until that task has run.
+     * @param handler The job handler
+     */
+    private void reschedule(final JobHandler handler) {
+        final long delay = this.getRetryDelay(handler);
+        if ( delay <= 0 ) {
+            // put directly into queue
+            this.requeue(handler);
+            return;
+        }
+
+        // we delay putting back the job until the retry delay is over
+        if ( this.configuration.getType() == Type.ORDERED ) {
+            this.suspend();
+        }
+        handler.addToRetryList();
+        final Date fireDate = new Date(System.currentTimeMillis() + delay);
+
+        final String jobName = "Waiting:" + queueName + ":" + handler.hashCode();
+        final Runnable requeueTask = new Runnable() {
+            @Override
+            public void run() {
+                if ( handler.removeFromRetryList() ) {
+                    requeue(handler);
+                }
+                if ( configuration.getType() == Type.ORDERED ) {
+                    resume();
+                }
+            }
+        };
+        services.scheduler.schedule(requeueTask, services.scheduler.AT(fireDate).name(jobName));
+    }
+}
+

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java?rev=1667052&r1=1667051&r2=1667052&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java Mon Mar 16 16:07:08 2015
@@ -30,12 +30,16 @@ import java.util.concurrent.atomic.Atomi
 
 import org.apache.sling.api.resource.Resource;
 import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.event.impl.jobs.JobConsumerManager;
 import org.apache.sling.event.impl.jobs.JobHandler;
 import org.apache.sling.event.impl.jobs.JobImpl;
 import org.apache.sling.event.impl.jobs.JobTopicTraverser;
+import org.apache.sling.event.impl.jobs.Utility;
 import org.apache.sling.event.impl.jobs.config.JobManagerConfiguration;
+import org.apache.sling.event.jobs.Queue;
 import org.apache.sling.event.jobs.QueueConfiguration;
 import org.apache.sling.event.jobs.QueueConfiguration.Type;
+import org.apache.sling.event.jobs.consumer.JobExecutor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -123,34 +127,59 @@ public class QueueJobCache {
 
     /**
      * Get the next job.
-     * This method is not called concurrently, however
+     * This method is potentially called concurrently, and
      * {@link #reschedule(JobHandler)} and {@link #handleNewTopics(Set)}
      * can be called concurrently.
      */
-    public JobImpl getNextJob(final boolean doFull) {
-        JobImpl result = null;
+    public JobHandler getNextJob(final JobConsumerManager jobConsumerManager,
+            final Queue queue,
+            final boolean doFull) {
+        JobHandler handler = null;
 
         synchronized ( this.cache ) {
-            if ( this.cache.isEmpty() ) {
-                final Set<String> checkingTopics = new HashSet<String>();
-                synchronized ( this.topicsWithNewJobs ) {
-                    checkingTopics.addAll(this.topicsWithNewJobs);
-                    this.topicsWithNewJobs.clear();
-                }
-                if ( doFull ) {
-                    checkingTopics.addAll(this.topics);
-                }
-                if ( !checkingTopics.isEmpty() ) {
-                    this.loadJobs(checkingTopics);
+            boolean retry;
+            do {
+                retry = false;
+                if ( this.cache.isEmpty() ) {
+                    final Set<String> checkingTopics = new HashSet<String>();
+                    synchronized ( this.topicsWithNewJobs ) {
+                        checkingTopics.addAll(this.topicsWithNewJobs);
+                        this.topicsWithNewJobs.clear();
+                    }
+                    if ( doFull ) {
+                        checkingTopics.addAll(this.topics);
+                    }
+                    if ( !checkingTopics.isEmpty() ) {
+                        this.loadJobs(checkingTopics);
+                    }
                 }
-            }
 
-            if ( !this.cache.isEmpty() ) {
-                result = this.cache.remove(0);
-            }
+                if ( !this.cache.isEmpty() ) {
+                    final JobImpl job = this.cache.remove(0);
+                    final JobExecutor consumer = jobConsumerManager.getExecutor(job.getTopic());
+
+                    handler = new JobHandler(job, consumer, this.configuration);
+                    if ( (consumer != null || (job.isBridgedEvent() && jobConsumerManager.supportsBridgedEvents())) ) {
+                        if ( !handler.startProcessing(queue) ) {
+                            if ( logger.isDebugEnabled() ) {
+                                logger.debug("Discarding removed job {}", Utility.toString(job));
+                            }
+                            handler = null;
+                            retry = true;
+                        }
+                    } else {
+                        // no consumer on this instance, assign to another instance
+                        handler.reassign();
+
+                        handler = null;
+                        retry = true;
+                    }
+
+                }
+            } while ( handler == null && retry);
         }
 
-        return result;
+        return handler;
     }
 
     /**

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java?rev=1667052&r1=1667051&r2=1667052&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java Mon Mar 16 16:07:08 2015
@@ -39,7 +39,9 @@ import org.apache.felix.scr.annotations.
 import org.apache.sling.api.resource.Resource;
 import org.apache.sling.api.resource.ResourceResolver;
 import org.apache.sling.commons.scheduler.Scheduler;
+import org.apache.sling.commons.threads.ThreadPool;
 import org.apache.sling.commons.threads.ThreadPoolManager;
+import org.apache.sling.event.impl.EventingThreadPool;
 import org.apache.sling.event.impl.jobs.JobConsumerManager;
 import org.apache.sling.event.impl.jobs.JobHandler;
 import org.apache.sling.event.impl.jobs.JobImpl;
@@ -56,7 +58,6 @@ import org.apache.sling.event.impl.suppo
 import org.apache.sling.event.jobs.Job;
 import org.apache.sling.event.jobs.NotificationConstants;
 import org.apache.sling.event.jobs.Queue;
-import org.apache.sling.event.jobs.QueueConfiguration;
 import org.apache.sling.event.jobs.jmx.QueuesMBean;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventAdmin;
@@ -67,13 +68,13 @@ import org.slf4j.LoggerFactory;
 
 
 /**
- * Implementation of the job manager.
+ * Implementation of the queue manager.
  */
 @Component(immediate=true)
 @Service(value={Runnable.class, QueueManager.class, EventHandler.class})
 @Properties({
-    @Property(name="scheduler.period", longValue=60),
-    @Property(name="scheduler.concurrent", boolValue=false),
+    @Property(name=Scheduler.PROPERTY_SCHEDULER_PERIOD, longValue=60),
+    @Property(name=Scheduler.PROPERTY_SCHEDULER_CONCURRENT, boolValue=false),
     @Property(name=EventConstants.EVENT_TOPIC, value=NotificationConstants.TOPIC_JOB_ADDED)
 })
 public class QueueManager
@@ -97,6 +98,12 @@ public class QueueManager
     @Reference
     private ThreadPoolManager threadPoolManager;
 
+    /**
+     * Our thread pool.
+     */
+    @Reference(referenceInterface=EventingThreadPool.class)
+    private ThreadPool threadPool;
+
     /** The job manager configuration. */
     @Reference
     private JobManagerConfiguration configuration;
@@ -108,7 +115,7 @@ public class QueueManager
     private final Object queuesLock = new Object();
 
     /** All active queues. */
-    private final Map<String, AbstractJobQueue> queues = new ConcurrentHashMap<String, AbstractJobQueue>();
+    private final Map<String, JobQueueImpl> queues = new ConcurrentHashMap<String, JobQueueImpl>();
 
     /** We count the scheduler runs. */
     private volatile long schedulerRuns;
@@ -133,6 +140,7 @@ public class QueueManager
         queueServices.scheduler = this.scheduler;
         queueServices.threadPoolManager = this.threadPoolManager;
         queueServices.statisticsManager = statisticsManager;
+        queueServices.eventingThreadPool = this.threadPool;
         this.configuration.addListener(this);
     }
 
@@ -144,9 +152,9 @@ public class QueueManager
         logger.debug("Apache Sling Queue Manager stopping on instance {}", Environment.APPLICATION_ID);
 
         this.configuration.removeListener(this);
-        final Iterator<AbstractJobQueue> i = this.queues.values().iterator();
+        final Iterator<JobQueueImpl> i = this.queues.values().iterator();
         while ( i.hasNext() ) {
-            final AbstractJobQueue jbq = i.next();
+            final JobQueueImpl jbq = i.next();
             jbq.close();
             // update mbeans
             ((QueuesMBeanImpl)queuesMBean).sendEvent(new QueueStatusEvent(null, jbq));
@@ -166,10 +174,9 @@ public class QueueManager
         this.schedulerRuns++;
         logger.debug("Queue manager maintenance: Starting #{}", this.schedulerRuns);
 
-        // check for unprocessed jobs first
-        logger.debug("Checking for unprocessed jobs...");
-        for(final AbstractJobQueue jbq : this.queues.values() ) {
-            jbq.checkForUnprocessedJobs();
+        // queue maintenance
+        for(final JobQueueImpl jbq : this.queues.values() ) {
+            jbq.maintain();
         }
 
         // we only do a full clean up on every fifth run
@@ -181,10 +188,10 @@ public class QueueManager
 
            // we synchronize to avoid creating a queue which is about to be removed during cleanup
             synchronized ( queuesLock ) {
-                final Iterator<Map.Entry<String, AbstractJobQueue>> i = this.queues.entrySet().iterator();
+                final Iterator<Map.Entry<String, JobQueueImpl>> i = this.queues.entrySet().iterator();
                 while ( i.hasNext() ) {
-                    final Map.Entry<String, AbstractJobQueue> current = i.next();
-                    final AbstractJobQueue jbq = current.getValue();
+                    final Map.Entry<String, JobQueueImpl> current = i.next();
+                    final JobQueueImpl jbq = current.getValue();
                     if ( jbq.tryToClose() ) {
                         logger.debug("Removing idle job queue {}", jbq);
                         // remove
@@ -210,7 +217,7 @@ public class QueueManager
             final Set<String> topics) {
         final InternalQueueConfiguration config = queueInfo.queueConfiguration;
         // get or create queue
-        AbstractJobQueue queue = null;
+        JobQueueImpl queue = null;
         // we synchronize to avoid creating a queue which is about to be removed during cleanup
         synchronized ( queuesLock ) {
             queue = this.queues.get(queueInfo.queueName);
@@ -221,23 +228,12 @@ public class QueueManager
                 queue = null;
             }
             if ( queue == null ) {
-                if ( config.getType() == QueueConfiguration.Type.ORDERED ) {
-                    queue = new OrderedJobQueue(queueInfo.queueName, config, queueServices, topics);
-                } else if ( config.getType() == QueueConfiguration.Type.UNORDERED
-                    || config.getType() == QueueConfiguration.Type.TOPIC_ROUND_ROBIN ) {
-                    queue = new ParallelJobQueue(queueInfo.queueName, config, queueServices, topics);
-                }
-                // this is just a sanity check, actually we always have a queue instance here
-                if ( queue != null ) {
-                    // on startup the queue might be empty and we can simply discard it
-                    if ( !queue.canBeClosed() ) {
-                        queues.put(queueInfo.queueName, queue);
-                        ((QueuesMBeanImpl)queuesMBean).sendEvent(new QueueStatusEvent(queue, null));
-                        queue.start();
-                    }
-                } else {
-                    // we log anyway
-                    logger.error("Unable to create new queue: unknown queue type {}", config);
+                queue = new JobQueueImpl(queueInfo.queueName, config, queueServices, topics);
+                // on startup the queue might be empty and we can simply discard it
+                if ( !queue.canBeClosed() ) {
+                    queues.put(queueInfo.queueName, queue);
+                    ((QueuesMBeanImpl)queuesMBean).sendEvent(new QueueStatusEvent(queue, null));
+                    queue.start();
                 }
             } else {
                 queue.wakeUpQueue(topics);
@@ -255,7 +251,7 @@ public class QueueManager
         this.maintain();
     }
 
-    private void outdateQueue(final AbstractJobQueue queue) {
+    private void outdateQueue(final JobQueueImpl queue) {
         // remove the queue with the old name
         // check for main queue
         final String oldName = ResourceHelper.filterQueueName(queue.getName());
@@ -285,8 +281,8 @@ public class QueueManager
     private void restart() {
         // let's rename/close all queues and clear them
         synchronized ( queuesLock ) {
-            final List<AbstractJobQueue> queues = new ArrayList<AbstractJobQueue>(this.queues.values());
-            for(final AbstractJobQueue queue : queues ) {
+            final List<JobQueueImpl> queues = new ArrayList<JobQueueImpl>(this.queues.values());
+            for(final JobQueueImpl queue : queues ) {
                 this.outdateQueue(queue);
             }
         }
@@ -295,7 +291,7 @@ public class QueueManager
         if ( config != null ) {
             final List<Job> rescheduleList = this.configuration.clearJobRetryList();
             for(final Job j : rescheduleList) {
-                final JobHandler jh = new JobHandler((JobImpl)j, this.configuration);
+                final JobHandler jh = new JobHandler((JobImpl)j, null, this.configuration);
                 jh.reschedule();
             }
         }
@@ -312,7 +308,7 @@ public class QueueManager
      * @see org.apache.sling.event.jobs.JobManager#getQueues()
      */
     public Iterable<Queue> getQueues() {
-        final Iterator<AbstractJobQueue> jqI = this.queues.values().iterator();
+        final Iterator<JobQueueImpl> jqI = this.queues.values().iterator();
         return new Iterable<Queue>() {
 
             @Override
@@ -418,4 +414,14 @@ public class QueueManager
         this.logger.debug("Established new topic mapping: {}", mapping);
         return mapping;
     }
+    /** Stores the given eventing thread pool in the threadPool field. NOTE(review): presumably a service bind callback - confirm. */
+    protected void bindThreadPool(final EventingThreadPool etp) {
+        this.threadPool = etp;
+    }
+    /** Resets the threadPool field to null, but only if it still references the very instance passed in (identity check). */
+    protected void unbindThreadPool(final EventingThreadPool etp) {
+        if ( this.threadPool == etp ) {
+            this.threadPool = null;
+        }
+    }
 }

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueServices.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueServices.java?rev=1667052&r1=1667051&r2=1667052&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueServices.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueServices.java Mon Mar 16 16:07:08 2015
@@ -19,12 +19,18 @@
 package org.apache.sling.event.impl.jobs.queues;
 
 import org.apache.sling.commons.scheduler.Scheduler;
+import org.apache.sling.commons.threads.ThreadPool;
 import org.apache.sling.commons.threads.ThreadPoolManager;
 import org.apache.sling.event.impl.jobs.JobConsumerManager;
 import org.apache.sling.event.impl.jobs.config.JobManagerConfiguration;
 import org.apache.sling.event.impl.jobs.stats.StatisticsManager;
 import org.osgi.service.event.EventAdmin;
 
+/**
+ * The queue services class is a helper class containing all
+ * services used by the queue implementations.
+ * This avoids passing a set of separate objects.
+ */
 public class QueueServices {
 
     public JobManagerConfiguration configuration;
@@ -38,4 +44,6 @@ public class QueueServices {
     public Scheduler scheduler;
 
     public StatisticsManager statisticsManager;
+
+    public ThreadPool eventingThreadPool;
 }

Added: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ResultBuilderImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ResultBuilderImpl.java?rev=1667052&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ResultBuilderImpl.java (added)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ResultBuilderImpl.java Mon Mar 16 16:07:08 2015
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.queues;
+
+import org.apache.sling.event.impl.jobs.InternalJobState;
+import org.apache.sling.event.impl.jobs.JobExecutionResultImpl;
+import org.apache.sling.event.jobs.consumer.JobExecutionContext.ResultBuilder;
+import org.apache.sling.event.jobs.consumer.JobExecutionResult;
+/** {@link ResultBuilder} implementation creating {@link JobExecutionResultImpl} objects from a message and a retry delay. */
+public class ResultBuilderImpl implements ResultBuilder {
+    /** Message handed to every result created by this builder; stays null until {@link #message(String)} is called. */
+    private String message;
+    /** Retry delay handed to the results; stays null until {@link #failed(long)} is called on this builder. */
+    private Long retryDelayInMs;
+    /** Remembers the given delay on this builder and returns a FAILED result with the current message and that delay. */
+    @Override
+    public JobExecutionResult failed(final long retryDelayInMs) {
+        this.retryDelayInMs = retryDelayInMs;
+        return new JobExecutionResultImpl(InternalJobState.FAILED, message, retryDelayInMs);
+    }
+    /** Stores the message for results created afterwards and returns this builder so that calls can be chained. */
+    @Override
+    public ResultBuilder message(final String message) {
+        this.message = message;
+        return this;
+    }
+    /** Returns a SUCCEEDED result with the current message and the stored retry delay (null if none was set). */
+    @Override
+    public JobExecutionResult succeeded() {
+        return new JobExecutionResultImpl(InternalJobState.SUCCEEDED, message, retryDelayInMs);
+    }
+    /** Returns a FAILED result with the current message and the stored retry delay; the stored delay is left unchanged. */
+    @Override
+    public JobExecutionResult failed() {
+        return new JobExecutionResultImpl(InternalJobState.FAILED, message, retryDelayInMs);
+    }
+    /** Returns a CANCELLED result with the current message and the stored retry delay (null if none was set). */
+    @Override
+    public JobExecutionResult cancelled() {
+        return new JobExecutionResultImpl(InternalJobState.CANCELLED, message, retryDelayInMs);
+    }
+}

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ResultBuilderImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ResultBuilderImpl.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ResultBuilderImpl.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobUtil.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobUtil.java?rev=1667052&r1=1667051&r2=1667052&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobUtil.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobUtil.java Mon Mar 16 16:07:08 2015
@@ -253,14 +253,14 @@ public abstract class JobUtil {
      * @deprecated - Use the new {@link JobConsumer} interface instead.
      */
     @Deprecated
-    private static JobStatusNotifier.NotifierContext getNotifierContext(final Event job) {
+    private static JobStatusNotifier getNotifier(final Event job) {
         // check if this is a job event
         if ( !isJobEvent(job) ) {
             return null;
         }
-        final JobStatusNotifier.NotifierContext ctx = (JobStatusNotifier.NotifierContext) job.getProperty(JobStatusNotifier.CONTEXT_PROPERTY_NAME);
+        final JobStatusNotifier ctx = (JobStatusNotifier) job.getProperty(JobStatusNotifier.CONTEXT_PROPERTY_NAME);
         if ( ctx == null ) {
-            throw new IllegalArgumentException("JobStatusNotifier context is not available in event properties.");
+            throw new IllegalArgumentException("JobStatusNotifier is not available in event properties.");
         }
         return ctx;
     }
@@ -277,9 +277,9 @@ public abstract class JobUtil {
      */
     @Deprecated
     public static boolean acknowledgeJob(final Event job) {
-        final JobStatusNotifier.NotifierContext ctx = getNotifierContext(job);
+        final JobStatusNotifier ctx = getNotifier(job);
         if ( ctx != null ) {
-            if ( !ctx.getJobStatusNotifier().sendAcknowledge(job) ) {
+            if ( !ctx.getAcknowledge(null) ) {
                 // if we don't get an ack, someone else is already processing this job.
                 // we process but do not notify the job event handler.
                 LoggerFactory.getLogger(JobUtil.class).info("Someone else is already processing job {}.", job);
@@ -297,9 +297,9 @@ public abstract class JobUtil {
      */
     @Deprecated
     public static void finishedJob(final Event job) {
-        final JobStatusNotifier.NotifierContext ctx = getNotifierContext(job);
+        final JobStatusNotifier ctx = getNotifier(job);
         if ( ctx != null ) {
-            ctx.getJobStatusNotifier().finishedJob(job, false);
+            ctx.finishedJob(false);
         }
     }
 
@@ -311,9 +311,9 @@ public abstract class JobUtil {
      */
     @Deprecated
     public static boolean rescheduleJob(final Event job) {
-        final JobStatusNotifier.NotifierContext ctx = getNotifierContext(job);
+        final JobStatusNotifier ctx = getNotifier(job);
         if ( ctx != null ) {
-            return ctx.getJobStatusNotifier().finishedJob(job, true);
+            return ctx.finishedJob(true);
         }
         return false;
     }
@@ -327,16 +327,17 @@ public abstract class JobUtil {
     @Deprecated
     public static void processJob(final Event job, final JobProcessor processor) {
         // first check for a notifier context to send an acknowledge
-        final JobStatusNotifier.NotifierContext ctx = getNotifierContext(job);
-        boolean notify = ctx != null;
-        if ( ctx != null && !ctx.getJobStatusNotifier().sendAcknowledge(job) ) {
+        final JobStatusNotifier ctx = getNotifier(job);
+        if ( ctx != null && !ctx.getAcknowledge(processor) ) {
             // if we don't get an ack, someone else is already processing this job.
-            // we process but do not notify the job event handler.
+            // we do not process.
             LoggerFactory.getLogger(JobUtil.class).info("Someone else is already processing job {}.", job);
-            notify = false;
+            return;
+        }
+        if ( ctx != null ) {
+            return;
         }
         final JobPriority priority = (JobPriority) job.getProperty(PROPERTY_JOB_PRIORITY);
-        final boolean notifyResult = notify;
 
         final Runnable task = new Runnable() {
 
@@ -361,23 +362,13 @@ public abstract class JobUtil {
                                     break;
                     }
                 }
-                boolean result = false;
                 try {
-                    result = processor.process(job);
+                    processor.process(job);
                 } catch (Throwable t) { //NOSONAR
                     LoggerFactory.getLogger(JobUtil.class).error("Unhandled error occured in job processor " + t.getMessage() + " while processing job " + job, t);
-                    // we don't reschedule if an exception occurs
-                    result = true;
                 } finally {
                     currentThread.setPriority(oldPriority);
                     currentThread.setName(oldName);
-                    if ( notifyResult ) {
-                        if ( result ) {
-                            JobUtil.finishedJob(job);
-                        } else {
-                            JobUtil.rescheduleJob(job);
-                        }
-                    }
                 }
             }