You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2015/03/16 17:07:08 UTC
svn commit: r1667052 - in
/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event:
impl/jobs/ impl/jobs/deprecated/ impl/jobs/queues/ jobs/
Author: cziegeler
Date: Mon Mar 16 16:07:08 2015
New Revision: 1667052
URL: http://svn.apache.org/r1667052
Log:
SLING-4481 : Reduce the number of controller threads for queue
Added:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/JobStatusNotifierImpl.java (with props)
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobExecutionContextImpl.java (with props)
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java (with props)
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ResultBuilderImpl.java (with props)
Removed:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobRunner.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/JobStatusNotifier.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueServices.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobUtil.java
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java?rev=1667052&r1=1667051&r2=1667052&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java Mon Mar 16 16:07:08 2015
@@ -35,6 +35,7 @@ import org.apache.sling.event.impl.jobs.
import org.apache.sling.event.impl.support.ResourceHelper;
import org.apache.sling.event.jobs.Job;
import org.apache.sling.event.jobs.Queue;
+import org.apache.sling.event.jobs.consumer.JobExecutor;
/**
@@ -44,15 +45,19 @@ public class JobHandler {
private final JobImpl job;
- public long started = -1;
+ public volatile long started = -1;
private volatile boolean isStopped = false;
private final JobManagerConfiguration configuration;
+ private final JobExecutor consumer;
+
public JobHandler(final JobImpl job,
+ final JobExecutor consumer,
final JobManagerConfiguration configuration) {
this.job = job;
+ this.consumer = consumer;
this.configuration = configuration;
}
@@ -60,6 +65,10 @@ public class JobHandler {
return this.job;
}
+ public JobExecutor getConsumer() {
+ return this.consumer;
+ }
+
public boolean startProcessing(final Queue queue) {
this.isStopped = false;
return this.persistJobProperties(this.job.prepare(queue));
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java?rev=1667052&r1=1667051&r2=1667052&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java Mon Mar 16 16:07:08 2015
@@ -47,7 +47,7 @@ import org.apache.sling.event.impl.jobs.
import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueInfo;
import org.apache.sling.event.impl.jobs.config.TopologyCapabilities;
import org.apache.sling.event.impl.jobs.notifications.NotificationUtility;
-import org.apache.sling.event.impl.jobs.queues.AbstractJobQueue;
+import org.apache.sling.event.impl.jobs.queues.JobQueueImpl;
import org.apache.sling.event.impl.jobs.queues.QueueManager;
import org.apache.sling.event.impl.jobs.stats.StatisticsManager;
import org.apache.sling.event.impl.jobs.tasks.CleanUpTask;
@@ -347,7 +347,7 @@ public class JobManagerImpl
resolver.close();
}
} else {
- final JobHandler jh = new JobHandler(job, this.configuration);
+ final JobHandler jh = new JobHandler(job, null, this.configuration);
jh.finished(Job.JobState.DROPPED, true, -1);
}
}
@@ -855,7 +855,7 @@ public class JobManagerImpl
if ( job != null && !this.configuration.isStoragePath(job.getResourcePath()) ) {
// get the queue configuration
final QueueInfo queueInfo = this.configuration.getQueueConfigurationManager().getQueueInfo(job.getTopic());
- final AbstractJobQueue queue = (AbstractJobQueue)this.qManager.getQueue(queueInfo.queueName);
+ final JobQueueImpl queue = (JobQueueImpl)this.qManager.getQueue(queueInfo.queueName);
boolean stopped = false;
if ( queue != null ) {
@@ -863,7 +863,7 @@ public class JobManagerImpl
}
if ( forward && !stopped ) {
// mark the job as stopped
- final JobHandler jh = new JobHandler(job,this.configuration);
+ final JobHandler jh = new JobHandler(job, null, this.configuration);
jh.finished(JobState.STOPPED, true, -1);
}
}
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/JobStatusNotifier.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/JobStatusNotifier.java?rev=1667052&r1=1667051&r2=1667052&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/JobStatusNotifier.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/JobStatusNotifier.java Mon Mar 16 16:07:08 2015
@@ -18,31 +18,22 @@
*/
package org.apache.sling.event.impl.jobs.deprecated;
-import org.osgi.service.event.Event;
+import org.apache.sling.event.jobs.JobProcessor;
public interface JobStatusNotifier {
String CONTEXT_PROPERTY_NAME = JobStatusNotifier.class.getName();
- class NotifierContext {
- private final JobStatusNotifier notifier;
-
- public NotifierContext(final JobStatusNotifier n) {
- this.notifier = n;
- }
-
- public JobStatusNotifier getJobStatusNotifier() {
- return this.notifier;
- }
- }
-
/**
- * Send an acknowledge message that someone is processing the job.
- * @param job The job.
+ * Notify the job handling that the job has been ack'ed.
+ * If a processor is set, the job queue will use that processor to execute the job.
+ * If it is not set, async processing is enabled and {@link #finishedJob(boolean)}
+ * needs to be called by the caller of this method.
+ * @param processor The job processor.
* @return <code>true</code> if the ack is ok, <code>false</code> otherwise (e.g. if
* someone else already sent an ack for this job).
*/
- boolean sendAcknowledge(Event job);
+ boolean getAcknowledge(final JobProcessor processor);
/**
* Notify that the job is finished.
@@ -50,9 +41,8 @@ public interface JobStatusNotifier {
* during the processing. If the job should be rescheduled, <code>true</code> indicates
* that the job could be rescheduled. If an error occurs or the number of retries is
* exceeded, <code>false</code> will be returned.
- * @param job The job.
* @param reschedule Should the event be rescheduled?
* @return <code>true</code> if everything went fine, <code>false</code> otherwise.
*/
- boolean finishedJob(Event job, boolean reschedule);
+ boolean finishedJob(boolean reschedule);
}
Added: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/JobStatusNotifierImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/JobStatusNotifierImpl.java?rev=1667052&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/JobStatusNotifierImpl.java (added)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/JobStatusNotifierImpl.java Mon Mar 16 16:07:08 2015
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.deprecated;
+
+import org.apache.sling.event.jobs.JobProcessor;
+import org.apache.sling.event.jobs.consumer.JobExecutionContext;
+
+/**
+ * Implementation of the deprecated {@link JobStatusNotifier}.
+ * An instance is handed out together with a job event; the receiver of the
+ * event acknowledges the job through {@link #getAcknowledge(JobProcessor)}
+ * and, for async processing, reports the end of the processing through
+ * {@link #finishedJob(boolean)}.
+ */
+public class JobStatusNotifierImpl implements JobStatusNotifier {
+
+    /** Set as soon as {@link #getAcknowledge(JobProcessor)} has been invoked. */
+    private volatile boolean isCalled = false;
+
+    /** Set by {@link #markDone()}; afterwards an acknowledge is answered with <code>false</code>. */
+    private volatile boolean isMarked = false;
+
+    /** The processor passed in with the acknowledge, might be <code>null</code>. */
+    private volatile JobProcessor processor;
+
+    /** The execution context used to report the end of async processing, <code>null</code> until set. */
+    private volatile JobExecutionContext context;
+
+    /**
+     * Record the acknowledge and the optional processor.
+     * @param processor The job processor, might be <code>null</code>.
+     * @return <code>true</code> unless {@link #markDone()} has been called before.
+     */
+    @Override
+    public boolean getAcknowledge(final JobProcessor processor) {
+        synchronized ( this ) {
+            this.isCalled = true;
+            this.processor = processor;
+            // wake up threads waiting on this notifier for the acknowledge,
+            // otherwise they only notice the ack once their timeout expires
+            this.notifyAll();
+            return !isMarked;
+        }
+    }
+
+    /**
+     * Forward the end of the async processing to the execution context, if one is set.
+     * A reschedule request is reported as a failed result, everything else as succeeded.
+     * @param reschedule Should the job be rescheduled?
+     * @return Always <code>false</code>.
+     */
+    @Override
+    public boolean finishedJob(final boolean reschedule) {
+        // NOTE(review): the result is always false, even if the result has been
+        // forwarded - confirm whether callers rely on the return value
+        if ( this.context != null ) {
+            this.context.asyncProcessingFinished(reschedule ? context.result().failed() : context.result().succeeded());
+        }
+        return false;
+    }
+
+    /**
+     * Mark this notifier as done, a later acknowledge is answered with <code>false</code>.
+     */
+    public void markDone() {
+        this.isMarked = true;
+    }
+
+    /**
+     * Has {@link #getAcknowledge(JobProcessor)} been called?
+     */
+    public boolean isCalled() {
+        return this.isCalled;
+    }
+
+    /**
+     * Get the processor passed in with the acknowledge.
+     * @return The processor or <code>null</code>
+     */
+    public JobProcessor getProcessor() {
+        return this.processor;
+    }
+
+    /**
+     * Set the execution context used by {@link #finishedJob(boolean)}.
+     * @param context The job execution context
+     */
+    public void setJobExecutionContext(final JobExecutionContext context) {
+        this.context = context;
+    }
+}
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/JobStatusNotifierImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/JobStatusNotifierImpl.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/JobStatusNotifierImpl.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobExecutionContextImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobExecutionContextImpl.java?rev=1667052&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobExecutionContextImpl.java (added)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobExecutionContextImpl.java Mon Mar 16 16:07:08 2015
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.queues;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.sling.event.impl.jobs.JobHandler;
+import org.apache.sling.event.jobs.Job;
+import org.apache.sling.event.jobs.consumer.JobExecutionContext;
+import org.apache.sling.event.jobs.consumer.JobExecutionResult;
+
+public class JobExecutionContextImpl implements JobExecutionContext {
+
+ public interface ASyncHandler {
+ void finished(Job.JobState state);
+ }
+
+ private volatile boolean hasInit = false;
+
+ private final JobHandler handler;
+
+ private final Object lock;
+
+ private final AtomicBoolean isAsync;
+
+ private final ASyncHandler asyncHandler;
+
+ public JobExecutionContextImpl(final JobHandler handler,
+ final Object syncLock,
+ final AtomicBoolean isAsync,
+ final ASyncHandler asyncHandler) {
+ this.handler = handler;
+ this.lock = syncLock;
+ this.isAsync = isAsync;
+ this.asyncHandler = asyncHandler;
+ }
+
+ @Override
+ public void initProgress(final int steps,
+ final long eta) {
+ if ( !hasInit ) {
+ handler.persistJobProperties(handler.getJob().startProgress(steps, eta));
+ hasInit = true;
+ }
+ }
+
+ @Override
+ public void incrementProgressCount(final int steps) {
+ if ( hasInit ) {
+ handler.persistJobProperties(handler.getJob().setProgress(steps));
+ }
+ }
+
+ @Override
+ public void updateProgress(final long eta) {
+ if ( hasInit ) {
+ handler.persistJobProperties(handler.getJob().update(eta));
+ }
+ }
+
+ @Override
+ public void log(final String message, Object... args) {
+ handler.persistJobProperties(handler.getJob().log(message, args));
+ }
+
+ @Override
+ public boolean isStopped() {
+ return handler.isStopped();
+ }
+
+ @Override
+ public void asyncProcessingFinished(final JobExecutionResult result) {
+ synchronized ( lock ) {
+ if ( isAsync.compareAndSet(true, false) ) {
+ Job.JobState state = null;
+ if ( result.succeeded() ) {
+ state = Job.JobState.SUCCEEDED;
+ } else if ( result.failed() ) {
+ state = Job.JobState.QUEUED;
+ } else if ( result.cancelled() ) {
+ if ( handler.isStopped() ) {
+ state = Job.JobState.STOPPED;
+ } else {
+ state = Job.JobState.ERROR;
+ }
+ }
+ asyncHandler.finished(state);
+ } else {
+ throw new IllegalStateException("Job is not processed async " + handler.getJob().getId());
+ }
+ }
+ }
+
+ @Override
+ public ResultBuilder result() {
+ return new ResultBuilderImpl();
+ }
+}
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobExecutionContextImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobExecutionContextImpl.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobExecutionContextImpl.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java?rev=1667052&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java (added)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java Mon Mar 16 16:07:08 2015
@@ -0,0 +1,754 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.queues;
+
+import java.util.Calendar;
+import java.util.Date;
+import java.util.Dictionary;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.commons.threads.ThreadPool;
+import org.apache.sling.event.EventUtil;
+import org.apache.sling.event.impl.EventingThreadPool;
+import org.apache.sling.event.impl.jobs.JobExecutionResultImpl;
+import org.apache.sling.event.impl.jobs.JobHandler;
+import org.apache.sling.event.impl.jobs.JobImpl;
+import org.apache.sling.event.impl.jobs.JobTopicTraverser;
+import org.apache.sling.event.impl.jobs.Utility;
+import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
+import org.apache.sling.event.impl.jobs.deprecated.JobStatusNotifier;
+import org.apache.sling.event.impl.jobs.deprecated.JobStatusNotifierImpl;
+import org.apache.sling.event.impl.jobs.notifications.NotificationUtility;
+import org.apache.sling.event.impl.support.BatchResourceRemover;
+import org.apache.sling.event.jobs.Job;
+import org.apache.sling.event.jobs.Job.JobState;
+import org.apache.sling.event.jobs.JobProcessor;
+import org.apache.sling.event.jobs.JobUtil;
+import org.apache.sling.event.jobs.NotificationConstants;
+import org.apache.sling.event.jobs.Queue;
+import org.apache.sling.event.jobs.QueueConfiguration.Type;
+import org.apache.sling.event.jobs.Statistics;
+import org.apache.sling.event.jobs.consumer.JobExecutionContext;
+import org.osgi.service.event.Event;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A job queue which fetches jobs from its job cache and executes them
+ * through a thread pool. A semaphore limits the number of jobs which
+ * are started in parallel to the configured maximum.
+ */
+public class JobQueueImpl
+ implements Queue {
+
+ /** Maximum time in milliseconds a queue stays suspended before maintenance resumes it. */
+ private static final long MAX_SUSPEND_TIME = 1000 * 60 * 60; // 60 mins
+
+ /** Default number of milliseconds to wait for an ack. */
+ private static final long DEFAULT_WAIT_FOR_ACK_IN_MS = 60 * 1000; // by default we wait 60 secs
+
+ /** The logger, named after this class and the queue name. */
+ private final Logger logger;
+
+ /** Configuration. */
+ private final InternalQueueConfiguration configuration;
+
+ /** The queue name, changed when the queue is outdated. */
+ private volatile String queueName;
+
+ /** Are we still running? */
+ private volatile boolean running;
+
+ /** Suspended since, -1 is treated as "not suspended" by the maintenance check. */
+ private final AtomicLong suspendedSince = new AtomicLong(-1);
+
+ /** Services used by the queues. */
+ private final QueueServices services;
+
+ /** The handlers of the jobs in processing, keyed by job id. Accessed while synchronizing on the map. */
+ private final Map<String, JobHandler> processingJobsLists = new HashMap<String, JobHandler>();
+
+ /** The thread pool executing the jobs: an own pool if configured, the shared eventing pool otherwise. */
+ private final ThreadPool threadPool;
+
+ /** Number of jobs currently processed asynchronously. */
+ private final AtomicInteger asyncCounter = new AtomicInteger();
+
+ /** Flag for outdated. */
+ private final AtomicBoolean isOutdated = new AtomicBoolean(false);
+
+ /** A marker for closing the queue: set by a close attempt, reset whenever a job is started. */
+ private final AtomicBoolean closeMarker = new AtomicBoolean(false);
+
+ /** The job cache. */
+ private final QueueJobCache cache;
+
+ /** Semaphore for handling the max number of jobs. */
+ private final Semaphore available;
+
+ /**
+ * Create a new queue
+ * @param name The queue name
+ * @param config The queue configuration
+ */
+ public JobQueueImpl(final String name,
+ final InternalQueueConfiguration config,
+ final QueueServices services,
+ final Set<String> topics) {
+ if ( config.getOwnThreadPoolSize() > 0 ) {
+ this.threadPool = new EventingThreadPool(services.threadPoolManager, config.getOwnThreadPoolSize());
+ } else {
+ this.threadPool = services.eventingThreadPool;
+ }
+ this.queueName = name;
+ this.configuration = config;
+ this.services = services;
+ this.logger = LoggerFactory.getLogger(this.getClass().getName() + '.' + name);
+ this.running = true;
+ this.cache = new QueueJobCache(services.configuration, config.getType(), topics);
+ this.cache.fillCache();
+ this.available = new Semaphore(config.getMaxParallel(), true);
+ logger.info("Starting job queue {}", queueName);
+ logger.debug("Configuration for job queue={}", configuration);
+ }
+
+    /**
+     * @return The configuration this queue has been created with.
+     */
+    @Override
+    public InternalQueueConfiguration getConfiguration() {
+        return configuration;
+    }
+
+    /**
+     * @return The current name of this queue.
+     */
+    @Override
+    public String getName() {
+        return queueName;
+    }
+
+    /**
+     * Look up the statistics for this queue by its current name.
+     * @see org.apache.sling.event.jobs.Queue#getStatistics()
+     */
+    @Override
+    public Statistics getStatistics() {
+        final String name = this.queueName;
+        return this.services.statisticsManager.getQueueStatistics(name);
+    }
+
+    /**
+     * Start the job queue without restricting the number of
+     * jobs launched by this call to a single one.
+     */
+    public void start() {
+        this.start(false);
+    }
+    /**
+     * Start the job queue.
+     * As long as the queue is running, neither outdated nor suspended, and a
+     * permit is available, the next job is fetched from the cache and handed
+     * over to the thread pool. Each executed job tries to launch a follow-up
+     * job once it is done.
+     * This method might be called concurrently, therefore we synchronize
+     * @param justOne If <code>true</code> at most one job is launched by this call
+     */
+    public synchronized void start(boolean justOne) {
+        // we start as many jobs in parallel as possible
+        while ( this.running && !this.isOutdated.get() && !this.isSuspended() && this.available.tryAcquire() ) {
+            boolean started = false;
+            try {
+                final JobHandler handler = this.cache.getNextJob(this.services.jobConsumerManager, this, false);
+                if ( handler == null ) {
+                    // no job available, stop loop
+                    break;
+                }
+                this.threadPool.execute(new Runnable() {
+
+                    @Override
+                    public void run() {
+                        // update thread priority and name
+                        final Thread currentThread = Thread.currentThread();
+                        final String oldName = currentThread.getName();
+                        final int oldPriority = currentThread.getPriority();
+
+                        currentThread.setName(oldName + "-" + handler.getJob().getQueueName() + "(" + handler.getJob().getTopic() + ")");
+                        if ( configuration.getThreadPriority() != null ) {
+                            switch ( configuration.getThreadPriority() ) {
+                                case NORM : currentThread.setPriority(Thread.NORM_PRIORITY);
+                                            break;
+                                case MIN  : currentThread.setPriority(Thread.MIN_PRIORITY);
+                                            break;
+                                case MAX  : currentThread.setPriority(Thread.MAX_PRIORITY);
+                                            break;
+                            }
+                        }
+
+                        try {
+                            startJob(handler);
+                        } finally {
+                            // restore the pooled thread for its next user
+                            currentThread.setPriority(oldPriority);
+                            currentThread.setName(oldName);
+                        }
+                        // and try to launch another job
+                        start(true);
+                    }
+                });
+                // the permit is only owned by the job once the pool accepted it;
+                // if execute() throws, the finally block hands the permit back
+                started = true;
+                if ( justOne ) {
+                    break;
+                }
+            } finally {
+                if ( !started ) {
+                    this.available.release();
+                }
+            }
+        }
+    }
+
+    /**
+     * Execute a single job and release the permit acquired for it when this
+     * method returns.
+     * If the handler has a consumer, the consumer is called directly; a
+     * <code>null</code> result switches the job to async processing. Without a
+     * consumer the job is posted as an event and this method waits up to
+     * {@link #DEFAULT_WAIT_FOR_ACK_IN_MS} for an acknowledge through the
+     * {@link JobStatusNotifier} attached to the event.
+     * Exceptions are logged and not propagated.
+     * @param handler The handler of the job to execute
+     */
+    private void startJob(final JobHandler handler) {
+        try {
+            this.closeMarker.set(false);
+            try {
+                final JobImpl job = handler.getJob();
+                handler.started = System.currentTimeMillis();
+
+                if ( handler.getConsumer() != null ) {
+                    final long queueTime = handler.started - job.getProperty(JobImpl.PROPERTY_JOB_QUEUED, Calendar.class).getTime().getTime();
+                    NotificationUtility.sendNotification(this.services.eventAdmin, NotificationConstants.TOPIC_JOB_STARTED, job, queueTime);
+                    synchronized ( this.processingJobsLists ) {
+                        this.processingJobsLists.put(job.getId(), handler);
+                    }
+
+                    final Object lock = new Object();
+
+                    JobExecutionResultImpl result = JobExecutionResultImpl.CANCELLED;
+                    Job.JobState resultState = Job.JobState.ERROR;
+                    final AtomicBoolean isAsync = new AtomicBoolean(false);
+
+                    try {
+                        // the context uses the same lock in asyncProcessingFinished, so an
+                        // async finish has to wait until the async state is set up below
+                        synchronized ( lock ) {
+                            final JobExecutionContext ctx = new JobExecutionContextImpl(handler, lock, isAsync, new JobExecutionContextImpl.ASyncHandler() {
+
+                                @Override
+                                public void finished(final JobState state) {
+                                    services.jobConsumerManager.unregisterListener(job.getId());
+                                    finishedJob(job.getId(), state, true);
+                                    asyncCounter.decrementAndGet();
+                                }
+                            });
+
+                            result = (JobExecutionResultImpl)handler.getConsumer().process(job, ctx);
+                            if ( result == null ) { // ASYNC processing
+                                services.jobConsumerManager.registerListener(job.getId(), handler.getConsumer(), ctx);
+                                asyncCounter.incrementAndGet();
+                                isAsync.set(true);
+                            } else {
+                                if ( result.succeeded() ) {
+                                    resultState = Job.JobState.SUCCEEDED;
+                                } else if ( result.failed() ) {
+                                    resultState = Job.JobState.QUEUED;
+                                } else if ( result.cancelled() ) {
+                                    if ( handler.isStopped() ) {
+                                        resultState = Job.JobState.STOPPED;
+                                    } else {
+                                        resultState = Job.JobState.ERROR;
+                                    }
+                                }
+                            }
+                        }
+                    } catch (final Throwable t) { //NOSONAR
+                        logger.error("Unhandled error occured in job processor " + t.getMessage() + " while processing job " + Utility.toString(job), t);
+                        // we don't reschedule if an exception occurs
+                        result = JobExecutionResultImpl.CANCELLED;
+                        resultState = Job.JobState.ERROR;
+                    } finally {
+                        // a null result means async processing, the job is finished later
+                        if ( result != null ) {
+                            if ( result.getRetryDelayInMs() != null ) {
+                                job.setProperty(JobImpl.PROPERTY_DELAY_OVERRIDE, result.getRetryDelayInMs());
+                            }
+                            if ( result.getMessage() != null ) {
+                                job.setProperty(Job.PROPERTY_RESULT_MESSAGE, result.getMessage());
+                            }
+                            this.finishedJob(job.getId(), resultState, false);
+                        }
+                    }
+
+                } else {
+                    final Event jobEvent = this.getJobEvent(handler);
+                    final JobStatusNotifierImpl notifier = (JobStatusNotifierImpl) jobEvent.getProperty(JobStatusNotifier.CONTEXT_PROPERTY_NAME);
+                    // we need async delivery, otherwise we might create a deadlock
+                    // as this method runs inside a synchronized block and the finishedJob
+                    // method as well!
+                    final long endOfAck = System.currentTimeMillis() + DEFAULT_WAIT_FOR_ACK_IN_MS;
+                    this.services.eventAdmin.postEvent(jobEvent);
+
+                    // wait for the ack
+                    synchronized ( notifier ) {
+                        // compute the remaining time once per iteration: wait(0) would
+                        // block forever and a negative timeout is rejected by wait()
+                        long remaining = endOfAck - System.currentTimeMillis();
+                        while ( remaining > 0 && !notifier.isCalled() ) {
+                            try {
+                                notifier.wait(remaining);
+                            } catch ( final InterruptedException ie) {
+                                Thread.currentThread().interrupt();
+                                ignoreException(ie);
+                                // stop waiting: with the interrupt flag restored wait() would
+                                // throw again immediately and the loop would spin until the deadline
+                                break;
+                            }
+                            remaining = endOfAck - System.currentTimeMillis();
+                        }
+                        if ( !notifier.isCalled() ) {
+                            notifier.markDone();
+                        }
+                    }
+                    if ( !notifier.isCalled() ) {
+                        if ( handler.reschedule() ) {
+                            this.logger.info("No acknowledge received for job {} stored at {}. Requeueing job.", Utility.toString(handler.getJob()), handler.getJob().getId());
+                            handler.getJob().retry();
+                            this.requeue(handler);
+                        }
+                    } else {
+                        if ( logger.isDebugEnabled() ) {
+                            logger.debug("Received ack for job {}", Utility.toString(job));
+                        }
+                        // check for processor
+                        final JobProcessor processor = notifier.getProcessor();
+                        if ( processor != null ) {
+                            boolean result = false;
+                            try {
+                                result = processor.process(jobEvent);
+                            } catch (Throwable t) { //NOSONAR
+                                LoggerFactory.getLogger(JobUtil.class).error("Unhandled error occured in job processor " + t.getMessage() + " while processing job " + job, t);
+                                // we don't reschedule if an exception occurs
+                                result = true;
+                            }
+                            if ( result ) {
+                                this.finishedJob(job.getId(), Job.JobState.SUCCEEDED, false);
+                            } else {
+                                this.finishedJob(job.getId(), Job.JobState.QUEUED, false);
+                            }
+                        } else {
+                            // async processing
+                            final AtomicBoolean isAsync = new AtomicBoolean(true);
+                            final JobExecutionContext ctx = new JobExecutionContextImpl(handler, new Object(), isAsync, new JobExecutionContextImpl.ASyncHandler() {
+
+                                @Override
+                                public void finished(final JobState state) {
+                                    services.jobConsumerManager.unregisterListener(job.getId());
+                                    finishedJob(job.getId(), state, true);
+                                    asyncCounter.decrementAndGet();
+                                }
+                            });
+                            services.jobConsumerManager.registerListener(job.getId(), handler.getConsumer(), ctx);
+                            asyncCounter.incrementAndGet();
+
+                            notifier.setJobExecutionContext(ctx);
+                        }
+                    }
+                }
+            } catch (final Exception re) {
+                // if an exception occurs, we just log
+                this.logger.error("Exception during job processing.", re);
+            }
+        } finally {
+            // hand back the permit acquired when the job was started
+            this.available.release();
+        }
+    }
+
+    /**
+     * Flag this queue as outdated and rename it. Only the first
+     * call has an effect, later calls return immediately.
+     */
+    public void outdate() {
+        if ( !this.isOutdated.compareAndSet(false, true) ) {
+            // already outdated
+            return;
+        }
+        final String outdatedName = this.getName() + "<outdated>(" + this.hashCode() + ")";
+        this.logger.info("Outdating queue {}, renaming to {}.", this.queueName, outdatedName);
+        this.queueName = outdatedName;
+    }
+
+    /**
+     * Try to close the queue. The queue is resumed first. If it can be closed
+     * and the close marker is already set, the queue is closed; if the marker
+     * is not yet set, it is set and the queue stays open.
+     * @return <code>true</code> if the queue has been closed by this call
+     */
+    public boolean tryToClose() {
+        // resume the queue as we want to close it!
+        this.resume();
+
+        if ( !this.canBeClosed() ) {
+            return false;
+        }
+        if ( !this.closeMarker.get() ) {
+            // first positive check: only remember it
+            this.closeMarker.set(true);
+            return false;
+        }
+        this.close();
+        return true;
+    }
+
+    /**
+     * A queue can be closed if it is not suspended, its cache is empty,
+     * no job is processed async and all permits are available.
+     * @return <code>true</code> if all of these conditions hold
+     */
+    public boolean canBeClosed() {
+        if ( this.isSuspended() ) {
+            return false;
+        }
+        if ( !this.cache.isEmpty() ) {
+            return false;
+        }
+        if ( this.asyncCounter.get() != 0 ) {
+            return false;
+        }
+        final int maxParallel = this.configuration.getMaxParallel();
+        return this.available.availablePermits() == maxParallel;
+    }
+
+    /**
+     * Shut down this queue: mark it as not running, resume it, forget the
+     * jobs in processing and release the thread pool if the queue owns one.
+     */
+    public void close() {
+        running = false;
+        logger.debug("Shutting down job queue {}", queueName);
+        resume();
+
+        synchronized ( processingJobsLists ) {
+            processingJobsLists.clear();
+        }
+
+        // only a pool created for this queue is released
+        final boolean ownsThreadPool = configuration.getOwnThreadPoolSize() > 0;
+        if ( ownsThreadPool ) {
+            ((EventingThreadPool)threadPool).release();
+        }
+
+        logger.info("Stopped job queue {}", queueName);
+    }
+
+    /**
+     * Periodic maintenance: resume the queue if it has been suspended
+     * for longer than the maximum suspend time and start the queue.
+     */
+    public void maintain() {
+        // check suspended
+        final long suspendedAt = this.suspendedSince.get();
+        final boolean suspendedTooLong = suspendedAt != -1
+                && suspendedAt + MAX_SUSPEND_TIME < System.currentTimeMillis();
+        if ( suspendedTooLong ) {
+            logger.info("Waking up suspended queue. It has been suspended for more than {}ms", MAX_SUSPEND_TIME);
+            this.resume();
+        }
+
+        // TODO - set full cache search
+
+        this.start();
+    }
+
+    /**
+     * Hand the new topics over to the job cache and start the queue.
+     * @param topics the new topics
+     */
+    public void wakeUpQueue(final Set<String> topics) {
+        cache.handleNewTopics(topics);
+        start();
+    }
+
+    /**
+     * Reschedule a job in the cache and start the queue.
+     * @param handler The job handler
+     */
+    private void requeue(final JobHandler handler) {
+        cache.reschedule(handler);
+        start();
+    }
+
+ /** Value holder for the outcome of evaluating a finished job. */
+ private static final class RescheduleInfo {
+ /** Should the job be rescheduled? Defaults to <code>false</code>. */
+ public boolean reschedule = false;
+ /** Processing time in milliseconds. */
+ public long processingTime;
+ }
+
+    /**
+     * Evaluate the result state of a job and send the matching notification.
+     * A succeeded job gets its processing time recorded. A job in state QUEUED
+     * is rescheduled unless the incremented retry count exceeds the allowed
+     * retries (-1 meaning unlimited). All other states are cancelled.
+     * @param handler The job handler
+     * @param resultState The state the job finished with
+     * @return The reschedule information
+     */
+    private RescheduleInfo handleReschedule(final JobHandler handler, final Job.JobState resultState) {
+        final RescheduleInfo rescheduleInfo = new RescheduleInfo();
+        final JobImpl job = handler.getJob();
+        switch ( resultState ) {
+            case SUCCEEDED : // job is finished
+                if ( this.logger.isDebugEnabled() ) {
+                    this.logger.debug("Finished job {}", Utility.toString(job));
+                }
+                rescheduleInfo.processingTime = System.currentTimeMillis() - handler.started;
+                NotificationUtility.sendNotification(this.services.eventAdmin, NotificationConstants.TOPIC_JOB_FINISHED, job, rescheduleInfo.processingTime);
+                break;
+            case QUEUED : // check if we exceeded the number of retries
+                final int maxRetries = (Integer) job.getProperty(Job.PROPERTY_JOB_RETRIES);
+                final int nextRetryCount = ((Integer) job.getProperty(Job.PROPERTY_JOB_RETRY_COUNT)) + 1;
+
+                if ( maxRetries == -1 || nextRetryCount <= maxRetries ) {
+                    // retries left: put the job back
+                    rescheduleInfo.reschedule = true;
+                    job.retry();
+                    if ( this.logger.isDebugEnabled() ) {
+                        this.logger.debug("Failed job {}", Utility.toString(job));
+                    }
+                    NotificationUtility.sendNotification(this.services.eventAdmin, NotificationConstants.TOPIC_JOB_FAILED, job, null);
+                } else {
+                    if ( this.logger.isDebugEnabled() ) {
+                        this.logger.debug("Cancelled job {}", Utility.toString(job));
+                    }
+                    NotificationUtility.sendNotification(this.services.eventAdmin, NotificationConstants.TOPIC_JOB_CANCELLED, job, null);
+                }
+                break;
+            default : // consumer cancelled the job (STOPPED, GIVEN_UP, ERROR)
+                if ( this.logger.isDebugEnabled() ) {
+                    this.logger.debug("Cancelled job {}", Utility.toString(job));
+                }
+                NotificationUtility.sendNotification(this.services.eventAdmin, NotificationConstants.TOPIC_JOB_CANCELLED, job, null);
+                break;
+        }
+
+        return rescheduleInfo;
+    }
+
+ /**
+  * Handle the end of a job's processing and decide whether it is
+  * rescheduled or finished.
+  * The job is removed from the list of processing jobs first. The finish is
+  * discarded if the queue is not running anymore or if the job is not known
+  * to this queue.
+  * @param jobId The id of the job.
+  * @param resultState The state the processing ended with.
+  * @param isAsync Not evaluated by this method.
+  * @return {@code true} if the job has been rescheduled, {@code false} otherwise.
+  */
+ private boolean finishedJob(final String jobId,
+         Job.JobState resultState,
+         final boolean isAsync) {
+     if ( this.logger.isDebugEnabled() ) {
+         this.logger.debug("Received finish for job {}, resultState={}", jobId, resultState);
+     }
+
+     // take the job out of the processing list, this yields its handler
+     final JobHandler handler;
+     synchronized ( this.processingJobsLists ) {
+         handler = this.processingJobsLists.remove(jobId);
+     }
+
+     if ( !this.running ) {
+         this.logger.warn("Queue is not running anymore. Discarding finish for {}", jobId);
+         return false;
+     }
+
+     if ( handler == null ) {
+         if ( this.logger.isDebugEnabled() ) {
+             this.logger.debug("This job has never been started by this queue: {}", jobId);
+         }
+         return false;
+     }
+
+     // sends the notification and tells us whether to retry
+     final RescheduleInfo outcome = this.handleReschedule(handler, resultState);
+     if ( outcome.reschedule ) {
+         this.reschedule(handler);
+         return true;
+     }
+
+     // a QUEUED result which is not rescheduled means the retries are used up
+     final Job.JobState finalState = resultState == Job.JobState.QUEUED ? Job.JobState.GIVEN_UP : resultState;
+     // we keep cancelled jobs and succeeded jobs if the queue is configured like this.
+     final boolean keepJobs = finalState != Job.JobState.SUCCEEDED || this.configuration.isKeepJobs();
+     handler.finished(finalState, keepJobs, outcome.processingTime);
+
+     return false;
+ }
+
+ /**
+  * Build the OSGi event for a job.
+  * The event uses the topic of the job and carries a copy of all job
+  * properties plus a {@link JobStatusNotifierImpl} stored under
+  * {@link JobStatusNotifier#CONTEXT_PROPERTY_NAME}. The properties
+  * {@link EventUtil#PROPERTY_DISTRIBUTE} and {@link EventUtil#PROPERTY_APPLICATION}
+  * are removed from the copy.
+  * @param info The handler of the job.
+  * @return The new event.
+  */
+ private Event getJobEvent(final JobHandler info) {
+     final String topic = info.getJob().getTopic();
+     final Dictionary<String, Object> props = new Hashtable<String, Object>();
+
+     // start with a copy of everything the job carries
+     for(final String key : info.getJob().getPropertyNames()) {
+         final Object value = info.getJob().getProperty(key);
+         props.put(key, value);
+     }
+
+     // callback object for the finished job notification
+     final JobStatusNotifierImpl notifier = new JobStatusNotifierImpl();
+     props.put(JobStatusNotifier.CONTEXT_PROPERTY_NAME, notifier);
+
+     // strip the distributable flag and the app id
+     props.remove(EventUtil.PROPERTY_DISTRIBUTE);
+     props.remove(EventUtil.PROPERTY_APPLICATION);
+
+     return new Event(topic, props);
+ }
+
+ /**
+  * Resume a suspended queue.
+  * The suspension marker is reset to -1; the queue is only started
+  * if it was actually suspended before.
+  * @see org.apache.sling.event.jobs.Queue#resume()
+  */
+ @Override
+ public void resume() {
+     final long previous = this.suspendedSince.getAndSet(-1);
+     if ( previous == -1 ) {
+         // was not suspended
+         return;
+     }
+     this.logger.debug("Waking up suspended queue {}", queueName);
+     this.start();
+ }
+
+ /**
+  * Suspend the queue.
+  * The current time is recorded as the start of the suspension, but only
+  * if the queue is not suspended already.
+  * @see org.apache.sling.event.jobs.Queue#suspend()
+  */
+ @Override
+ public void suspend() {
+     // -1 marks a queue which is not suspended
+     final boolean newlySuspended = this.suspendedSince.compareAndSet(-1, System.currentTimeMillis());
+     if ( newlySuspended ) {
+         this.logger.debug("Suspending queue {}", queueName);
+     }
+ }
+
+ /**
+  * Check whether the queue is currently suspended.
+  * @return {@code true} if a suspension start time is recorded.
+  * @see org.apache.sling.event.jobs.Queue#isSuspended()
+  */
+ @Override
+ public boolean isSuspended() {
+     // -1 marks a queue which is not suspended
+     final long since = this.suspendedSince.get();
+     return since != -1;
+ }
+
+ /**
+  * Remove the jobs of all topics known to the job cache of this queue.
+  * For each topic the matching child of the local jobs resource is traversed
+  * (the child name is the topic with '/' replaced by '.'), every job resource
+  * found is handed to a {@link BatchResourceRemover} and the resource resolver
+  * is committed at the end. Persistence problems are logged, not propagated.
+  * @see org.apache.sling.event.jobs.Queue#removeAll()
+  */
+ @Override
+ public synchronized void removeAll() {
+     final Set<String> topics = this.cache.getTopics();
+     logger.debug("Removing all jobs for queue {} : {}", queueName, topics);
+     if ( topics.isEmpty() ) {
+         return;
+     }
+
+     final ResourceResolver resolver = this.services.configuration.createResourceResolver();
+     try {
+         final Resource baseResource = resolver.getResource(this.services.configuration.getLocalJobsPath());
+         // sanity check - should never be null
+         if ( baseResource == null ) {
+             return;
+         }
+
+         final BatchResourceRemover remover = new BatchResourceRemover();
+         for(final String topic : topics) {
+             final Resource topicResource = baseResource.getChild(topic.replace('/', '.'));
+             if ( topicResource == null ) {
+                 continue;
+             }
+             JobTopicTraverser.traverse(logger, topicResource, new JobTopicTraverser.JobCallback() {
+
+                 @Override
+                 public boolean handle(final JobImpl job) {
+                     final Resource jobResource = topicResource.getResourceResolver().getResource(job.getResourcePath());
+                     // sanity check
+                     if ( jobResource != null ) {
+                         try {
+                             remover.delete(jobResource);
+                         } catch ( final PersistenceException pe) {
+                             logger.error("Unable to remove job " + job, pe);
+                             // drop the pending changes and continue with the next job
+                             topicResource.getResourceResolver().revert();
+                             topicResource.getResourceResolver().refresh();
+                         }
+                     }
+                     return true;
+                 }
+             });
+         }
+
+         try {
+             resolver.commit();
+         } catch ( final PersistenceException pe) {
+             logger.error("Unable to remove jobs", pe);
+         }
+     } finally {
+         resolver.close();
+     }
+ }
+
+ /**
+ * This implementation intentionally does nothing.
+ * @see org.apache.sling.event.jobs.Queue#clear()
+ */
+ @Override
+ public void clear() {
+ // this is a noop
+ }
+
+ /**
+ * State lookup by key is not supported by this implementation.
+ * @param key The state key; it is not evaluated.
+ * @return Always {@code null}.
+ * @see org.apache.sling.event.jobs.Queue#getState(java.lang.String)
+ */
+ @Override
+ public Object getState(final String key) {
+ // not supported for now
+ return null;
+ }
+
+ /**
+  * Create a textual summary of the queue state.
+  * The summary lists the outdated flag, the suspension marker, the async
+  * counter and the job count, which is the configured maximum of parallel
+  * jobs minus the currently available permits.
+  * @return The state summary.
+  * @see org.apache.sling.event.jobs.Queue#getStateInfo()
+  */
+ @Override
+ public String getStateInfo() {
+     final StringBuilder summary = new StringBuilder();
+     summary.append("outdated=").append(this.isOutdated.get());
+     summary.append(", suspendedSince=").append(this.suspendedSince.get());
+     summary.append(", asyncJobs=").append(this.asyncCounter.get());
+     // permits in use
+     final int jobCount = this.configuration.getMaxParallel() - this.available.availablePermits();
+     summary.append(", jobCount=").append(jobCount);
+     return summary.toString();
+ }
+
+ /**
+  * Get the retry delay for a job.
+  * The delay is taken from the first source providing a value:
+  * the {@link JobImpl#PROPERTY_DELAY_OVERRIDE} property of the job,
+  * the {@link Job#PROPERTY_JOB_RETRY_DELAY} property of the job,
+  * the retry delay of the queue configuration.
+  * A job property which is set but cannot be converted to a {@code Long}
+  * is treated as missing instead of failing on unboxing.
+  * @param handler The job handler.
+  * @return The retry delay in milliseconds
+  */
+ private long getRetryDelay(final JobHandler handler) {
+     final long configuredDelay = this.configuration.getRetryDelayInMs();
+
+     // the typed lookup yields null for a missing or non convertible value
+     final Long override = handler.getJob().getProperty(JobImpl.PROPERTY_DELAY_OVERRIDE, Long.class);
+     if ( override != null ) {
+         return override;
+     }
+     final Long jobDelay = handler.getJob().getProperty(Job.PROPERTY_JOB_RETRY_DELAY, Long.class);
+     if ( jobDelay != null ) {
+         return jobDelay;
+     }
+     return configuredDelay;
+ }
+
+ /**
+  * Helper method for exceptions which are deliberately not handled:
+  * the exception is logged on debug level only.
+  * @param e The exception to log.
+  */
+ private void ignoreException(Exception e) {
+     if ( !this.logger.isDebugEnabled() ) {
+         return;
+     }
+     final String message = "Ignored exception " + e.getMessage();
+     this.logger.debug(message, e);
+ }
+
+ /**
+  * Stop a job if it is registered as being processed by this queue.
+  * @param job The job to stop.
+  * @return {@code true} if a handler for the job was found and stopped,
+  *         {@code false} otherwise.
+  */
+ public boolean stopJob(final JobImpl job) {
+     final JobHandler processing;
+     synchronized ( this.processingJobsLists ) {
+         processing = this.processingJobsLists.get(job.getId());
+     }
+     if ( processing == null ) {
+         return false;
+     }
+     processing.stop();
+     return true;
+ }
+
+ /**
+  * Reschedule a job for a retry.
+  * Without a retry delay the job is put back into the queue directly.
+  * Otherwise the job is added to the retry list and a task is scheduled
+  * which requeues it once the delay is over; an ordered queue is suspended
+  * for that time and resumed by the task. If the task cannot be scheduled
+  * it is run directly, so the job does not stay in the retry list and an
+  * ordered queue does not stay suspended.
+  * @param handler The job handler
+  */
+ private void reschedule(final JobHandler handler) {
+     // we delay putting back the job until the retry delay is over
+     final long delay = this.getRetryDelay(handler);
+     if ( delay <= 0 ) {
+         // put directly into queue
+         this.requeue(handler);
+         return;
+     }
+
+     if ( this.configuration.getType() == Type.ORDERED ) {
+         // an ordered queue must not process other jobs in the meantime
+         this.suspend();
+     }
+     handler.addToRetryList();
+     final Date fireDate = new Date(System.currentTimeMillis() + delay);
+
+     final String jobName = "Waiting:" + queueName + ":" + handler.hashCode();
+     final Runnable t = new Runnable() {
+         @Override
+         public void run() {
+             // only requeue if the job is still in the retry list
+             if ( handler.removeFromRetryList() ) {
+                 requeue(handler);
+             }
+             if ( configuration.getType() == Type.ORDERED ) {
+                 resume();
+             }
+         }
+     };
+     if ( !services.scheduler.schedule(t, services.scheduler.AT(fireDate).name(jobName)) ) {
+         // scheduling failed: requeue without the delay rather than losing the retry
+         this.logger.warn("Unable to schedule retry for job {} in queue {}. Requeueing without delay.",
+                 Utility.toString(handler.getJob()), queueName);
+         t.run();
+     }
+ }
+}
+
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java?rev=1667052&r1=1667051&r2=1667052&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java Mon Mar 16 16:07:08 2015
@@ -30,12 +30,16 @@ import java.util.concurrent.atomic.Atomi
import org.apache.sling.api.resource.Resource;
import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.event.impl.jobs.JobConsumerManager;
import org.apache.sling.event.impl.jobs.JobHandler;
import org.apache.sling.event.impl.jobs.JobImpl;
import org.apache.sling.event.impl.jobs.JobTopicTraverser;
+import org.apache.sling.event.impl.jobs.Utility;
import org.apache.sling.event.impl.jobs.config.JobManagerConfiguration;
+import org.apache.sling.event.jobs.Queue;
import org.apache.sling.event.jobs.QueueConfiguration;
import org.apache.sling.event.jobs.QueueConfiguration.Type;
+import org.apache.sling.event.jobs.consumer.JobExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -123,34 +127,59 @@ public class QueueJobCache {
/**
* Get the next job.
- * This method is not called concurrently, however
+ * This method is potentially called concurrently, and
* {@link #reschedule(JobHandler)} and {@link #handleNewTopics(Set)}
* can be called concurrently.
*/
- public JobImpl getNextJob(final boolean doFull) {
- JobImpl result = null;
+ public JobHandler getNextJob(final JobConsumerManager jobConsumerManager,
+ final Queue queue,
+ final boolean doFull) {
+ JobHandler handler = null;
synchronized ( this.cache ) {
- if ( this.cache.isEmpty() ) {
- final Set<String> checkingTopics = new HashSet<String>();
- synchronized ( this.topicsWithNewJobs ) {
- checkingTopics.addAll(this.topicsWithNewJobs);
- this.topicsWithNewJobs.clear();
- }
- if ( doFull ) {
- checkingTopics.addAll(this.topics);
- }
- if ( !checkingTopics.isEmpty() ) {
- this.loadJobs(checkingTopics);
+ boolean retry;
+ do {
+ retry = false;
+ if ( this.cache.isEmpty() ) {
+ final Set<String> checkingTopics = new HashSet<String>();
+ synchronized ( this.topicsWithNewJobs ) {
+ checkingTopics.addAll(this.topicsWithNewJobs);
+ this.topicsWithNewJobs.clear();
+ }
+ if ( doFull ) {
+ checkingTopics.addAll(this.topics);
+ }
+ if ( !checkingTopics.isEmpty() ) {
+ this.loadJobs(checkingTopics);
+ }
}
- }
- if ( !this.cache.isEmpty() ) {
- result = this.cache.remove(0);
- }
+ if ( !this.cache.isEmpty() ) {
+ final JobImpl job = this.cache.remove(0);
+ final JobExecutor consumer = jobConsumerManager.getExecutor(job.getTopic());
+
+ handler = new JobHandler(job, consumer, this.configuration);
+ if ( (consumer != null || (job.isBridgedEvent() && jobConsumerManager.supportsBridgedEvents())) ) {
+ if ( !handler.startProcessing(queue) ) {
+ if ( logger.isDebugEnabled() ) {
+ logger.debug("Discarding removed job {}", Utility.toString(job));
+ }
+ handler = null;
+ retry = true;
+ }
+ } else {
+ // no consumer on this instance, assign to another instance
+ handler.reassign();
+
+ handler = null;
+ retry = true;
+ }
+
+ }
+ } while ( handler == null && retry);
}
- return result;
+ return handler;
}
/**
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java?rev=1667052&r1=1667051&r2=1667052&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java Mon Mar 16 16:07:08 2015
@@ -39,7 +39,9 @@ import org.apache.felix.scr.annotations.
import org.apache.sling.api.resource.Resource;
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.commons.scheduler.Scheduler;
+import org.apache.sling.commons.threads.ThreadPool;
import org.apache.sling.commons.threads.ThreadPoolManager;
+import org.apache.sling.event.impl.EventingThreadPool;
import org.apache.sling.event.impl.jobs.JobConsumerManager;
import org.apache.sling.event.impl.jobs.JobHandler;
import org.apache.sling.event.impl.jobs.JobImpl;
@@ -56,7 +58,6 @@ import org.apache.sling.event.impl.suppo
import org.apache.sling.event.jobs.Job;
import org.apache.sling.event.jobs.NotificationConstants;
import org.apache.sling.event.jobs.Queue;
-import org.apache.sling.event.jobs.QueueConfiguration;
import org.apache.sling.event.jobs.jmx.QueuesMBean;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
@@ -67,13 +68,13 @@ import org.slf4j.LoggerFactory;
/**
- * Implementation of the job manager.
+ * Implementation of the queue manager.
*/
@Component(immediate=true)
@Service(value={Runnable.class, QueueManager.class, EventHandler.class})
@Properties({
- @Property(name="scheduler.period", longValue=60),
- @Property(name="scheduler.concurrent", boolValue=false),
+ @Property(name=Scheduler.PROPERTY_SCHEDULER_PERIOD, longValue=60),
+ @Property(name=Scheduler.PROPERTY_SCHEDULER_CONCURRENT, boolValue=false),
@Property(name=EventConstants.EVENT_TOPIC, value=NotificationConstants.TOPIC_JOB_ADDED)
})
public class QueueManager
@@ -97,6 +98,12 @@ public class QueueManager
@Reference
private ThreadPoolManager threadPoolManager;
+ /**
+ * Our thread pool.
+ */
+ @Reference(referenceInterface=EventingThreadPool.class)
+ private ThreadPool threadPool;
+
/** The job manager configuration. */
@Reference
private JobManagerConfiguration configuration;
@@ -108,7 +115,7 @@ public class QueueManager
private final Object queuesLock = new Object();
/** All active queues. */
- private final Map<String, AbstractJobQueue> queues = new ConcurrentHashMap<String, AbstractJobQueue>();
+ private final Map<String, JobQueueImpl> queues = new ConcurrentHashMap<String, JobQueueImpl>();
/** We count the scheduler runs. */
private volatile long schedulerRuns;
@@ -133,6 +140,7 @@ public class QueueManager
queueServices.scheduler = this.scheduler;
queueServices.threadPoolManager = this.threadPoolManager;
queueServices.statisticsManager = statisticsManager;
+ queueServices.eventingThreadPool = this.threadPool;
this.configuration.addListener(this);
}
@@ -144,9 +152,9 @@ public class QueueManager
logger.debug("Apache Sling Queue Manager stopping on instance {}", Environment.APPLICATION_ID);
this.configuration.removeListener(this);
- final Iterator<AbstractJobQueue> i = this.queues.values().iterator();
+ final Iterator<JobQueueImpl> i = this.queues.values().iterator();
while ( i.hasNext() ) {
- final AbstractJobQueue jbq = i.next();
+ final JobQueueImpl jbq = i.next();
jbq.close();
// update mbeans
((QueuesMBeanImpl)queuesMBean).sendEvent(new QueueStatusEvent(null, jbq));
@@ -166,10 +174,9 @@ public class QueueManager
this.schedulerRuns++;
logger.debug("Queue manager maintenance: Starting #{}", this.schedulerRuns);
- // check for unprocessed jobs first
- logger.debug("Checking for unprocessed jobs...");
- for(final AbstractJobQueue jbq : this.queues.values() ) {
- jbq.checkForUnprocessedJobs();
+ // queue maintenance
+ for(final JobQueueImpl jbq : this.queues.values() ) {
+ jbq.maintain();
}
// we only do a full clean up on every fifth run
@@ -181,10 +188,10 @@ public class QueueManager
// we synchronize to avoid creating a queue which is about to be removed during cleanup
synchronized ( queuesLock ) {
- final Iterator<Map.Entry<String, AbstractJobQueue>> i = this.queues.entrySet().iterator();
+ final Iterator<Map.Entry<String, JobQueueImpl>> i = this.queues.entrySet().iterator();
while ( i.hasNext() ) {
- final Map.Entry<String, AbstractJobQueue> current = i.next();
- final AbstractJobQueue jbq = current.getValue();
+ final Map.Entry<String, JobQueueImpl> current = i.next();
+ final JobQueueImpl jbq = current.getValue();
if ( jbq.tryToClose() ) {
logger.debug("Removing idle job queue {}", jbq);
// remove
@@ -210,7 +217,7 @@ public class QueueManager
final Set<String> topics) {
final InternalQueueConfiguration config = queueInfo.queueConfiguration;
// get or create queue
- AbstractJobQueue queue = null;
+ JobQueueImpl queue = null;
// we synchronize to avoid creating a queue which is about to be removed during cleanup
synchronized ( queuesLock ) {
queue = this.queues.get(queueInfo.queueName);
@@ -221,23 +228,12 @@ public class QueueManager
queue = null;
}
if ( queue == null ) {
- if ( config.getType() == QueueConfiguration.Type.ORDERED ) {
- queue = new OrderedJobQueue(queueInfo.queueName, config, queueServices, topics);
- } else if ( config.getType() == QueueConfiguration.Type.UNORDERED
- || config.getType() == QueueConfiguration.Type.TOPIC_ROUND_ROBIN ) {
- queue = new ParallelJobQueue(queueInfo.queueName, config, queueServices, topics);
- }
- // this is just a sanity check, actually we always have a queue instance here
- if ( queue != null ) {
- // on startup the queue might be empty and we can simply discard it
- if ( !queue.canBeClosed() ) {
- queues.put(queueInfo.queueName, queue);
- ((QueuesMBeanImpl)queuesMBean).sendEvent(new QueueStatusEvent(queue, null));
- queue.start();
- }
- } else {
- // we log anyway
- logger.error("Unable to create new queue: unknown queue type {}", config);
+ queue = new JobQueueImpl(queueInfo.queueName, config, queueServices, topics);
+ // on startup the queue might be empty and we can simply discard it
+ if ( !queue.canBeClosed() ) {
+ queues.put(queueInfo.queueName, queue);
+ ((QueuesMBeanImpl)queuesMBean).sendEvent(new QueueStatusEvent(queue, null));
+ queue.start();
}
} else {
queue.wakeUpQueue(topics);
@@ -255,7 +251,7 @@ public class QueueManager
this.maintain();
}
- private void outdateQueue(final AbstractJobQueue queue) {
+ private void outdateQueue(final JobQueueImpl queue) {
// remove the queue with the old name
// check for main queue
final String oldName = ResourceHelper.filterQueueName(queue.getName());
@@ -285,8 +281,8 @@ public class QueueManager
private void restart() {
// let's rename/close all queues and clear them
synchronized ( queuesLock ) {
- final List<AbstractJobQueue> queues = new ArrayList<AbstractJobQueue>(this.queues.values());
- for(final AbstractJobQueue queue : queues ) {
+ final List<JobQueueImpl> queues = new ArrayList<JobQueueImpl>(this.queues.values());
+ for(final JobQueueImpl queue : queues ) {
this.outdateQueue(queue);
}
}
@@ -295,7 +291,7 @@ public class QueueManager
if ( config != null ) {
final List<Job> rescheduleList = this.configuration.clearJobRetryList();
for(final Job j : rescheduleList) {
- final JobHandler jh = new JobHandler((JobImpl)j, this.configuration);
+ final JobHandler jh = new JobHandler((JobImpl)j, null, this.configuration);
jh.reschedule();
}
}
@@ -312,7 +308,7 @@ public class QueueManager
* @see org.apache.sling.event.jobs.JobManager#getQueues()
*/
public Iterable<Queue> getQueues() {
- final Iterator<AbstractJobQueue> jqI = this.queues.values().iterator();
+ final Iterator<JobQueueImpl> jqI = this.queues.values().iterator();
return new Iterable<Queue>() {
@Override
@@ -418,4 +414,14 @@ public class QueueManager
this.logger.debug("Established new topic mapping: {}", mapping);
return mapping;
}
+
+ /**
+ * Bind the eventing thread pool.
+ * @param etp The thread pool to use.
+ */
+ protected void bindThreadPool(final EventingThreadPool etp) {
+ this.threadPool = etp;
+ }
+
+ /**
+  * Unbind the eventing thread pool.
+  * The reference is only cleared if the given pool is the one currently held.
+  * @param etp The thread pool being removed.
+  */
+ protected void unbindThreadPool(final EventingThreadPool etp) {
+     final boolean isCurrent = this.threadPool == etp;
+     if ( isCurrent ) {
+         this.threadPool = null;
+     }
+ }
}
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueServices.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueServices.java?rev=1667052&r1=1667051&r2=1667052&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueServices.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueServices.java Mon Mar 16 16:07:08 2015
@@ -19,12 +19,18 @@
package org.apache.sling.event.impl.jobs.queues;
import org.apache.sling.commons.scheduler.Scheduler;
+import org.apache.sling.commons.threads.ThreadPool;
import org.apache.sling.commons.threads.ThreadPoolManager;
import org.apache.sling.event.impl.jobs.JobConsumerManager;
import org.apache.sling.event.impl.jobs.config.JobManagerConfiguration;
import org.apache.sling.event.impl.jobs.stats.StatisticsManager;
import org.osgi.service.event.EventAdmin;
+/**
+ * The queue services class is a helper class containing all
+ * services used by the queue implementations.
+ * This avoids passing a set of separate objects.
+ */
public class QueueServices {
public JobManagerConfiguration configuration;
@@ -38,4 +44,6 @@ public class QueueServices {
public Scheduler scheduler;
public StatisticsManager statisticsManager;
+
+ public ThreadPool eventingThreadPool;
}
Added: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ResultBuilderImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ResultBuilderImpl.java?rev=1667052&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ResultBuilderImpl.java (added)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ResultBuilderImpl.java Mon Mar 16 16:07:08 2015
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.queues;
+
+import org.apache.sling.event.impl.jobs.InternalJobState;
+import org.apache.sling.event.impl.jobs.JobExecutionResultImpl;
+import org.apache.sling.event.jobs.consumer.JobExecutionContext.ResultBuilder;
+import org.apache.sling.event.jobs.consumer.JobExecutionResult;
+
+/**
+ * Builder for {@link JobExecutionResult} objects.
+ * A message or retry delay set on the builder is carried into every
+ * result created afterwards.
+ */
+public class ResultBuilderImpl implements ResultBuilder {
+
+    /** Optional message passed to the created results. */
+    private String message;
+
+    /** Optional retry delay passed to the created results, {@code null} if never set. */
+    private Long retryDelayInMs;
+
+    /**
+     * Create a result with the given state and the values collected so far.
+     */
+    private JobExecutionResult create(final InternalJobState state) {
+        return new JobExecutionResultImpl(state, this.message, this.retryDelayInMs);
+    }
+
+    @Override
+    public JobExecutionResult failed(final long retryDelayInMs) {
+        // remember the delay, it is part of this and all following results
+        this.retryDelayInMs = retryDelayInMs;
+        return this.create(InternalJobState.FAILED);
+    }
+
+    @Override
+    public ResultBuilder message(final String message) {
+        this.message = message;
+        return this;
+    }
+
+    @Override
+    public JobExecutionResult succeeded() {
+        return this.create(InternalJobState.SUCCEEDED);
+    }
+
+    @Override
+    public JobExecutionResult failed() {
+        return this.create(InternalJobState.FAILED);
+    }
+
+    @Override
+    public JobExecutionResult cancelled() {
+        return this.create(InternalJobState.CANCELLED);
+    }
+}
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ResultBuilderImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ResultBuilderImpl.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ResultBuilderImpl.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobUtil.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobUtil.java?rev=1667052&r1=1667051&r2=1667052&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobUtil.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobUtil.java Mon Mar 16 16:07:08 2015
@@ -253,14 +253,14 @@ public abstract class JobUtil {
* @deprecated - Use the new {@link JobConsumer} interface instead.
*/
@Deprecated
- private static JobStatusNotifier.NotifierContext getNotifierContext(final Event job) {
+ private static JobStatusNotifier getNotifier(final Event job) {
// check if this is a job event
if ( !isJobEvent(job) ) {
return null;
}
- final JobStatusNotifier.NotifierContext ctx = (JobStatusNotifier.NotifierContext) job.getProperty(JobStatusNotifier.CONTEXT_PROPERTY_NAME);
+ final JobStatusNotifier ctx = (JobStatusNotifier) job.getProperty(JobStatusNotifier.CONTEXT_PROPERTY_NAME);
if ( ctx == null ) {
- throw new IllegalArgumentException("JobStatusNotifier context is not available in event properties.");
+ throw new IllegalArgumentException("JobStatusNotifier is not available in event properties.");
}
return ctx;
}
@@ -277,9 +277,9 @@ public abstract class JobUtil {
*/
@Deprecated
public static boolean acknowledgeJob(final Event job) {
- final JobStatusNotifier.NotifierContext ctx = getNotifierContext(job);
+ final JobStatusNotifier ctx = getNotifier(job);
if ( ctx != null ) {
- if ( !ctx.getJobStatusNotifier().sendAcknowledge(job) ) {
+ if ( !ctx.getAcknowledge(null) ) {
// if we don't get an ack, someone else is already processing this job.
// we process but do not notify the job event handler.
LoggerFactory.getLogger(JobUtil.class).info("Someone else is already processing job {}.", job);
@@ -297,9 +297,9 @@ public abstract class JobUtil {
*/
@Deprecated
public static void finishedJob(final Event job) {
- final JobStatusNotifier.NotifierContext ctx = getNotifierContext(job);
+ final JobStatusNotifier ctx = getNotifier(job);
if ( ctx != null ) {
- ctx.getJobStatusNotifier().finishedJob(job, false);
+ ctx.finishedJob(false);
}
}
@@ -311,9 +311,9 @@ public abstract class JobUtil {
*/
@Deprecated
public static boolean rescheduleJob(final Event job) {
- final JobStatusNotifier.NotifierContext ctx = getNotifierContext(job);
+ final JobStatusNotifier ctx = getNotifier(job);
if ( ctx != null ) {
- return ctx.getJobStatusNotifier().finishedJob(job, true);
+ return ctx.finishedJob(true);
}
return false;
}
@@ -327,16 +327,17 @@ public abstract class JobUtil {
@Deprecated
public static void processJob(final Event job, final JobProcessor processor) {
// first check for a notifier context to send an acknowledge
- final JobStatusNotifier.NotifierContext ctx = getNotifierContext(job);
- boolean notify = ctx != null;
- if ( ctx != null && !ctx.getJobStatusNotifier().sendAcknowledge(job) ) {
+ final JobStatusNotifier ctx = getNotifier(job);
+ if ( ctx != null && !ctx.getAcknowledge(processor) ) {
// if we don't get an ack, someone else is already processing this job.
- // we process but do not notify the job event handler.
+ // we do not process.
LoggerFactory.getLogger(JobUtil.class).info("Someone else is already processing job {}.", job);
- notify = false;
+ return;
+ }
+ if ( ctx != null ) {
+ return;
}
final JobPriority priority = (JobPriority) job.getProperty(PROPERTY_JOB_PRIORITY);
- final boolean notifyResult = notify;
final Runnable task = new Runnable() {
@@ -361,23 +362,13 @@ public abstract class JobUtil {
break;
}
}
- boolean result = false;
try {
- result = processor.process(job);
+ processor.process(job);
} catch (Throwable t) { //NOSONAR
LoggerFactory.getLogger(JobUtil.class).error("Unhandled error occured in job processor " + t.getMessage() + " while processing job " + job, t);
- // we don't reschedule if an exception occurs
- result = true;
} finally {
currentThread.setPriority(oldPriority);
currentThread.setName(oldName);
- if ( notifyResult ) {
- if ( result ) {
- JobUtil.finishedJob(job);
- } else {
- JobUtil.rescheduleJob(job);
- }
- }
}
}