You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2014/10/15 19:55:53 UTC
svn commit: r1632141 [2/3] - in /sling/trunk/bundles/extensions/event: ./
src/main/java/org/apache/sling/event/impl/jobs/
src/main/java/org/apache/sling/event/impl/jobs/config/
src/main/java/org/apache/sling/event/impl/jobs/queues/
src/main/java/org/ap...
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java?rev=1632141&r1=1632140&r2=1632141&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java Wed Oct 15 17:55:52 2014
@@ -19,7 +19,6 @@
package org.apache.sling.event.impl.jobs.queues;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Dictionary;
import java.util.HashMap;
import java.util.Hashtable;
@@ -30,18 +29,16 @@ import java.util.concurrent.atomic.Atomi
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.sling.commons.threads.ThreadPool;
-import org.apache.sling.commons.threads.ThreadPoolManager;
import org.apache.sling.event.EventUtil;
import org.apache.sling.event.impl.EventingThreadPool;
import org.apache.sling.event.impl.jobs.InternalJobState;
-import org.apache.sling.event.impl.jobs.JobConsumerManager;
import org.apache.sling.event.impl.jobs.JobExecutionResultImpl;
import org.apache.sling.event.impl.jobs.JobHandler;
import org.apache.sling.event.impl.jobs.JobImpl;
+import org.apache.sling.event.impl.jobs.TestLogger;
import org.apache.sling.event.impl.jobs.Utility;
import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
import org.apache.sling.event.impl.jobs.deprecated.JobStatusNotifier;
-import org.apache.sling.event.impl.jobs.stats.StatisticsImpl;
import org.apache.sling.event.impl.support.Environment;
import org.apache.sling.event.impl.support.ResourceHelper;
import org.apache.sling.event.jobs.Job;
@@ -52,7 +49,6 @@ import org.apache.sling.event.jobs.consu
import org.apache.sling.event.jobs.consumer.JobExecutionResult;
import org.apache.sling.event.jobs.consumer.JobExecutor;
import org.osgi.service.event.Event;
-import org.osgi.service.event.EventAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,97 +57,97 @@ import org.slf4j.LoggerFactory;
* functionality for the job event handling.
*/
public abstract class AbstractJobQueue
- extends StatisticsImpl
- implements JobStatusNotifier, Queue {
-
- /** Default number of seconds to wait for an ack. */
- private static final long DEFAULT_WAIT_FOR_ACK_IN_MS = 60 * 1000; // by default we wait 60 secs
+ implements Queue, JobStatusNotifier {
/** Default timeout for suspend. */
private static final long MAX_SUSPEND_TIME = 1000 * 60 * 60; // 60 mins
+ /** Default time in milliseconds to wait for an ack. */
+ private static final long DEFAULT_WAIT_FOR_ACK_IN_MS = 60 * 1000; // by default we wait 60 secs
+
/** The logger. */
protected final Logger logger;
/** Configuration. */
protected final InternalQueueConfiguration configuration;
- /** The event admin. */
- private final EventAdmin eventAdmin;
-
- /** The job consumer manager. */
- private final JobConsumerManager jobConsumerManager;
-
/** The queue name. */
protected volatile String queueName;
/** Are we still running? */
protected volatile boolean running;
- /** Is the queue currently waiting(sleeping) */
- protected volatile boolean isWaiting = false;
-
- /** The map of events we're have started (send). */
- private final Map<String, JobHandler> startedJobsLists = new HashMap<String, JobHandler>();
-
- /** The map of events we're processing. */
- private final Map<String, JobHandler> processsingJobsLists = new HashMap<String, JobHandler>();
-
/** Suspended since. */
private volatile long suspendedSince = -1L;
/** Suspend lock. */
private final Object suspendLock = new Object();
+ /** Services used by the queues. */
+ protected final QueueServices services;
+
+ /** The map of events we're processing. */
+ private final Map<String, JobHandler> processingJobsLists = new HashMap<String, JobHandler>();
+
+ private final ThreadPool threadPool;
+
+ /** The map of events we have started (sent). */
+ private final Map<String, JobHandler> startedJobsLists = new HashMap<String, JobHandler>();
+
/** Async counter. */
private final AtomicInteger asyncCounter = new AtomicInteger();
+ /** Is the queue currently waiting (sleeping)? */
+ protected volatile boolean isWaiting = false;
+
/** Flag for outdated. */
private final AtomicBoolean isOutdated = new AtomicBoolean(false);
- /** Marker flag if the queue is waiting for another element (= empty) */
- protected boolean isWaitingForNext = false;
-
/** A marker for closing the queue. */
private final AtomicBoolean closeMarker = new AtomicBoolean(false);
- private final ThreadPool threadPool;
-
/**
- * Start this queue
+ * Create a new queue
* @param name The queue name
* @param config The queue configuration
- * @param environment The environment component
*/
public AbstractJobQueue(final String name,
- final InternalQueueConfiguration config,
- final JobConsumerManager jobConsumerManager,
- final ThreadPoolManager threadPoolManager,
- final EventAdmin eventAdmin) {
+ final InternalQueueConfiguration config,
+ final QueueServices services) {
if ( config.getOwnThreadPoolSize() > 0 ) {
- this.threadPool = new EventingThreadPool(threadPoolManager, config.getOwnThreadPoolSize());
+ this.threadPool = new EventingThreadPool(services.threadPoolManager, config.getOwnThreadPoolSize());
} else {
this.threadPool = Environment.THREAD_POOL;
}
this.queueName = name;
this.configuration = config;
- this.logger = LoggerFactory.getLogger(this.getClass().getName() + '.' + name);
+ this.services = services;
+ this.logger = new TestLogger(LoggerFactory.getLogger(this.getClass().getName() + '.' + name));
this.running = true;
- this.eventAdmin = eventAdmin;
- this.jobConsumerManager = jobConsumerManager;
}
/**
- * @see org.apache.sling.event.jobs.Queue#getStateInfo()
+ * Return the queue configuration
*/
@Override
- public String getStateInfo() {
- synchronized ( this.suspendLock ) {
- return "isWaiting=" + this.isWaiting +
- ", suspendedSince=" + this.suspendedSince +
- ", isWaitingForNext=" + this.isWaitingForNext +
- ", asyncJobs=" + this.asyncCounter.get();
- }
+ public InternalQueueConfiguration getConfiguration() {
+ return this.configuration;
+ }
+
+ /**
+ * Get the name of the job queue.
+ */
+ @Override
+ public String getName() {
+ return this.queueName;
+ }
+
+ /**
+ * @see org.apache.sling.event.jobs.Queue#getStatistics()
+ */
+ @Override
+ public Statistics getStatistics() {
+ return this.services.statisticsManager.getQueueStatistics(this.queueName);
}
/**
@@ -180,37 +176,22 @@ public abstract class AbstractJobQueue
}
/**
- * Return the queue configuration
+ * Is the queue outdated?
*/
- @Override
- public InternalQueueConfiguration getConfiguration() {
- return this.configuration;
+ protected boolean isOutdated() {
+ return this.isOutdated.get();
}
/**
- * Close this queue.
+ * Outdate this queue.
*/
- public void close() {
- this.running = false;
- this.logger.debug("Shutting down job queue {}", queueName);
- this.resume();
- if ( this.isWaiting ) {
- this.logger.debug("Waking up waiting queue {}", this.queueName);
- this.notifyFinished(null);
- }
- // continue queue processing to stop the queue
- this.put(new JobHandler(null, null));
-
- synchronized ( this.processsingJobsLists ) {
- this.processsingJobsLists.clear();
- }
- synchronized ( this.startedJobsLists ) {
- this.startedJobsLists.clear();
- }
- if ( this.configuration.getOwnThreadPoolSize() > 0 ) {
- ((EventingThreadPool)this.threadPool).release();
+ public void outdate() {
+ if ( !this.isOutdated() ) {
+ this.isOutdated.set(true);
+ final String name = this.getName() + "<outdated>(" + this.hashCode() + ")";
+ this.logger.info("Outdating queue {}, renaming to {}.", this.queueName, name);
+ this.queueName = name;
}
- this.logger.info("Stopped job queue {}", this.queueName);
}
/**
@@ -234,7 +215,34 @@ public abstract class AbstractJobQueue
* Check whether this queue can be closed
*/
protected boolean canBeClosed() {
- return this.isEmpty() && !this.isWaiting && !this.isSuspended() && this.asyncCounter.get() == 0 && this.isWaitingForNext;
+ return !this.isWaiting && !this.isSuspended() && this.asyncCounter.get() == 0;
+ }
+
+ /**
+ * Close this queue.
+ */
+ public void close() {
+ this.running = false;
+ this.logger.debug("Shutting down job queue {}", queueName);
+ this.resume();
+ if ( this.isWaiting ) {
+ this.logger.debug("Waking up waiting queue {}", this.queueName);
+ this.notifyFinished(false);
+ }
+ // continue queue processing to stop the queue
+ this.services.topicManager.stop(this.getName());
+
+ synchronized ( this.processingJobsLists ) {
+ this.processingJobsLists.clear();
+ }
+ synchronized ( this.startedJobsLists ) {
+ this.startedJobsLists.clear();
+ }
+ if ( this.configuration.getOwnThreadPoolSize() > 0 ) {
+ ((EventingThreadPool)this.threadPool).release();
+ }
+
+ this.logger.info("Stopped job queue {}", this.queueName);
}
/**
@@ -275,12 +283,10 @@ public abstract class AbstractJobQueue
process = this.startedJobsLists.remove(info.getJob().getId()) != null;
}
if ( process ) {
- if ( !info.reschedule() ) {
- this.decQueued();
- checkForNotify(null);
- } else {
+ if ( info.reschedule() ) {
this.logger.info("No acknowledge received for job {} stored at {}. Requeueing job.", Utility.toString(info.getJob()), info.getJob().getId());
- checkForNotify(info);
+ this.reschedule(info);
+ this.notifyFinished(true);
}
}
}
@@ -288,30 +294,288 @@ public abstract class AbstractJobQueue
}
/**
- * @see org.apache.sling.event.impl.jobs.deprecated.JobStatusNotifier#sendAcknowledge(org.osgi.service.event.Event)
+ * Execute the queue
*/
- @Override
- public boolean sendAcknowledge(final Event job) {
- final String jobId = (String)job.getProperty(ResourceHelper.PROPERTY_JOB_ID);
- final JobHandler ack;
- synchronized ( this.startedJobsLists ) {
- ack = this.startedJobsLists.remove(jobId);
+ private void runJobQueue() {
+ while ( this.running ) {
+ JobHandler info = null;
+ if ( info == null ) {
+ // so let's wait/get the next job from the queue
+ info = this.take();
+ }
+
+ // if we're suspended we drop the current item
+ if ( this.running && info != null && !checkSuspended() ) {
+ // if we still have a job and are running, let's go
+ this.start(info);
+ }
}
- // if the event is still in the processing list, we confirm the ack
- if ( ack != null ) {
+ }
+
+ private JobHandler take() {
+ return this.services.topicManager.take(this.getName());
+ }
+
+ /**
+ * Check if the queue is suspended and go into suspend mode
+ */
+ private boolean checkSuspended() {
+ boolean wasSuspended = false;
+ synchronized ( this.suspendLock ) {
+ while ( this.suspendedSince != -1 ) {
+ logger.debug("Sleeping as queue {} is suspended.", this.getName());
+ wasSuspended = true;
+ final long diff = System.currentTimeMillis() - this.suspendedSince;
+ try {
+ this.suspendLock.wait(MAX_SUSPEND_TIME - diff);
+ } catch (final InterruptedException ignore) {
+ this.ignoreException(ignore);
+ Thread.currentThread().interrupt();
+ }
+ logger.debug("Waking up queue {}.", this.getName());
+ if ( System.currentTimeMillis() > this.suspendedSince + MAX_SUSPEND_TIME ) {
+ this.resume();
+ }
+ }
+ }
+ return wasSuspended;
+ }
+
+ /**
+ * Execute a job
+ */
+ protected boolean executeJob(final JobHandler handler) {
+ final JobImpl job = handler.getJob();
+ final JobExecutor consumer = this.services.jobConsumerManager.getExecutor(job.getTopic());
+
+ if ( (consumer != null || (job.isBridgedEvent() && this.services.jobConsumerManager.supportsBridgedEvents())) ) {
+ final boolean success = this.startJobExecution(handler, consumer);
+ return success;
+ } else {
+ // no consumer on this instance, assign to another instance
+ handler.reassign();
+ return false;
+ }
+ }
+
+ private boolean startJobExecution(final JobHandler handler, final JobExecutor consumer) {
+ final JobImpl job = handler.getJob();
+ if ( handler.startProcessing(this) ) {
if ( logger.isDebugEnabled() ) {
- logger.debug("Received ack for job {}", Utility.toString(ack.getJob()));
+ logger.debug("Starting job {}", Utility.toString(job));
}
- final long queueTime = ack.started - ack.queued;
- this.addActive(queueTime);
- Utility.sendNotification(this.eventAdmin, NotificationConstants.TOPIC_JOB_STARTED, ack.getJob(), queueTime);
- synchronized ( this.processsingJobsLists ) {
- this.processsingJobsLists.put(jobId, ack);
+ try {
+ handler.started = System.currentTimeMillis();
+
+ if ( consumer != null ) {
+ final long queueTime = handler.started - handler.queued;
+ Utility.sendNotification(this.services.eventAdmin, NotificationConstants.TOPIC_JOB_STARTED, job, queueTime);
+ synchronized ( this.processingJobsLists ) {
+ this.processingJobsLists.put(job.getId(), handler);
+ }
+
+ final Runnable task = new Runnable() {
+
+ /**
+ * @see java.lang.Runnable#run()
+ */
+ @Override
+ public void run() {
+ final Object lock = new Object();
+ final Thread currentThread = Thread.currentThread();
+ // update priority and name
+ final String oldName = currentThread.getName();
+ final int oldPriority = currentThread.getPriority();
+
+ currentThread.setName(oldName + "-" + job.getQueueName() + "(" + job.getTopic() + ")");
+ if ( configuration.getThreadPriority() != null ) {
+ switch ( configuration.getThreadPriority() ) {
+ case NORM : currentThread.setPriority(Thread.NORM_PRIORITY);
+ break;
+ case MIN : currentThread.setPriority(Thread.MIN_PRIORITY);
+ break;
+ case MAX : currentThread.setPriority(Thread.MAX_PRIORITY);
+ break;
+ }
+ }
+ JobExecutionResultImpl result = JobExecutionResultImpl.CANCELLED;
+ Job.JobState resultState = Job.JobState.ERROR;
+ final AtomicBoolean isAsync = new AtomicBoolean(false);
+
+ try {
+ synchronized ( lock ) {
+ final JobExecutionContext ctx = new JobExecutionContext() {
+
+ private boolean hasInit = false;
+
+ @Override
+ public void initProgress(final int steps,
+ final long eta) {
+ if ( !hasInit ) {
+ handler.persistJobProperties(job.startProgress(steps, eta));
+ hasInit = true;
+ }
+ }
+
+ @Override
+ public void incrementProgressCount(final int steps) {
+ if ( hasInit ) {
+ handler.persistJobProperties(job.setProgress(steps));
+ }
+ }
+
+ @Override
+ public void updateProgress(final long eta) {
+ if ( hasInit ) {
+ handler.persistJobProperties(job.update(eta));
+ }
+ }
+
+ @Override
+ public void log(final String message, Object... args) {
+ handler.persistJobProperties(job.log(message, args));
+ }
+
+ @Override
+ public boolean isStopped() {
+ return handler.isStopped();
+ }
+
+ @Override
+ public void asyncProcessingFinished(final JobExecutionResult result) {
+ synchronized ( lock ) {
+ if ( isAsync.compareAndSet(true, false) ) {
+ services.jobConsumerManager.unregisterListener(job.getId());
+ Job.JobState state = null;
+ if ( result.succeeded() ) {
+ state = Job.JobState.SUCCEEDED;
+ } else if ( result.failed() ) {
+ state = Job.JobState.QUEUED;
+ } else if ( result.cancelled() ) {
+ if ( handler.isStopped() ) {
+ state = Job.JobState.STOPPED;
+ } else {
+ state = Job.JobState.ERROR;
+ }
+ }
+ finishedJob(job.getId(), state, true);
+ asyncCounter.decrementAndGet();
+ } else {
+ throw new IllegalStateException("Job is not processed async " + job.getId());
+ }
+ }
+ }
+
+ @Override
+ public ResultBuilder result() {
+ return new ResultBuilder() {
+
+ private String message;
+
+ private Long retryDelayInMs;
+
+ @Override
+ public JobExecutionResult failed(final long retryDelayInMs) {
+ this.retryDelayInMs = retryDelayInMs;
+ return new JobExecutionResultImpl(InternalJobState.FAILED, message, retryDelayInMs);
+ }
+
+ @Override
+ public ResultBuilder message(final String message) {
+ this.message = message;
+ return this;
+ }
+
+ @Override
+ public JobExecutionResult succeeded() {
+ return new JobExecutionResultImpl(InternalJobState.SUCCEEDED, message, retryDelayInMs);
+ }
+
+ @Override
+ public JobExecutionResult failed() {
+ return new JobExecutionResultImpl(InternalJobState.FAILED, message, retryDelayInMs);
+ }
+
+ @Override
+ public JobExecutionResult cancelled() {
+ return new JobExecutionResultImpl(InternalJobState.CANCELLED, message, retryDelayInMs);
+ }
+ };
+ }
+ };
+ result = (JobExecutionResultImpl)consumer.process(job, ctx);
+ if ( result == null ) { // ASYNC processing
+ services.jobConsumerManager.registerListener(job.getId(), consumer, ctx);
+ asyncCounter.incrementAndGet();
+ isAsync.set(true);
+ } else {
+ if ( result.succeeded() ) {
+ resultState = Job.JobState.SUCCEEDED;
+ } else if ( result.failed() ) {
+ resultState = Job.JobState.QUEUED;
+ } else if ( result.cancelled() ) {
+ if ( handler.isStopped() ) {
+ resultState = Job.JobState.STOPPED;
+ } else {
+ resultState = Job.JobState.ERROR;
+ }
+ }
+ }
+ }
+ } catch (final Throwable t) { //NOSONAR
+ logger.error("Unhandled error occured in job processor " + t.getMessage() + " while processing job " + Utility.toString(job), t);
+ // we don't reschedule if an exception occurs
+ result = JobExecutionResultImpl.CANCELLED;
+ resultState = Job.JobState.ERROR;
+ } finally {
+ currentThread.setPriority(oldPriority);
+ currentThread.setName(oldName);
+ if ( result != null ) {
+ if ( result.getRetryDelayInMs() != null ) {
+ job.setProperty(JobImpl.PROPERTY_DELAY_OVERRIDE, result.getRetryDelayInMs());
+ }
+ if ( result.getMessage() != null ) {
+ job.setProperty(Job.PROPERTY_RESULT_MESSAGE, result.getMessage());
+ }
+ finishedJob(job.getId(), resultState, false);
+ }
+ }
+ }
+
+ };
+ // check if the thread pool is available
+ final ThreadPool pool = this.threadPool;
+ if ( pool != null ) {
+ pool.execute(task);
+ } else {
+ // if we don't have a thread pool, we create the thread directly
+ // (this should never happen for jobs, but is a safe fall back)
+ new Thread(task).start();
+ }
+
+ } else {
+ // let's add the event to our started jobs list
+ synchronized ( this.startedJobsLists ) {
+ this.startedJobsLists.put(job.getId(), handler);
+ }
+ final Event jobEvent = this.getJobEvent(handler);
+ // we need async delivery, otherwise we might create a deadlock
+ // as this method runs inside a synchronized block and the finishedJob
+ // method as well!
+ this.services.eventAdmin.postEvent(jobEvent);
+ }
+ return true;
+
+ } catch (final Exception re) {
+ // if an exception occurs, we just log
+ this.logger.error("Exception during job processing.", re);
}
} else {
- this.decQueued();
+ if ( logger.isDebugEnabled() ) {
+ logger.debug("Discarding removed job {}", Utility.toString(job));
+ }
}
- return ack != null;
+ return false;
}
private static final class RescheduleInfo {
@@ -319,42 +583,41 @@ public abstract class AbstractJobQueue
public long processingTime;
}
- private RescheduleInfo handleReschedule(final JobHandler jobEvent, final Job.JobState resultState) {
+ private RescheduleInfo handleReschedule(final JobHandler handler, final Job.JobState resultState) {
final RescheduleInfo info = new RescheduleInfo();
switch ( resultState ) {
case SUCCEEDED : // job is finished
if ( this.logger.isDebugEnabled() ) {
- this.logger.debug("Finished job {}", Utility.toString(jobEvent.getJob()));
+ this.logger.debug("Finished job {}", Utility.toString(handler.getJob()));
}
- info.processingTime = System.currentTimeMillis() - jobEvent.started;
- this.finishedJob(info.processingTime);
+ info.processingTime = System.currentTimeMillis() - handler.started;
+ Utility.sendNotification(this.services.eventAdmin, NotificationConstants.TOPIC_JOB_FINISHED, handler.getJob(), info.processingTime);
break;
case QUEUED : // check if we exceeded the number of retries
- int retries = (Integer) jobEvent.getJob().getProperty(Job.PROPERTY_JOB_RETRIES);
- int retryCount = (Integer)jobEvent.getJob().getProperty(Job.PROPERTY_JOB_RETRY_COUNT);
+ final int retries = (Integer) handler.getJob().getProperty(Job.PROPERTY_JOB_RETRIES);
+ int retryCount = (Integer)handler.getJob().getProperty(Job.PROPERTY_JOB_RETRY_COUNT);
retryCount++;
if ( retries != -1 && retryCount > retries ) {
if ( this.logger.isDebugEnabled() ) {
- this.logger.debug("Cancelled job {}", Utility.toString(jobEvent.getJob()));
+ this.logger.debug("Cancelled job {}", Utility.toString(handler.getJob()));
}
- this.cancelledJob();
+ Utility.sendNotification(this.services.eventAdmin, NotificationConstants.TOPIC_JOB_CANCELLED, handler.getJob(), null);
} else {
info.reschedule = true;
- // update event with retry count and retries
- jobEvent.getJob().retry();
+ this.reschedule(handler);
if ( this.logger.isDebugEnabled() ) {
- this.logger.debug("Failed job {}", Utility.toString(jobEvent.getJob()));
+ this.logger.debug("Failed job {}", Utility.toString(handler.getJob()));
}
- this.failedJob();
- jobEvent.queued = System.currentTimeMillis();
+ handler.queued = System.currentTimeMillis();
+ Utility.sendNotification(this.services.eventAdmin, NotificationConstants.TOPIC_JOB_FAILED, handler.getJob(), null);
}
break;
default : // consumer cancelled the job (STOPPED, GIVEN_UP, ERROR)
if ( this.logger.isDebugEnabled() ) {
- this.logger.debug("Cancelled job {}", Utility.toString(jobEvent.getJob()));
+ this.logger.debug("Cancelled job {}", Utility.toString(handler.getJob()));
}
- this.cancelledJob();
+ Utility.sendNotification(this.services.eventAdmin, NotificationConstants.TOPIC_JOB_CANCELLED, handler.getJob(), null);
break;
}
@@ -379,7 +642,6 @@ public abstract class AbstractJobQueue
if ( this.logger.isDebugEnabled() ) {
this.logger.debug("Received finish for job {}, resultState={}", jobId, resultState);
}
- // let's remove the event from our processing list
// this is just a sanity check, as usually the job should have been
// removed during sendAcknowledge.
synchronized ( this.startedJobsLists ) {
@@ -388,8 +650,9 @@ public abstract class AbstractJobQueue
// get job handler
final JobHandler handler;
- synchronized ( this.processsingJobsLists ) {
- handler = this.processsingJobsLists.remove(jobId);
+ // let's remove the event from our processing list
+ synchronized ( this.processingJobsLists ) {
+ handler = this.processingJobsLists.remove(jobId);
}
if ( !this.running ) {
@@ -409,338 +672,15 @@ public abstract class AbstractJobQueue
if ( resultState == Job.JobState.QUEUED && !rescheduleInfo.reschedule ) {
resultState = Job.JobState.GIVEN_UP;
}
- // if this is set after the synchronized block we have an error
- final boolean finishSuccessful;
if ( !rescheduleInfo.reschedule ) {
// we keep cancelled jobs and succeeded jobs if the queue is configured like this.
final boolean keepJobs = resultState != Job.JobState.SUCCEEDED || this.configuration.isKeepJobs();
handler.finished(resultState, keepJobs, rescheduleInfo.processingTime);
- finishSuccessful = true;
- if ( resultState == Job.JobState.SUCCEEDED ) {
- Utility.sendNotification(this.eventAdmin, NotificationConstants.TOPIC_JOB_FINISHED, handler.getJob(), rescheduleInfo.processingTime);
- } else {
- Utility.sendNotification(this.eventAdmin, NotificationConstants.TOPIC_JOB_CANCELLED, handler.getJob(), null);
- }
- } else {
- finishSuccessful = handler.reschedule();
- Utility.sendNotification(this.eventAdmin, NotificationConstants.TOPIC_JOB_FAILED, handler.getJob(), null);
- }
-
- if ( !isAsync ) {
- if ( !finishSuccessful || !rescheduleInfo.reschedule ) {
- checkForNotify(null);
- return false;
- }
- checkForNotify(handler);
- } else {
- // async result
- if ( finishSuccessful && rescheduleInfo.reschedule ) {
- final JobHandler reprocessHandler = this.reschedule(handler);
- if ( reprocessHandler != null ) {
- this.put(reprocessHandler);
- }
- }
- }
- return true;
- }
-
- private void checkForNotify(final JobHandler info) {
- JobHandler reprocessInfo = null;
- if ( info != null ) {
- reprocessInfo = this.reschedule(info);
- }
- notifyFinished(reprocessInfo);
- }
-
- /**
- * Get the name of the job queue.
- */
- @Override
- public String getName() {
- return this.queueName;
- }
-
-
- /**
- * Add a new job to the queue.
- */
- public void process(final JobHandler handler) {
- this.closeMarker.set(false);
- handler.queued = System.currentTimeMillis();
- this.incQueued();
- this.put(handler);
- }
-
- /**
- * Check if the queue is suspended and go into suspend mode
- */
- private void checkSuspended() {
- synchronized ( this.suspendLock ) {
- while ( this.suspendedSince != -1 ) {
- try {
- this.suspendLock.wait(MAX_SUSPEND_TIME);
- } catch (final InterruptedException ignore) {
- this.ignoreException(ignore);
- Thread.currentThread().interrupt();
- }
- if ( System.currentTimeMillis() > this.suspendedSince + MAX_SUSPEND_TIME ) {
- this.resume();
- }
- }
}
- }
-
- /**
- * Execute the queue
- */
- private void runJobQueue() {
- JobHandler info = null;
- while ( this.running ) {
- if ( info == null ) {
- // so let's wait/get the next job from the queue
- info = this.take();
- }
+ this.notifyFinished(rescheduleInfo.reschedule);
- if ( this.running ) {
- checkSuspended();
- }
- if ( info != null && this.running ) {
- info = this.start(info);
- }
- }
- }
-
- /**
- * Execute a job
- */
- protected boolean executeJob(final JobHandler handler) {
- final JobImpl job = handler.getJob();
- final JobExecutor consumer = this.jobConsumerManager.getExecutor(job.getTopic());
-
- if ( (consumer != null || (job.isBridgedEvent() && this.jobConsumerManager.supportsBridgedEvents())) ) {
- if ( handler.startProcessing(this) ) {
- if ( logger.isDebugEnabled() ) {
- logger.debug("Starting job {}", Utility.toString(job));
- }
- try {
- handler.started = System.currentTimeMillis();
-
- if ( consumer != null ) {
- final long queueTime = handler.started - handler.queued;
- this.addActive(queueTime);
- Utility.sendNotification(this.eventAdmin, NotificationConstants.TOPIC_JOB_STARTED, job, queueTime);
- synchronized ( this.processsingJobsLists ) {
- this.processsingJobsLists.put(job.getId(), handler);
- }
-
- final Runnable task = new Runnable() {
-
- /**
- * @see java.lang.Runnable#run()
- */
- @Override
- public void run() {
- final Object lock = new Object();
- final Thread currentThread = Thread.currentThread();
- // update priority and name
- final String oldName = currentThread.getName();
- final int oldPriority = currentThread.getPriority();
-
- currentThread.setName(oldName + "-" + job.getQueueName() + "(" + job.getTopic() + ")");
- if ( configuration.getThreadPriority() != null ) {
- switch ( configuration.getThreadPriority() ) {
- case NORM : currentThread.setPriority(Thread.NORM_PRIORITY);
- break;
- case MIN : currentThread.setPriority(Thread.MIN_PRIORITY);
- break;
- case MAX : currentThread.setPriority(Thread.MAX_PRIORITY);
- break;
- }
- }
- JobExecutionResultImpl result = JobExecutionResultImpl.CANCELLED;
- Job.JobState resultState = Job.JobState.ERROR;
- final AtomicBoolean isAsync = new AtomicBoolean(false);
-
- try {
- synchronized ( lock ) {
- final JobExecutionContext ctx = new JobExecutionContext() {
-
- private boolean hasInit = false;
-
- @Override
- public void initProgress(final int steps,
- final long eta) {
- if ( !hasInit ) {
- handler.persistJobProperties(job.startProgress(steps, eta));
- hasInit = true;
- }
- }
-
- @Override
- public void incrementProgressCount(final int steps) {
- if ( hasInit ) {
- handler.persistJobProperties(job.setProgress(steps));
- }
- }
-
- @Override
- public void updateProgress(final long eta) {
- if ( hasInit ) {
- handler.persistJobProperties(job.update(eta));
- }
- }
-
- @Override
- public void log(final String message, Object... args) {
- handler.persistJobProperties(job.log(message, args));
- }
-
- @Override
- public boolean isStopped() {
- return handler.isStopped();
- }
-
- @Override
- public void asyncProcessingFinished(final JobExecutionResult result) {
- synchronized ( lock ) {
- if ( isAsync.compareAndSet(true, false) ) {
- jobConsumerManager.unregisterListener(job.getId());
- Job.JobState state = null;
- if ( result.succeeded() ) {
- state = Job.JobState.SUCCEEDED;
- } else if ( result.failed() ) {
- state = Job.JobState.QUEUED;
- } else if ( result.cancelled() ) {
- if ( handler.isStopped() ) {
- state = Job.JobState.STOPPED;
- } else {
- state = Job.JobState.ERROR;
- }
- }
- finishedJob(job.getId(), state, true);
- asyncCounter.decrementAndGet();
- } else {
- throw new IllegalStateException("Job is not processed async " + job.getId());
- }
- }
- }
-
- @Override
- public ResultBuilder result() {
- return new ResultBuilder() {
-
- private String message;
-
- private Long retryDelayInMs;
-
- @Override
- public JobExecutionResult failed(final long retryDelayInMs) {
- this.retryDelayInMs = retryDelayInMs;
- return new JobExecutionResultImpl(InternalJobState.FAILED, message, retryDelayInMs);
- }
-
- @Override
- public ResultBuilder message(final String message) {
- this.message = message;
- return this;
- }
-
- @Override
- public JobExecutionResult succeeded() {
- return new JobExecutionResultImpl(InternalJobState.SUCCEEDED, message, retryDelayInMs);
- }
-
- @Override
- public JobExecutionResult failed() {
- return new JobExecutionResultImpl(InternalJobState.FAILED, message, retryDelayInMs);
- }
-
- @Override
- public JobExecutionResult cancelled() {
- return new JobExecutionResultImpl(InternalJobState.CANCELLED, message, retryDelayInMs);
- }
- };
- }
- };
- result = (JobExecutionResultImpl)consumer.process(job, ctx);
- if ( result == null ) { // ASYNC processing
- jobConsumerManager.registerListener(job.getId(), consumer, ctx);
- asyncCounter.incrementAndGet();
- notifyFinished(null);
- isAsync.set(true);
- } else {
- if ( result.succeeded() ) {
- resultState = Job.JobState.SUCCEEDED;
- } else if ( result.failed() ) {
- resultState = Job.JobState.QUEUED;
- } else if ( result.cancelled() ) {
- if ( handler.isStopped() ) {
- resultState = Job.JobState.STOPPED;
- } else {
- resultState = Job.JobState.ERROR;
- }
- }
- }
- }
- } catch (final Throwable t) { //NOSONAR
- logger.error("Unhandled error occured in job processor " + t.getMessage() + " while processing job " + Utility.toString(job), t);
- // we don't reschedule if an exception occurs
- result = JobExecutionResultImpl.CANCELLED;
- resultState = Job.JobState.ERROR;
- } finally {
- currentThread.setPriority(oldPriority);
- currentThread.setName(oldName);
- if ( result != null ) {
- if ( result.getRetryDelayInMs() != null ) {
- job.setProperty(JobImpl.PROPERTY_DELAY_OVERRIDE, result.getRetryDelayInMs());
- }
- if ( result.getMessage() != null ) {
- job.setProperty(Job.PROPERTY_RESULT_MESSAGE, result.getMessage());
- }
- finishedJob(job.getId(), resultState, false);
- }
- }
- }
-
- };
- // check if the thread pool is available
- final ThreadPool pool = this.threadPool;
- if ( pool != null ) {
- pool.execute(task);
- } else {
- // if we don't have a thread pool, we create the thread directly
- // (this should never happen for jobs, but is a safe fallback)
- new Thread(task).start();
- }
-
- } else {
- // let's add the event to our processing list
- synchronized ( this.startedJobsLists ) {
- this.startedJobsLists.put(job.getId(), handler);
- }
- final Event jobEvent = this.getJobEvent(handler);
- // we need async delivery, otherwise we might create a deadlock
- // as this method runs inside a synchronized block and the finishedJob
- // method as well!
- this.eventAdmin.postEvent(jobEvent);
- }
- return true;
-
- } catch (final Exception re) {
- // if an exception occurs, we just log
- this.logger.error("Exception during job processing.", re);
- }
- } else {
- if ( logger.isDebugEnabled() ) {
- logger.debug("Discarding removed job {}", Utility.toString(job));
- }
- }
- } else {
- handler.reassign();
- }
- this.decQueued();
- return false;
+ return rescheduleInfo.reschedule;
}
/**
@@ -768,40 +708,27 @@ public abstract class AbstractJobQueue
}
/**
- * Helper method which just logs the exception in debug mode.
- * @param e
+ * @see org.apache.sling.event.impl.jobs.deprecated.JobStatusNotifier#sendAcknowledge(org.osgi.service.event.Event)
*/
- protected void ignoreException(Exception e) {
- if ( this.logger.isDebugEnabled() ) {
- this.logger.debug("Ignored exception " + e.getMessage(), e);
+ @Override
+ public boolean sendAcknowledge(final Event job) {
+ final String jobId = (String)job.getProperty(ResourceHelper.PROPERTY_JOB_ID);
+ final JobHandler ack;
+ synchronized ( this.startedJobsLists ) {
+ ack = this.startedJobsLists.remove(jobId);
}
- }
-
- /**
- * Is the queue outdated?
- */
- protected boolean isOutdated() {
- return this.isOutdated.get();
- }
-
- /**
- * Outdate this queue.
- */
- public void outdate() {
- if ( !this.isOutdated() ) {
- this.isOutdated.set(true);
- final String name = this.getName() + "<outdated>(" + this.hashCode() + ")";
- this.logger.info("Outdating queue {}, renaming to {}.", this.queueName, name);
- this.queueName = name;
+ // if the event is still in the started jobs list, we confirm the ack
+ if ( ack != null ) {
+ if ( logger.isDebugEnabled() ) {
+ logger.debug("Received ack for job {}", Utility.toString(ack.getJob()));
+ }
+ final long queueTime = ack.started - ack.queued;
+ Utility.sendNotification(this.services.eventAdmin, NotificationConstants.TOPIC_JOB_STARTED, ack.getJob(), queueTime);
+ synchronized ( this.processingJobsLists ) {
+ this.processingJobsLists.put(jobId, ack);
+ }
}
- }
-
- /**
- * @see org.apache.sling.event.jobs.Queue#getStatistics()
- */
- @Override
- public Statistics getStatistics() {
- return this;
+ return ack != null;
}
/**
@@ -841,37 +768,12 @@ public abstract class AbstractJobQueue
}
}
-
/**
* @see org.apache.sling.event.jobs.Queue#removeAll()
*/
@Override
public synchronized void removeAll() {
- // we suspend the queue
- final boolean wasSuspended = this.isSuspended();
- this.suspend();
- // we copy all events and remove them in the background
- final Collection<JobHandler> events = this.removeAllJobs();
- this.clearQueued();
- final Thread t = new Thread(new Runnable() {
-
- /**
- * @see java.lang.Runnable#run()
- */
- @Override
- public void run() {
- for(final JobHandler job : events) {
- job.cancel();
- Utility.sendNotification(eventAdmin, NotificationConstants.TOPIC_JOB_CANCELLED, job.getJob(), null);
- }
- }
- }, "Apache Sling Queue RemoveAll Thread for " + this.queueName);
- t.setDaemon(true);
- t.start();
- // start queue again
- if ( !wasSuspended ) {
- this.resume();
- }
+ this.services.topicManager.removeAll(this.getName());
}
/**
@@ -879,7 +781,7 @@ public abstract class AbstractJobQueue
*/
@Override
public void clear() {
- this.clearQueued();
+ // this is a noop
}
/**
@@ -891,6 +793,18 @@ public abstract class AbstractJobQueue
return null;
}
+ /**
+ * @see org.apache.sling.event.jobs.Queue#getStateInfo()
+ */
+ @Override
+ public String getStateInfo() {
+ synchronized ( this.suspendLock ) {
+ return "isWaiting=" + this.isWaiting +
+ ", suspendedSince=" + this.suspendedSince +
+ ", asyncJobs=" + this.asyncCounter.get();
+ }
+ }
+
protected long getRetryDelay(final JobHandler handler) {
long delay = this.configuration.getRetryDelayInMs();
if ( handler.getJob().getProperty(JobImpl.PROPERTY_DELAY_OVERRIDE) != null ) {
@@ -901,44 +815,38 @@ public abstract class AbstractJobQueue
return delay;
}
- /**
- * Reschedule a job.
- */
- protected abstract JobHandler reschedule(final JobHandler info);
-
- /**
- * Put another job into the queue.
- */
- protected abstract void put(final JobHandler event);
-
- /**
- * Get another job from the queue.
- */
- protected abstract JobHandler take();
-
- /**
- * Is the queue empty?
- */
- protected abstract boolean isEmpty();
+ protected void reschedule(final JobHandler handler) {
+ // update event with retry count and retries
+ handler.reschedule();
+ }
/**
- * Remove all events from the queue and return them.
+ * Helper method which just logs the exception in debug mode.
+ * @param e
*/
- protected abstract Collection<JobHandler> removeAllJobs();
-
- protected abstract JobHandler start(final JobHandler event);
-
- protected abstract void notifyFinished(final JobHandler rescheduleInfo);
+ protected void ignoreException(Exception e) {
+ if ( this.logger.isDebugEnabled() ) {
+ this.logger.debug("Ignored exception " + e.getMessage(), e);
+ }
+ }
public boolean stopJob(final JobImpl job) {
final JobHandler handler;
- synchronized ( this.processsingJobsLists ) {
- handler = this.processsingJobsLists.get(job.getId());
+ synchronized ( this.processingJobsLists ) {
+ handler = this.processingJobsLists.get(job.getId());
}
if ( handler != null ) {
handler.stop();
}
return handler != null;
}
+
+ /**
+ * Start processing of a new job.
+ * @param handler The new job handler
+ */
+ protected abstract void start(final JobHandler handler);
+
+ protected abstract void notifyFinished(boolean reschedule);
}
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java?rev=1632141&r1=1632140&r2=1632141&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java Wed Oct 15 17:55:52 2014
@@ -18,19 +18,8 @@
*/
package org.apache.sling.event.impl.jobs.queues;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-import java.util.TreeSet;
-
-import org.apache.sling.commons.threads.ThreadPoolManager;
-import org.apache.sling.event.impl.jobs.JobConsumerManager;
import org.apache.sling.event.impl.jobs.JobHandler;
import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
-import org.osgi.service.event.EventAdmin;
/**
* An ordered job queue is processing the queue FIFO in a serialized
@@ -40,54 +29,20 @@ import org.osgi.service.event.EventAdmin
*/
public final class OrderedJobQueue extends AbstractJobQueue {
- /** The job handler for rescheduling. */
- private volatile JobHandler jobHandler;
-
- /** Lock and status object for handling the sleep phase. */
- private final SleepLock sleepLock = new SleepLock();
-
- /** The queue - we use a set which is sorted by job creation date. */
- private final Set<JobHandler> queue = new TreeSet<JobHandler>(new Comparator<JobHandler>() {
-
- @Override
- public int compare(final JobHandler o1, final JobHandler o2) {
- if ( o1.getJob() == null ) {
- if ( o2.getJob() == null ) {
- return 0;
- }
- return -1;
- }
- if ( o2.getJob() == null ) {
- return 1;
- }
- int result = o1.getJob().getCreated().compareTo(o2.getJob().getCreated());
- if (result == 0 ) {
- result = o1.getJob().getId().compareTo(o2.getJob().getId());
- }
- return result;
- }
- });
-
+ /** Object to sync operations within this queue. */
private final Object syncLock = new Object();
+ /** Sleep delay if job needs rescheduling. */
+ private long sleepDelay = -1;
+
public OrderedJobQueue(final String name,
final InternalQueueConfiguration config,
- final JobConsumerManager jobConsumerManager,
- final ThreadPoolManager threadPoolManager,
- final EventAdmin eventAdmin) {
- super(name, config, jobConsumerManager, threadPoolManager, eventAdmin);
- }
-
- @Override
- public String getStateInfo() {
- return super.getStateInfo() + ", isSleepingUntil=" + this.sleepLock.sleepingSince;
+ final QueueServices services) {
+ super(name, config, services);
}
@Override
- protected JobHandler start(final JobHandler handler) {
- JobHandler rescheduleHandler = null;
-
- // if we are ordered we simply wait for the finish
+ protected void start(final JobHandler handler) {
synchronized ( this.syncLock ) {
if ( this.executeJob(handler) ) {
this.isWaiting = true;
@@ -100,151 +55,45 @@ public final class OrderedJobQueue exten
Thread.currentThread().interrupt();
}
}
- this.logger.debug("Job queue {} is continuing.", this.queueName);
- rescheduleHandler = this.jobHandler;
- this.jobHandler = null;
- }
- }
- return rescheduleHandler;
- }
-
- private void wakeUp(final boolean discardJob) {
- synchronized ( this.sleepLock ) {
- if ( this.sleepLock.sleepingSince != -1 ) {
- if ( discardJob ) {
- this.sleepLock.jobHandler = null;
- }
- this.sleepLock.notify();
- }
- }
- }
-
- @Override
- public void resume() {
- this.wakeUp(false);
- super.resume();
- }
-
- @Override
- protected void put(final JobHandler handler) {
- synchronized ( this.queue ) {
- this.queue.add(handler);
- this.queue.notify();
- this.isWaitingForNext = false;
- }
- }
-
- @Override
- protected JobHandler take() {
- synchronized ( this.queue ) {
- while ( this.queue.isEmpty() ) {
- this.isWaitingForNext = true;
- try {
- this.queue.wait();
- } catch (final InterruptedException e) {
- this.ignoreException(e);
- Thread.currentThread().interrupt();
+ if ( this.sleepDelay > 0 ) {
+ final long waitingTime = this.sleepDelay;
+ this.sleepDelay = -1;
+ final long startTime = System.currentTimeMillis();
+ this.logger.debug("Job queue {} is sleeping {}ms for retry.", this.queueName, waitingTime);
+ this.isWaiting = true;
+ while ( this.isWaiting ) {
+ try {
+ this.syncLock.wait(waitingTime);
+ if ( System.currentTimeMillis() >= startTime + waitingTime ) {
+ this.isWaiting = false;
+ }
+ } catch (final InterruptedException e) {
+ this.ignoreException(e);
+ Thread.currentThread().interrupt();
+ }
+ }
}
- this.isWaitingForNext = false;
+ this.logger.debug("Job queue {} is continuing.", this.queueName);
}
- // get the first element and remove it
- final Iterator<JobHandler> i = this.queue.iterator();
- final JobHandler result = i.next();
- i.remove();
- return result;
}
}
@Override
- protected boolean isEmpty() {
- synchronized ( this.queue ) {
- return this.queue.isEmpty();
- }
+ protected void reschedule(final JobHandler handler) {
+ super.reschedule(handler);
+ this.sleepDelay = this.getRetryDelay(handler);
}
@Override
- protected void notifyFinished(final JobHandler rescheduleHandler) {
- this.jobHandler = rescheduleHandler;
+ protected void notifyFinished(final boolean reschedule) {
this.logger.debug("Notifying job queue {} to continue processing.", this.queueName);
synchronized ( this.syncLock ) {
this.isWaiting = false;
- this.syncLock.notify();
- }
- }
-
- @Override
- protected JobHandler reschedule(final JobHandler handler) {
- // we just sleep for the delay time - if none, we continue and retry
- // this job again
- final long delay = this.getRetryDelay(handler);
- if ( delay > 0 ) {
- synchronized ( this.sleepLock ) {
- this.sleepLock.sleepingSince = System.currentTimeMillis();
- this.sleepLock.jobHandler = handler;
- this.logger.debug("Job queue {} is sleeping for {}ms.", this.queueName, delay);
- try {
- this.sleepLock.wait(delay);
- } catch (final InterruptedException e) {
- this.ignoreException(e);
- Thread.currentThread().interrupt();
- }
- this.sleepLock.sleepingSince = -1;
- final JobHandler result = this.sleepLock.jobHandler;
- this.sleepLock.jobHandler = null;
-
- if ( result == null ) {
- handler.cancel();
- }
- return result;
+ if ( !reschedule ) {
+ this.sleepDelay = -1;
}
+ this.syncLock.notify();
}
- return handler;
- }
-
- /**
- * @see org.apache.sling.event.jobs.Queue#clear()
- */
- @Override
- public void clear() {
- synchronized ( this.queue ) {
- this.queue.clear();
- }
- super.clear();
- }
-
- @Override
- public synchronized void removeAll() {
- // remove all remaining jobs first
- super.removeAll();
- this.jobHandler = null;
- this.wakeUp(true);
- }
-
- @Override
- protected Collection<JobHandler> removeAllJobs() {
- final List<JobHandler> events = new ArrayList<JobHandler>();
- synchronized ( this.queue ) {
- events.addAll(this.queue);
- this.queue.clear();
- }
- return events;
- }
-
- @Override
- public Object getState(final String key) {
- if ( "isSleepingUntil".equals(key) ) {
- return this.sleepLock.sleepingSince;
- }
- return super.getState(key);
- }
-
- private static final class SleepLock {
-
- /** Marker indicating that this queue is currently sleeping. */
- public volatile long sleepingSince = -1;
-
- /** The job event to be returned after sleeping. */
- public volatile JobHandler jobHandler;
}
}
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java?rev=1632141&r1=1632140&r2=1632141&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java Wed Oct 15 17:55:52 2014
@@ -18,81 +18,119 @@
*/
package org.apache.sling.event.impl.jobs.queues;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.apache.sling.commons.scheduler.Scheduler;
-import org.apache.sling.commons.threads.ThreadPoolManager;
-import org.apache.sling.event.impl.jobs.JobConsumerManager;
+import java.util.Date;
+
import org.apache.sling.event.impl.jobs.JobHandler;
import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
-import org.osgi.service.event.EventAdmin;
/**
* The default parallel job queue processing the entries FIFO.
* Failing jobs are rescheduled and put at the end of the queue.
*/
-public final class ParallelJobQueue extends AbstractParallelJobQueue {
+public final class ParallelJobQueue extends AbstractJobQueue {
+
+ private volatile int jobCount;
- /** The queue. */
- private final BlockingQueue<JobHandler> queue = new LinkedBlockingQueue<JobHandler>();
+ private final Object syncLock = new Object();
public ParallelJobQueue(final String name,
final InternalQueueConfiguration config,
- final JobConsumerManager jobConsumerManager,
- final ThreadPoolManager threadPoolManager,
- final EventAdmin eventAdmin,
- final Scheduler scheduler) {
- super(name, config, jobConsumerManager, threadPoolManager, eventAdmin, scheduler);
+ final QueueServices services) {
+ super(name, config, services);
}
@Override
- protected void put(final JobHandler event) {
- try {
- this.isWaitingForNext = false;
- this.queue.put(event);
- } catch (final InterruptedException e) {
- this.ignoreException(e);
- Thread.currentThread().interrupt();
- }
+ public String getStateInfo() {
+ return super.getStateInfo() + ", jobCount=" + this.jobCount;
}
@Override
- protected JobHandler take() {
- try {
- this.isWaitingForNext = true;
- return this.queue.take();
- } catch (final InterruptedException e) {
- this.ignoreException(e);
- Thread.currentThread().interrupt();
- } finally {
- this.isWaitingForNext = false;
+ protected void start(final JobHandler processInfo) {
+ // acquire a slot
+ this.acquireSlot();
+
+ // check if we got outdated in the meantime
+ if ( this.isOutdated() ) {
+ this.freeSlot();
+ return;
+ }
+ if ( !this.executeJob(processInfo) ) {
+ this.freeSlot();
}
- return null;
}
- @Override
- protected boolean isEmpty() {
- return this.queue.isEmpty();
+ /**
+ * Acquire a processing slot.
+ * This method is called if the queue is not ordered.
+ */
+ private void acquireSlot() {
+ synchronized ( this.syncLock ) {
+ if ( jobCount >= this.configuration.getMaxParallel() ) {
+ this.isWaiting = true;
+ this.logger.debug("Job queue {} is processing {} jobs - waiting for a free slot.", this.queueName, jobCount);
+ while ( this.isWaiting ) {
+ try {
+ this.syncLock.wait();
+ } catch (final InterruptedException e) {
+ this.ignoreException(e);
+ Thread.currentThread().interrupt();
+ }
+ }
+ this.logger.debug("Job queue {} is continuing.", this.queueName);
+ }
+ jobCount++;
+ }
}
/**
- * @see org.apache.sling.event.jobs.Queue#clear()
+ * Free a slot when a job processing is finished.
*/
+ private void freeSlot() {
+ synchronized ( this.syncLock ) {
+ jobCount--;
+ if ( this.isWaiting ) {
+ this.logger.debug("Notifying job queue {} to continue processing.", this.queueName);
+ this.isWaiting = false;
+ this.syncLock.notify();
+ }
+ }
+ }
+
+ @Override
+ protected boolean canBeClosed() {
+ boolean result = super.canBeClosed();
+ if ( result ) {
+ result = this.jobCount == 0;
+ }
+ return result;
+ }
+
@Override
- public void clear() {
- this.queue.clear();
- super.clear();
+ protected void notifyFinished(final boolean reschedule) {
+ this.freeSlot();
}
@Override
- protected Collection<JobHandler> removeAllJobs() {
- final List<JobHandler> events = new ArrayList<JobHandler>();
- this.queue.drainTo(events);
- return events;
+ protected void reschedule(final JobHandler handler) {
+ // if a retry delay is configured, the reschedule is deferred via the scheduler -
+ // if none, we reschedule this job immediately
+ final long delay = this.getRetryDelay(handler);
+ if ( delay > 0 ) {
+ final Date fireDate = new Date();
+ fireDate.setTime(System.currentTimeMillis() + delay);
+
+ final String jobName = "Waiting:" + queueName + ":" + handler.hashCode();
+ final Runnable t = new Runnable() {
+ @Override
+ public void run() {
+ ParallelJobQueue.super.reschedule(handler);
+ }
+ };
+ services.scheduler.schedule(t, services.scheduler.AT(fireDate).name(jobName));
+ } else {
+ // put directly into queue
+ super.reschedule(handler);
+ }
}
}
Added: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java?rev=1632141&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java (added)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java Wed Oct 15 17:55:52 2014
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.queues;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Properties;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.sling.commons.scheduler.Scheduler;
+import org.apache.sling.commons.threads.ThreadPoolManager;
+import org.apache.sling.event.impl.jobs.JobConsumerManager;
+import org.apache.sling.event.impl.jobs.JobManagerConfiguration;
+import org.apache.sling.event.impl.jobs.TestLogger;
+import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
+import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager;
+import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueInfo;
+import org.apache.sling.event.impl.jobs.jmx.QueueStatusEvent;
+import org.apache.sling.event.impl.jobs.jmx.QueuesMBeanImpl;
+import org.apache.sling.event.impl.jobs.stats.StatisticsManager;
+import org.apache.sling.event.impl.jobs.topics.TopicManager;
+import org.apache.sling.event.impl.support.Environment;
+import org.apache.sling.event.impl.support.ResourceHelper;
+import org.apache.sling.event.jobs.Queue;
+import org.apache.sling.event.jobs.QueueConfiguration;
+import org.apache.sling.event.jobs.jmx.QueuesMBean;
+import org.osgi.service.event.EventAdmin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Implementation of the queue manager.
+ */
+@Component(immediate=true)
+@Service(value={Runnable.class, QueueManager.class})
+@Properties({
+ @Property(name="scheduler.period", longValue=60, propertyPrivate=true),
+ @Property(name="scheduler.concurrent", boolValue=false, propertyPrivate=true)
+})
+public class QueueManager
+ implements Runnable {
+
+ /** Default logger. */
+ private final Logger logger = new TestLogger(LoggerFactory.getLogger(this.getClass()));
+
+ @Reference
+ private EventAdmin eventAdmin;
+
+ @Reference
+ private Scheduler scheduler;
+
+ @Reference
+ private JobConsumerManager jobConsumerManager;
+
+ @Reference
+ private QueuesMBean queuesMBean;
+
+ @Reference
+ private ThreadPoolManager threadPoolManager;
+
+ /** The job manager configuration. */
+ @Reference
+ private JobManagerConfiguration configuration;
+
+ @Reference
+ private StatisticsManager statisticsManager;
+
+ @Reference
+ private QueueConfigurationManager queueManager;
+
+ /** Lock object for the queues map - we don't want to sync directly on the concurrent map. */
+ private final Object queuesLock = new Object();
+
+ /** All active queues. */
+ private final Map<String, AbstractJobQueue> queues = new ConcurrentHashMap<String, AbstractJobQueue>();
+
+ /** We count the scheduler runs. */
+ private volatile long schedulerRuns;
+
+ /**
+ * Activate this component.
+ * @param props Configuration properties
+ */
+ @Activate
+ protected void activate(final Map<String, Object> props) {
+ logger.info("Apache Sling Queue Manager started on instance {}", Environment.APPLICATION_ID);
+ }
+
+ /**
+ * Deactivate this component.
+ */
+ @Deactivate
+ protected void deactivate() {
+ logger.debug("Apache Sling Queue Manager stopping on instance {}", Environment.APPLICATION_ID);
+
+ final Iterator<AbstractJobQueue> i = this.queues.values().iterator();
+ while ( i.hasNext() ) {
+ final AbstractJobQueue jbq = i.next();
+ jbq.close();
+ // update mbeans
+ ((QueuesMBeanImpl)queuesMBean).sendEvent(new QueueStatusEvent(null, jbq));
+ }
+ this.queues.clear();
+ logger.info("Apache Sling Queue Manager stopped on instance {}", Environment.APPLICATION_ID);
+ }
+
+ /**
+ * This method is invoked periodically by the scheduler.
+ * It searches for idle queues and stops them after a timeout. If a queue
+ * is idle for two consecutive clean up calls, it is removed.
+ * @see java.lang.Runnable#run()
+ */
+ private void maintain() {
+ this.schedulerRuns++;
+ logger.debug("Job manager maintenance: Starting #{}", this.schedulerRuns);
+
+ // check for unprocessed jobs first
+ logger.debug("Checking for unprocessed jobs...");
+ for(final AbstractJobQueue jbq : this.queues.values() ) {
+ jbq.checkForUnprocessedJobs();
+ }
+
+ // we only do a full clean up on every fifth run
+ final boolean doFullCleanUp = (schedulerRuns % 5 == 0);
+
+ if ( doFullCleanUp ) {
+ // check for idle queue
+ logger.debug("Checking for idle queues...");
+
+ // we synchronize to avoid creating a queue which is about to be removed during cleanup
+ synchronized ( queuesLock ) {
+ final Iterator<Map.Entry<String, AbstractJobQueue>> i = this.queues.entrySet().iterator();
+ while ( i.hasNext() ) {
+ final Map.Entry<String, AbstractJobQueue> current = i.next();
+ final AbstractJobQueue jbq = current.getValue();
+ if ( jbq.tryToClose() ) {
+ logger.debug("Removing idle job queue {}", jbq);
+ // remove
+ i.remove();
+ // update mbeans
+ ((QueuesMBeanImpl)queuesMBean).sendEvent(new QueueStatusEvent(null, jbq));
+ }
+ }
+ }
+ }
+ logger.debug("Job manager maintenance: Finished #{}", this.schedulerRuns);
+ }
+
+ /**
+ * Start a new queue
+ * This method first searches the corresponding queue - if such a queue
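+ * does not exist yet, it is created and started.
+ * @param topicManager The topic manager passed on to a newly created queue
+ * @param queueInfo The queue info providing the queue name and configuration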
+ */
+ public void start(final TopicManager topicManager, final QueueInfo queueInfo) {
+ final InternalQueueConfiguration config = queueInfo.queueConfiguration;
+ // get or create queue
+ AbstractJobQueue queue = null;
+ // we synchronize to avoid creating a queue which is about to be removed during cleanup
+ synchronized ( queuesLock ) {
+ queue = this.queues.get(queueInfo.queueName);
+ // check for reconfiguration, we really do an identity check here(!)
+ if ( queue != null && queue.getConfiguration() != config ) {
+ this.outdateQueue(queue);
+ // we use a new queue with the configuration
+ queue = null;
+ }
+ if ( queue == null ) {
+ final QueueServices services = new QueueServices();
+ services.eventAdmin = this.eventAdmin;
+ services.jobConsumerManager = this.jobConsumerManager;
+ services.scheduler = this.scheduler;
+ services.threadPoolManager = this.threadPoolManager;
+ services.topicManager = topicManager;
+ services.statisticsManager = statisticsManager;
+ if ( config.getType() == QueueConfiguration.Type.ORDERED ) {
+ queue = new OrderedJobQueue(queueInfo.queueName, config, services);
+ } else if ( config.getType() == QueueConfiguration.Type.UNORDERED ) {
+ queue = new ParallelJobQueue(queueInfo.queueName, config, services);
+ } else if ( config.getType() == QueueConfiguration.Type.TOPIC_ROUND_ROBIN ) {
+ queue = new ParallelJobQueue(queueInfo.queueName, config, services);
+ }
+ // this is just a sanity check, actually we always have a queue instance here
+ if ( queue != null ) {
+ queues.put(queueInfo.queueName, queue);
+ ((QueuesMBeanImpl)queuesMBean).sendEvent(new QueueStatusEvent(queue, null));
+ queue.start();
+ }
+ }
+ }
+ }
+
+ /**
+ * This method is invoked periodically by the scheduler.
+ * In the default configuration every minute
+ * @see java.lang.Runnable#run()
+ */
+ @Override
+ public void run() {
+ this.maintain();
+ }
+
+ private void outdateQueue(final AbstractJobQueue queue) {
+ // remove the queue with the old name
+ // check for main queue
+ final String oldName = ResourceHelper.filterQueueName(queue.getName());
+ this.queues.remove(oldName);
+ // check if we can close or have to rename
+ if ( queue.tryToClose() ) {
+ // copy statistics
+ // update mbeans
+ ((QueuesMBeanImpl)queuesMBean).sendEvent(new QueueStatusEvent(null, queue));
+ } else {
+ queue.outdate();
+ // readd with new name
+ String newName = ResourceHelper.filterName(queue.getName());
+ int index = 0;
+ while ( this.queues.containsKey(newName) ) {
+ newName = ResourceHelper.filterName(queue.getName()) + '$' + String.valueOf(index++);
+ }
+ this.queues.put(newName, queue);
+ // update mbeans
+ ((QueuesMBeanImpl)queuesMBean).sendEvent(new QueueStatusEvent(queue, queue));
+ }
+ }
+
+ /**
+ * @see org.apache.sling.event.jobs.JobManager#restart()
+ */
+ public void restart() {
+ // let's rename/close all queues and clear them
+ synchronized ( queuesLock ) {
+ final List<AbstractJobQueue> queues = new ArrayList<AbstractJobQueue>(this.queues.values());
+ for(final AbstractJobQueue queue : queues ) {
+ queue.clear();
+ this.outdateQueue(queue);
+ }
+ }
+ }
+
+ private void stopProcessing() {
+ // let's rename/close all queues and clear them
+ synchronized ( queuesLock ) {
+ final List<AbstractJobQueue> queues = new ArrayList<AbstractJobQueue>(this.queues.values());
+ for(final AbstractJobQueue queue : queues ) {
+ queue.clear();
+ this.outdateQueue(queue);
+ }
+ }
+ }
+
+ /**
+ * @see org.apache.sling.event.jobs.JobManager#getQueue(java.lang.String)
+ */
+ public Queue getQueue(final String name) {
+ return this.queues.get(name);
+ }
+
+ /**
+ * Return all active queues; every call to iterator() starts a new iteration.
+ * @see org.apache.sling.event.jobs.JobManager#getQueues()
+ */
+ public Iterable<Queue> getQueues() {
+ return new Iterable<Queue>() {
+ @Override
+ public Iterator<Queue> iterator() {
+ // get the backing iterator per call - a shared one is exhausted after the first pass
+ final Iterator<AbstractJobQueue> jqI = QueueManager.this.queues.values().iterator();
+ return new Iterator<Queue>() {
+ @Override
+ public boolean hasNext() {
+ return jqI.hasNext();
+ }
+
+ @Override
+ public Queue next() {
+ return jqI.next();
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+ };
+ }
+}
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueServices.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueServices.java?rev=1632141&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueServices.java (added)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueServices.java Wed Oct 15 17:55:52 2014
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.queues;
+
+import org.apache.sling.commons.scheduler.Scheduler;
+import org.apache.sling.commons.threads.ThreadPoolManager;
+import org.apache.sling.event.impl.jobs.JobConsumerManager;
+import org.apache.sling.event.impl.jobs.stats.StatisticsManager;
+import org.apache.sling.event.impl.jobs.topics.TopicManager;
+import org.osgi.service.event.EventAdmin;
+
+/**
+ * Plain holder that bundles the service and manager references declared
+ * below into a single object.
+ * <p>
+ * All fields are public and non-final and have no initializer, so each one
+ * is {@code null} until the code creating the instance assigns it.
+ * NOTE(review): presumably populated by the queue manager and handed to the
+ * queue implementations - confirm against the callers.
+ */
+public class QueueServices {
+
+    // --- framework / commons services ---
+
+    /** OSGi {@link EventAdmin} reference. */
+    public EventAdmin eventAdmin;
+
+    /** Sling commons {@link ThreadPoolManager} reference. */
+    public ThreadPoolManager threadPoolManager;
+
+    /** Sling commons {@link Scheduler} reference. */
+    public Scheduler scheduler;
+
+    // --- job handling components of this bundle ---
+
+    /** {@link JobConsumerManager} reference. */
+    public JobConsumerManager jobConsumerManager;
+
+    /** {@link TopicManager} reference. */
+    public TopicManager topicManager;
+
+    /** {@link StatisticsManager} reference. */
+    public StatisticsManager statisticsManager;
+}
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueServices.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueServices.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueServices.java
------------------------------------------------------------------------------
svn:mime-type = text/plain