You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2010/10/15 16:02:25 UTC
svn commit: r1022923 [2/2] - in /sling/trunk/bundles/extensions/event: ./
src/main/java/org/apache/sling/event/
src/main/java/org/apache/sling/event/impl/
src/main/java/org/apache/sling/event/impl/job/
src/main/java/org/apache/sling/event/impl/jobs/ sr...
Added: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java?rev=1022923&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java (added)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java Fri Oct 15 14:02:23 2010
@@ -0,0 +1,618 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.queues;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Dictionary;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.sling.event.EventPropertiesMap;
+import org.apache.sling.event.EventUtil;
+import org.apache.sling.event.impl.EnvironmentComponent;
+import org.apache.sling.event.impl.jobs.JobEvent;
+import org.apache.sling.event.impl.jobs.JobStatusNotifier;
+import org.apache.sling.event.impl.jobs.StatisticsImpl;
+import org.apache.sling.event.impl.jobs.Utility;
+import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
+import org.apache.sling.event.jobs.JobUtil;
+import org.apache.sling.event.jobs.Queue;
+import org.apache.sling.event.jobs.Statistics;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The job blocking queue extends the blocking queue by some
+ * functionality for the job event handling.
+ */
+public abstract class AbstractJobQueue
+ extends StatisticsImpl
+ implements JobStatusNotifier, Queue {
+
+ /** Default time in milliseconds to wait for an ack. */
+ private static final long DEFAULT_WAIT_FOR_ACK_IN_MS = 60 * 1000; // by default we wait 60 secs
+
+ /** Default timeout for suspend. */
+ private static final long MAX_SUSPEND_TIME = 1000 * 60 * 60; // 60 mins
+
+ /** The logger. */
+ protected final Logger logger;
+
+ /** Configuration. */
+ protected final InternalQueueConfiguration configuration;
+
+ /** The environment component. */
+ private final EnvironmentComponent environment;
+
+ /** The queue name. */
+ protected volatile String queueName;
+
+ /** Are we still running? */
+ protected volatile boolean running;
+
+ /** Are we marked for cleanup */
+ private volatile boolean markedForCleanUp = false;
+
+ /** Is the queue currently waiting(sleeping) */
+ protected volatile boolean isWaiting = false;
+
+ /** The map of events we have started (sent). */
+ private final Map<String, JobEvent> startedJobsLists = new HashMap<String, JobEvent>();
+
+ /** The map of events we're processing. */
+ private final Map<String, JobEvent> processsingJobsLists = new HashMap<String, JobEvent>();
+
+ /** Suspended since. */
+ private final AtomicLong suspendedSince = new AtomicLong(-1);
+
+ /** Suspend lock. */
+ private final Object suspendLock = new Object();
+
+ /**
+ * Start this queue
+ * @param name The queue name
+ * @param config The queue configuration
+ * @param environment The environment component
+ */
+ public AbstractJobQueue(final String name,
+ final InternalQueueConfiguration config,
+ final EnvironmentComponent environment) {
+ this.queueName = name;
+ this.configuration = config;
+ this.logger = LoggerFactory.getLogger(this.getClass().getName() + '.' + name);
+ this.running = true;
+ this.environment = environment;
+ }
+
+    /**
+     * Describe the current internal state of this queue as a single line of
+     * comma separated <code>name=value</code> pairs.
+     * @see org.apache.sling.event.jobs.Queue#getStateInfo()
+     */
+    public String getStateInfo() {
+        final StringBuilder state = new StringBuilder();
+        state.append("isWaiting=").append(this.isWaiting);
+        state.append(", markedForCleanUp=").append(this.markedForCleanUp);
+        state.append(", suspendedSince=").append(this.suspendedSince.longValue());
+        return state.toString();
+    }
+
+    /**
+     * Start the job queue.
+     * A daemon thread is created which keeps calling {@link #runJobQueue()}
+     * for as long as the queue is marked as running; if that call ends with
+     * an exception, the exception is logged and the loop starts over.
+     */
+    public void start() {
+        final Runnable queueLoop = new Runnable() {
+
+            public void run() {
+                while ( running ) {
+                    logger.info("Starting job queue {}", queueName);
+                    logger.debug("Configuration for job queue={}", configuration);
+
+                    try {
+                        runJobQueue();
+                    } catch (Throwable t) { //NOSONAR
+                        logger.error("Job queue " + queueName + " stopped with exception: " + t.getMessage() + ". Restarting.", t);
+                    }
+                }
+            }
+
+        };
+        final Thread worker = new Thread(queueLoop, "Apache Sling Job Queue " + queueName);
+        worker.setDaemon(true);
+        worker.start();
+    }
+
+ /**
+ * Return the queue configuration
+ */
+ public InternalQueueConfiguration getConfiguration() {
+ return this.configuration;
+ }
+
+    /**
+     * Close this queue.
+     * The queue is marked as not running, a suspended or waiting queue is
+     * woken up and a dummy job is put into the queue so that a queue thread
+     * blocked in {@link #take()} continues and can leave its loop.
+     * Finally the bookkeeping of started and processing jobs is dropped.
+     */
+    public void close() {
+        this.running = false;
+        this.logger.debug("Shutting down job queue {}", queueName);
+        this.logger.debug("Waking up sleeping queue {}", queueName);
+        this.resume();
+        if ( this.isWaiting ) {
+            this.logger.debug("Waking up waiting queue {}", this.queueName);
+            this.notifyFinished(null);
+        }
+        // continue queue processing to stop the queue
+        this.put(new JobEvent(null, null) {
+            public boolean lock() { return false; }
+            public void unlock() {
+                // dummy impl
+            }
+            public void finished() {
+                // dummy impl
+            }
+            public boolean remove() { return true; }
+            public boolean reschedule() { return false; }
+        });
+
+        // both maps are plain HashMaps which every other method only touches
+        // while holding the map's monitor - do the same here, as acknowledge
+        // and finish callbacks may still arrive from other threads
+        synchronized ( this.processsingJobsLists ) {
+            this.processsingJobsLists.clear();
+        }
+        synchronized ( this.startedJobsLists ) {
+            this.startedJobsLists.clear();
+        }
+        this.logger.info("Stopped job queue {}", this.queueName);
+    }
+
+    /**
+     * Periodically cleanup.
+     * Jobs which have been sent to a processor but have not been acknowledged
+     * within {@link #DEFAULT_WAIT_FOR_ACK_IN_MS} are requeued. Nothing is done
+     * if the queue is not running anymore.
+     */
+    public void cleanup() {
+        if ( !this.running ) {
+            return;
+        }
+        // check for jobs that were started but never got an acknowledge
+        final long tooOld = System.currentTimeMillis() - DEFAULT_WAIT_FOR_ACK_IN_MS;
+        // to keep the synchronized block as fast as possible we just store the
+        // candidates in a new list and process this list afterwards
+        final List<JobEvent> restartJobs = new ArrayList<JobEvent>();
+        synchronized ( this.startedJobsLists ) {
+            for ( final JobEvent candidate : this.startedJobsLists.values() ) {
+                if ( candidate.started <= tooOld ) {
+                    restartJobs.add(candidate);
+                }
+            }
+        }
+
+        // restartJobs is only a list of potential candidates: pause briefly
+        // and then check each candidate separately again
+        if ( !restartJobs.isEmpty() ) {
+            try {
+                Thread.sleep(500);
+            } catch (final InterruptedException e) {
+                // we just ignore this
+                this.ignoreException(e);
+            }
+        }
+        for ( final JobEvent info : restartJobs ) {
+            final boolean process;
+            synchronized ( this.startedJobsLists ) {
+                process = this.startedJobsLists.remove(info.uniqueId) != null;
+            }
+            if ( !process ) {
+                // acknowledged or finished in the meantime
+                continue;
+            }
+            // The job never was acknowledged, therefore it never made it into
+            // the processing map. finishedJob() only handles jobs from that
+            // map and would silently drop this one without requeueing it or
+            // waking up the queue - so reschedule and notify directly.
+            if ( info.reschedule() ) {
+                this.logger.info("No acknowledge received for job {} stored at {}. Requeueing job.", EventUtil.toString(info.event), info.uniqueId);
+                this.checkForNotify(info);
+            } else {
+                // the job could not be rescheduled: it is gone from this queue
+                this.decQueued();
+                this.checkForNotify(null);
+            }
+        }
+    }
+
+    /**
+     * A job processor acknowledges that it has picked up the job.
+     * The job is moved from the map of started jobs to the map of jobs being
+     * processed and a job started notification is sent.
+     * @param job The job event.
+     * @return <code>true</code> if the job was waiting for an acknowledge,
+     *         <code>false</code> otherwise.
+     * @see org.apache.sling.event.impl.jobs.JobStatusNotifier#sendAcknowledge(org.osgi.service.event.Event)
+     */
+    public boolean sendAcknowledge(final Event job) {
+        final String jobId = (String)job.getProperty(JobUtil.JOB_ID);
+        final JobEvent startedJob;
+        synchronized ( this.startedJobsLists ) {
+            startedJob = this.startedJobsLists.remove(jobId);
+        }
+        if ( startedJob == null ) {
+            // the job is not (or not anymore) waiting for an acknowledge
+            this.decQueued();
+            return false;
+        }
+
+        // the job was still in the started map, so we confirm the ack
+        if ( logger.isDebugEnabled() ) {
+            logger.debug("Received ack for job {}", EventUtil.toString(job));
+        }
+        final long queueTime = startedJob.started - startedJob.queued;
+        this.addActive(queueTime);
+        Utility.sendNotification(this.environment, JobUtil.TOPIC_JOB_STARTED, job, queueTime);
+        synchronized ( this.processsingJobsLists ) {
+            this.processsingJobsLists.put(jobId, startedJob);
+        }
+        return true;
+    }
+
+ /**
+ * Decide whether a processed job is really rescheduled and do the
+ * bookkeeping (statistics call and notification) for the outcome.
+ * <ul>
+ * <li>No reschedule requested: the job is reported as finished.</li>
+ * <li>Reschedule requested and retries left: the job event is replaced by
+ * a copy carrying the updated retry information and the job is reported
+ * as failed.</li>
+ * <li>Reschedule requested but retries exceeded: the job is reported as
+ * cancelled.</li>
+ * </ul>
+ * @param jobEvent The job; its event and queued time may be updated.
+ * @param shouldReschedule Whether the processor asked for a reschedule.
+ * @return <code>true</code> if the job should be rescheduled.
+ */
+ private boolean handleReschedule(final JobEvent jobEvent, final boolean shouldReschedule) {
+ boolean reschedule = shouldReschedule;
+ if ( shouldReschedule ) {
+ // check if we exceeded the number of retries
+ // the queue configuration provides the default, a property on the job overrides it
+ int retries = this.configuration.getMaxRetries();
+ if ( jobEvent.event.getProperty(JobUtil.PROPERTY_JOB_RETRIES) != null ) {
+ retries = (Integer) jobEvent.event.getProperty(JobUtil.PROPERTY_JOB_RETRIES);
+ }
+ int retryCount = 0;
+ if ( jobEvent.event.getProperty(JobUtil.PROPERTY_JOB_RETRY_COUNT) != null ) {
+ retryCount = (Integer)jobEvent.event.getProperty(JobUtil.PROPERTY_JOB_RETRY_COUNT);
+ }
+ retryCount++;
+ // a retries value of -1 never stops rescheduling
+ if ( retries != -1 && retryCount > retries ) {
+ reschedule = false;
+ }
+ if ( reschedule ) {
+ // update event with retry count and retries
+ final Dictionary<String, Object> newProperties = new EventPropertiesMap(jobEvent.event);
+ newProperties.put(JobUtil.PROPERTY_JOB_RETRY_COUNT, retryCount);
+ newProperties.put(JobUtil.PROPERTY_JOB_RETRIES, retries);
+ jobEvent.event = new Event(jobEvent.event.getTopic(), newProperties);
+ if ( this.logger.isDebugEnabled() ) {
+ this.logger.debug("Failed job {}", EventUtil.toString(jobEvent.event));
+ }
+ this.failedJob();
+ // the job goes back into the queue, so its queue time starts again
+ jobEvent.queued = System.currentTimeMillis();
+ Utility.sendNotification(this.environment, JobUtil.TOPIC_JOB_FAILED, jobEvent.event, null);
+ } else {
+ if ( this.logger.isDebugEnabled() ) {
+ this.logger.debug("Cancelled job {}", EventUtil.toString(jobEvent.event));
+ }
+ this.cancelledJob();
+ Utility.sendNotification(this.environment, JobUtil.TOPIC_JOB_CANCELLED, jobEvent.event, null);
+ }
+ } else {
+ if ( this.logger.isDebugEnabled() ) {
+ this.logger.debug("Finished job {}", EventUtil.toString(jobEvent.event));
+ }
+ // processing time is measured from the moment the job was started
+ final long processingTime = System.currentTimeMillis() - jobEvent.started;
+ this.finishedJob(processingTime);
+ Utility.sendNotification(this.environment, JobUtil.TOPIC_JOB_FINISHED, jobEvent.event, processingTime);
+ }
+
+ return reschedule;
+ }
+
+    /**
+     * A job processor reports that it is done with the job.
+     * @param job The job event.
+     * @param shouldReschedule Whether the processor asks for a reschedule.
+     * @return <code>true</code> if the job has been rescheduled,
+     *         <code>false</code> in all other cases (queue closed, unknown
+     *         job, job finished or cancelled, reschedule failed).
+     * @see org.apache.sling.event.impl.jobs.JobStatusNotifier#finishedJob(org.osgi.service.event.Event, boolean)
+     */
+    public boolean finishedJob(final Event job, final boolean shouldReschedule) {
+        final boolean debug = this.logger.isDebugEnabled();
+        if ( debug ) {
+            this.logger.debug("Received finish for job {}, shouldReschedule={}", EventUtil.toString(job), shouldReschedule);
+        }
+        if ( !this.running ) {
+            if ( this.logger.isDebugEnabled() ) {
+                this.logger.debug("Queue is not running anymore. Discarding finish for {}", EventUtil.toString(job));
+            }
+            return false;
+        }
+        final String jobId = (String)job.getProperty(JobUtil.JOB_ID);
+
+        // sanity check: usually the job has already been removed from the
+        // started map during sendAcknowledge
+        synchronized ( this.startedJobsLists ) {
+            this.startedJobsLists.remove(jobId);
+        }
+
+        // fetch the job from the map of jobs being processed
+        final JobEvent processed;
+        synchronized ( this.processsingJobsLists ) {
+            processed = this.processsingJobsLists.remove(jobId);
+        }
+        if ( processed == null ) {
+            if ( this.logger.isDebugEnabled() ) {
+                this.logger.debug("This job has never been started by this queue: {}", EventUtil.toString(job));
+            }
+            return false;
+        }
+
+        // handle the reschedule, the job might get updated reschedule info
+        if ( !this.handleReschedule(processed, shouldReschedule) ) {
+            // finished or cancelled
+            processed.finished();
+            checkForNotify(null);
+            return false;
+        }
+        if ( !processed.reschedule() ) {
+            // rescheduling the job failed
+            checkForNotify(null);
+            return false;
+        }
+        checkForNotify(processed);
+        return true;
+    }
+
+ private void checkForNotify(final JobEvent info) {
+ JobEvent reprocessInfo = null;
+ if ( info != null ) {
+ reprocessInfo = this.reschedule(info);
+ }
+ notifyFinished(reprocessInfo);
+ }
+
+ /**
+ * Check whether this queue is idle enough to be marked for cleanup.
+ * @return <code>true</code> if the queue is empty and not waiting.
+ */
+ protected boolean canBeMarkedForCleanUp() {
+ return this.isEmpty() && !this.isWaiting;
+ }
+ /**
+ * Mark this queue for cleanup.
+ */
+ public void markForCleanUp() {
+ if ( this.canBeMarkedForCleanUp() ) {
+ this.markedForCleanUp = true;
+ }
+ }
+
+ /**
+ * Check if this queue is marked for cleanup
+ */
+ public boolean isMarkedForCleanUp() {
+ return this.markedForCleanUp && this.canBeMarkedForCleanUp();
+ }
+
+ /**
+ * Get the name of the job queue.
+ */
+ public String getName() {
+ return this.queueName;
+ }
+
+
+    /**
+     * Add a new job to the queue.
+     * The queued time stamp and the queued counter are updated before the job
+     * is handed over: once the job is in the queue, the queue thread may take
+     * and start it at any time, and both the queue time calculation in
+     * {@link #sendAcknowledge(Event)} and the counter handling of a failed
+     * start rely on these values being set already.
+     * @param event The job to add.
+     */
+    public void process(final JobEvent event) {
+        event.queued = System.currentTimeMillis();
+        this.incQueued();
+        this.put(event);
+    }
+
+ /**
+ * Execute the qeue
+ */
+ private void runJobQueue() {
+ JobEvent info = null;
+ while ( this.running ) {
+ while ( this.suspendedSince.longValue() != -1 ) {
+ synchronized ( this.suspendLock ) {
+ try {
+ this.suspendLock.wait(MAX_SUSPEND_TIME);
+ } catch (final InterruptedException ignore) {
+ this.ignoreException(ignore);
+ }
+ if ( System.currentTimeMillis() > this.suspendedSince.longValue() + MAX_SUSPEND_TIME ) {
+ this.suspendedSince.set(-1);
+ }
+ }
+ }
+ if ( info == null ) {
+ // so let's wait/get the next job from the queue
+ info = this.take();
+ }
+
+ if ( info != null && this.running ) {
+ info = this.start(info);
+ }
+ }
+ }
+
+ /**
+ * Process a job: lock it, register it as started and post the real job
+ * event through the event admin.
+ * If the job could not be locked or an exception occurred, the queued
+ * counter is decreased.
+ * @param info The job to execute.
+ * @return <code>true</code> if the job event has been posted,
+ * <code>false</code> otherwise.
+ */
+ protected boolean executeJob(final JobEvent info) {
+ if ( logger.isDebugEnabled() ) {
+ logger.debug("Executing job {}.", EventUtil.toString(info.event));
+ }
+ if ( info.lock() ) {
+ if ( logger.isDebugEnabled() ) {
+ logger.debug("Starting job {}", EventUtil.toString(info.event));
+ }
+ // the lock is released in the finally block unless the event has been posted
+ boolean unlock = true;
+ try {
+ final Event jobEvent = this.getJobEvent(info);
+ final EventAdmin localEA = this.environment.getEventAdmin();
+ info.started = System.currentTimeMillis();
+ // let's add the event to our processing list
+ // (this is the map of started jobs which wait for an acknowledge)
+ synchronized ( this.startedJobsLists ) {
+ this.startedJobsLists.put(info.uniqueId, info);
+ }
+
+ // we need async delivery, otherwise we might create a deadlock
+ // as this method runs inside a synchronized block and the finishedJob
+ // method as well!
+ localEA.postEvent(jobEvent);
+ // do not unlock if sending was successful
+ unlock = false;
+
+ return true;
+
+ } catch (final Exception re) {
+ // if an exception occurs, we just log
+ this.logger.error("Exception during job processing.", re);
+ } finally {
+ if ( unlock ) {
+ info.unlock();
+ }
+ }
+ }
+
+ // only reached if the job was not locked or posting it failed
+ this.decQueued();
+ return false;
+ }
+
+    /**
+     * Create the real job event.
+     * The new event has the same properties as the stored job, but uses the
+     * value of {@link JobUtil#PROPERTY_JOB_TOPIC} as its topic. In addition
+     * the notifier context for the job callbacks and the queue name are
+     * added, the distribution related properties are removed and the
+     * priority is taken from the queue configuration if the job has none.
+     * @param info The job event.
+     * @return The real job event.
+     */
+    private Event getJobEvent(final JobEvent info) {
+        final Event storedEvent = info.event;
+        final String jobTopic = (String)storedEvent.getProperty(JobUtil.PROPERTY_JOB_TOPIC);
+        final Dictionary<String, Object> jobProps = new EventPropertiesMap(storedEvent);
+
+        // context for the acknowledge / finished callbacks
+        final JobStatusNotifier.NotifierContext context = new JobStatusNotifier.NotifierContext(this);
+        jobProps.put(JobStatusNotifier.CONTEXT_PROPERTY_NAME, context);
+
+        // distributable flag and app id are not part of the real job event
+        jobProps.remove(EventUtil.PROPERTY_DISTRIBUTE);
+        jobProps.remove(EventUtil.PROPERTY_APPLICATION);
+
+        // fall back to the configured priority
+        final Object priority = jobProps.get(JobUtil.PROPERTY_JOB_PRIORITY);
+        if ( priority == null ) {
+            jobProps.put(JobUtil.PROPERTY_JOB_PRIORITY, this.configuration.getPriority());
+        }
+        jobProps.put(JobUtil.PROPERTY_JOB_QUEUE_NAME, info.queueName);
+
+        return new Event(jobTopic, jobProps);
+    }
+
+ /**
+ * Helper method which just logs the exception in debug mode.
+ * @param e
+ */
+ protected void ignoreException(Exception e) {
+ if ( this.logger.isDebugEnabled() ) {
+ this.logger.debug("Ignored exception " + e.getMessage(), e);
+ }
+ }
+
+ /**
+ * Rename this queue.
+ */
+ public void rename(final String name) {
+ this.logger.info("Queue reconfiguration: old queue {} is renamed to {}.", this.queueName, name);
+ this.queueName = name;
+ }
+
+ /**
+ * Reschedule a job.
+ */
+ protected abstract JobEvent reschedule(final JobEvent info);
+
+ /**
+ * @see org.apache.sling.event.jobs.Queue#getStatistics()
+ */
+ public Statistics getStatistics() {
+ return this;
+ }
+
+    /**
+     * Resume a suspended queue; resuming a queue which is not suspended has
+     * no effect.
+     * The suspended marker is reset while holding the suspend lock and before
+     * the waiting queue thread is notified. Resetting it after the
+     * notification would allow the woken thread to still see the queue as
+     * suspended and to go back to waiting.
+     * @see org.apache.sling.event.jobs.Queue#resume()
+     */
+    public void resume() {
+        synchronized ( this.suspendLock ) {
+            this.suspendedSince.set(-1);
+            this.suspendLock.notifyAll();
+        }
+    }
+
+ /**
+ * @see org.apache.sling.event.jobs.Queue#suspend()
+ */
+ public void suspend() {
+ this.suspendedSince.compareAndSet(-1, System.currentTimeMillis());
+ }
+
+ /**
+ * @see org.apache.sling.event.jobs.Queue#isSuspended()
+ */
+ public boolean isSuspended() {
+ return this.suspendedSince.longValue() != -1;
+ }
+
+
+    /**
+     * Remove all jobs from this queue.
+     * The queue is suspended while the waiting jobs are taken out of it; the
+     * jobs themselves are removed by a background thread. If the queue was
+     * not suspended before, it is resumed at the end.
+     * @see org.apache.sling.event.jobs.Queue#removeAll()
+     */
+    public synchronized void removeAll() {
+        // remember the current state and suspend the queue
+        final boolean resumeAfterwards = !this.isSuspended();
+        this.suspend();
+
+        // take all jobs out of the queue, they are removed in the background
+        final Collection<JobEvent> removedJobs = this.removeAllJobs();
+        this.clearQueued();
+        final Runnable remover = new Runnable() {
+
+            /**
+             * @see java.lang.Runnable#run()
+             */
+            public void run() {
+                for(final JobEvent removedJob : removedJobs) {
+                    removedJob.remove();
+                }
+            }
+        };
+        final Thread removeThread = new Thread(remover, "Queue RemoveAll Thread for " + this.queueName);
+        removeThread.setDaemon(true);
+        removeThread.start();
+
+        // start queue again
+        if ( resumeAfterwards ) {
+            this.resume();
+        }
+    }
+
+ /**
+ * @see org.apache.sling.event.jobs.Queue#clear()
+ */
+ public void clear() {
+ this.clearQueued();
+ }
+
+ /**
+ * @see org.apache.sling.event.jobs.Queue#getState(java.lang.String)
+ */
+ public Object getState(final String key) {
+ // not supported for now
+ return null;
+ }
+
+ /**
+ * Put another job into the queue.
+ */
+ protected abstract void put(final JobEvent event);
+
+ /**
+ * Get another job from the queue.
+ */
+ protected abstract JobEvent take();
+
+ /**
+ * Is the queue empty?
+ */
+ protected abstract boolean isEmpty();
+
+ /**
+ * Remove all events from the queue and return them.
+ */
+ protected abstract Collection<JobEvent> removeAllJobs();
+
+ protected abstract JobEvent start(final JobEvent event);
+
+ protected abstract void notifyFinished(final JobEvent rescheduleInfo);
+}
+
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Added: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractParallelJobQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractParallelJobQueue.java?rev=1022923&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractParallelJobQueue.java (added)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractParallelJobQueue.java Fri Oct 15 14:02:23 2010
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.queues;
+
+import java.util.Date;
+
+import org.apache.sling.commons.scheduler.Scheduler;
+import org.apache.sling.event.impl.EnvironmentComponent;
+import org.apache.sling.event.impl.jobs.JobEvent;
+import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
+import org.apache.sling.event.jobs.JobUtil;
+
+/**
+ * Abstract base class for a parallel processing job queue.
+ */
+public abstract class AbstractParallelJobQueue extends AbstractJobQueue {
+
+ protected volatile int jobCount;
+
+ /** The scheduler for rescheduling. */
+ private final Scheduler scheduler;
+
+ public AbstractParallelJobQueue(final String name,
+ final InternalQueueConfiguration config,
+ final EnvironmentComponent env,
+ final Scheduler scheduler) {
+ super(name, config, env);
+ this.scheduler = scheduler;
+ }
+
+ @Override
+ public String getStateInfo() {
+ return super.getStateInfo() + ", jobCount=" + this.jobCount;
+ }
+
+ @Override
+ protected JobEvent start(final JobEvent processInfo) {
+ // acquire a slot
+ this.acquireSlot();
+
+ if ( !this.executeJob(processInfo) ) {
+ this.freeSlot();
+ }
+ return null;
+ }
+
+ /**
+ * Acquire a processing slot.
+ * If the number of jobs currently counted has reached the configured
+ * maximum of parallel jobs, the calling thread waits on this object until
+ * {@link #freeSlot()} resets the waiting flag. Afterwards the job count is
+ * increased.
+ */
+ private void acquireSlot() {
+ synchronized ( this ) {
+ if ( jobCount >= this.configuration.getMaxParallel() ) {
+ this.isWaiting = true;
+ this.logger.debug("Job queue {} is processing {} jobs - waiting for a free slot.", this.queueName, jobCount);
+ // loop on the flag: an interrupt alone does not end the wait
+ while ( this.isWaiting ) {
+ try {
+ this.wait();
+ } catch (final InterruptedException e) {
+ this.ignoreException(e);
+ }
+ }
+ this.logger.debug("Job queue {} is continuing.", this.queueName);
+ }
+ jobCount++;
+ }
+ }
+
+ /**
+ * Free a slot when a job processing is finished.
+ * The job count is decreased and, if a thread is waiting in
+ * {@link #acquireSlot()}, the waiting flag is reset and the thread is
+ * notified.
+ */
+ private void freeSlot() {
+ synchronized ( this ) {
+ jobCount--;
+ if ( this.isWaiting ) {
+ this.logger.debug("Notifying job queue {} to continue processing.", this.queueName);
+ // reset the flag before notifying, the waiting thread loops on it
+ this.isWaiting = false;
+ this.notify();
+ }
+ }
+ }
+
+    /**
+     * In addition to the checks of the base class, no job may be counted as
+     * being processed.
+     */
+    @Override
+    protected boolean canBeMarkedForCleanUp() {
+        return super.canBeMarkedForCleanUp() && this.jobCount == 0;
+    }
+
+ @Override
+ protected void notifyFinished(final JobEvent rescheduleInfo) {
+ this.freeSlot();
+ }
+
+    /**
+     * Put a failed job back into the queue.
+     * If a retry delay is set (on the job or in the queue configuration) the
+     * scheduler is asked to put the job back once the delay is over; without
+     * a delay, or if scheduling fails, the job is put back directly.
+     * @param info The job to reschedule.
+     * @return Always <code>null</code>.
+     */
+    @Override
+    protected JobEvent reschedule(final JobEvent info) {
+        // the configured delay is the default, a job property overrides it
+        long delay = this.configuration.getRetryDelayInMs();
+        final Object jobDelay = info.event.getProperty(JobUtil.PROPERTY_JOB_RETRY_DELAY);
+        if ( jobDelay != null ) {
+            delay = (Long)jobDelay;
+        }
+
+        if ( delay <= 0 ) {
+            // put directly into queue
+            put(info);
+            return null;
+        }
+
+        final Date fireDate = new Date(System.currentTimeMillis() + delay);
+        final String jobName = "Waiting:" + queueName + ":" + info.hashCode();
+        final Runnable requeue = new Runnable() {
+            public void run() {
+                put(info);
+            }
+        };
+        try {
+            scheduler.fireJobAt(jobName, requeue, null, fireDate);
+        } catch (Exception e) {
+            // we ignore the exception and just put back the job in the queue
+            ignoreException(e);
+            requeue.run();
+        }
+        return null;
+    }
+}
+
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractParallelJobQueue.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractParallelJobQueue.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Added: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java?rev=1022923&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java (added)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java Fri Oct 15 14:02:23 2010
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.queues;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.sling.event.impl.EnvironmentComponent;
+import org.apache.sling.event.impl.jobs.JobEvent;
+import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
+import org.apache.sling.event.jobs.JobUtil;
+
+/**
+ * An ordered job queue is processing the queue FIFO in a serialized
+ * way. If a job fails it is rescheduled and the reschedule is processed
+ * next - this basically means that failing jobs block the queue
+ * until they are finished!
+ */
+public final class OrderedJobQueue extends AbstractJobQueue {
+
+ /** The job event for rescheduling. */
+ private JobEvent jobEvent;
+
+ /** Marker indicating that this queue is currently sleeping. */
+ private volatile long isSleepingUntil = -1;
+
+ /** The sleeping thread. */
+ private volatile Thread sleepingThread;
+
+ /** The queue. */
+ private final BlockingQueue<JobEvent> queue = new LinkedBlockingQueue<JobEvent>();
+
+ public OrderedJobQueue(final String name,
+ final InternalQueueConfiguration config,
+ final EnvironmentComponent env) {
+ super(name, config, env);
+ }
+
+ @Override
+ public String getStateInfo() {
+ return super.getStateInfo() + ", isSleepingUntil=" + this.isSleepingUntil;
+ }
+
+    /**
+     * Start a job and - as this queue is ordered - wait until it is finished.
+     * @param processInfo The job to start.
+     * @return The job to process next because it has been rescheduled, or
+     *         <code>null</code>.
+     */
+    @Override
+    protected JobEvent start(final JobEvent processInfo) {
+        if ( !this.executeJob(processInfo) ) {
+            // the job could not be started, there is nothing to wait for
+            return null;
+        }
+        return this.waitForFinish();
+    }
+
+ private void setNotSleeping() {
+ this.isSleepingUntil = -1;
+ this.sleepingThread = null;
+ }
+
+ private void setSleeping(final Thread sleepingThread, final long delay) {
+ this.sleepingThread = sleepingThread;
+ this.isSleepingUntil = System.currentTimeMillis() + delay;
+ }
+
+    /**
+     * Resume the queue.
+     * If a thread is currently sleeping for a retry delay (see
+     * {@link #reschedule(JobEvent)}), it is interrupted so that the sleep
+     * ends immediately; afterwards the base class resume is performed.
+     */
+    @Override
+    public void resume() {
+        // isSleepingUntil is -1 when nothing is sleeping, so the interrupt
+        // has to be sent if the value is different from -1
+        if ( this.isSleepingUntil != -1 ) {
+            // copy the field, it might be reset concurrently
+            final Thread thread = this.sleepingThread;
+            if ( thread != null ) {
+                thread.interrupt();
+            }
+        }
+        super.resume();
+    }
+
+ /**
+ * Wait for the job to be finished.
+ * The calling thread sets the waiting flag and waits on this object until
+ * {@link #notifyFinished(JobEvent)} resets the flag.
+ * @return The job handed over by <code>notifyFinished</code> for
+ * reprocessing, or <code>null</code> if there is none.
+ */
+ private JobEvent waitForFinish() {
+ synchronized ( this ) {
+ // NOTE(review): looks like a finish notification arriving before this
+ // line would be overwritten here and the wait below would not be woken
+ // up by it - confirm against the callers
+ this.isWaiting = true;
+ this.logger.debug("Job queue {} is waiting for finish.", this.queueName);
+ // loop on the flag: an interrupt alone does not end the wait
+ while ( this.isWaiting ) {
+ try {
+ this.wait();
+ } catch (InterruptedException e) {
+ this.ignoreException(e);
+ }
+ }
+ this.logger.debug("Job queue {} is continuing.", this.queueName);
+ // take over the reschedule info and reset it for the next job
+ final JobEvent object = this.jobEvent;
+ this.jobEvent = null;
+ return object;
+ }
+ }
+
+ @Override
+ protected void put(final JobEvent event) {
+ try {
+ this.queue.put(event);
+ } catch (final InterruptedException e) {
+ // this should never happen
+ this.ignoreException(e);
+ }
+ }
+
+ @Override
+ protected JobEvent take() {
+ try {
+ return this.queue.take();
+ } catch (final InterruptedException e) {
+ // this should never happen
+ this.ignoreException(e);
+ }
+ return null;
+ }
+
+ @Override
+ protected boolean isEmpty() {
+ return this.queue.isEmpty();
+ }
+
+ /**
+ * Wake up the thread waiting in {@link #waitForFinish()}.
+ * @param rescheduleInfo The job to process next, or <code>null</code>.
+ */
+ @Override
+ protected void notifyFinished(final JobEvent rescheduleInfo) {
+ // store the job first, waitForFinish() reads it after the flag is reset
+ this.jobEvent = rescheduleInfo;
+ this.logger.debug("Notifying job queue {} to continue processing.", this.queueName);
+ this.isWaiting = false;
+ synchronized ( this ) {
+ this.notify();
+ }
+ }
+
+ @Override
+ protected JobEvent reschedule(final JobEvent info) {
+ // we just sleep for the delay time - if none, we continue and retry
+ // this job again
+ long delay = this.configuration.getRetryDelayInMs();
+ if ( info.event.getProperty(JobUtil.PROPERTY_JOB_RETRY_DELAY) != null ) {
+ delay = (Long)info.event.getProperty(JobUtil.PROPERTY_JOB_RETRY_DELAY);
+ }
+ if ( delay > 0 ) {
+ this.setSleeping(Thread.currentThread(), delay);
+ try {
+ this.logger.debug("Job queue {} is sleeping for {}ms.", this.queueName, delay);
+ Thread.sleep(delay);
+ } catch (InterruptedException e) {
+ this.ignoreException(e);
+ } finally {
+ this.setNotSleeping();
+ }
+ }
+ return info;
+ }
+
+ /**
+ * @see org.apache.sling.event.jobs.Queue#clear()
+ */
+ public void clear() {
+ this.queue.clear();
+ super.clear();
+ }
+
+ @Override
+ public synchronized void removeAll() {
+ this.jobEvent = null;
+ super.removeAll();
+ }
+
+    /**
+     * Remove all jobs from the queue and return them.
+     * The jobs are drained from the queue instead of copying the queue and
+     * clearing it afterwards: a job added between the copy and the clear
+     * would be dropped from the queue without being part of the result.
+     * @return The removed jobs, in queue order.
+     */
+    @Override
+    protected Collection<JobEvent> removeAllJobs() {
+        final List<JobEvent> events = new ArrayList<JobEvent>();
+        this.queue.drainTo(events);
+        return events;
+    }
+
+ @Override
+ public Object getState(final String key) {
+ if ( "isSleepingUntil".equals(key) ) {
+ return this.isSleepingUntil;
+ }
+ return super.getState(key);
+ }
+}
+
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Added: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java?rev=1022923&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java (added)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java Fri Oct 15 14:02:23 2010
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.queues;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.sling.commons.scheduler.Scheduler;
+import org.apache.sling.event.impl.EnvironmentComponent;
+import org.apache.sling.event.impl.jobs.JobEvent;
+import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
+
+/**
+ * The default parallel job queue processing the entries FIFO.
+ * Failing jobs are rescheduled and put at the end of the queue.
+ * The waiting jobs are kept in an unbounded {@link LinkedBlockingQueue}.
+ */
+public final class ParallelJobQueue extends AbstractParallelJobQueue {
+
+    /** The queue holding the waiting jobs. */
+    private final BlockingQueue<JobEvent> queue = new LinkedBlockingQueue<JobEvent>();
+
+    /**
+     * Creates a new queue; all arguments are passed on to the superclass.
+     *
+     * @param name the queue name
+     * @param config the queue configuration
+     * @param env the environment component
+     * @param scheduler the scheduler
+     */
+    public ParallelJobQueue(final String name,
+            final InternalQueueConfiguration config,
+            final EnvironmentComponent env,
+            final Scheduler scheduler) {
+        super(name, config, env, scheduler);
+    }
+
+    /**
+     * Appends the job to the end of the queue.
+     * An interruption while adding is handed to <code>ignoreException</code>.
+     *
+     * @param event the job to add
+     */
+    @Override
+    protected void put(final JobEvent event) {
+        try {
+            this.queue.put(event);
+        } catch (final InterruptedException e) {
+            // this should never happen
+            this.ignoreException(e);
+        }
+    }
+
+    /**
+     * Removes the job at the head of the queue, waiting until one is available.
+     *
+     * @return the next job or <code>null</code> if the wait has been interrupted
+     */
+    @Override
+    protected JobEvent take() {
+        try {
+            return this.queue.take();
+        } catch (final InterruptedException e) {
+            // this should never happen
+            this.ignoreException(e);
+        }
+        return null;
+    }
+
+    /**
+     * @return <code>true</code> if no job is waiting in the queue
+     */
+    @Override
+    protected boolean isEmpty() {
+        return this.queue.isEmpty();
+    }
+
+    /**
+     * Removes all waiting jobs and then delegates to the superclass.
+     *
+     * @see org.apache.sling.event.jobs.Queue#clear()
+     */
+    public void clear() {
+        this.queue.clear();
+        super.clear();
+    }
+
+    /**
+     * Removes all waiting jobs from the queue and returns them.
+     *
+     * @return a new collection containing the removed jobs
+     */
+    @Override
+    protected Collection<JobEvent> removeAllJobs() {
+        final List<JobEvent> events = new ArrayList<JobEvent>();
+        // drainTo() moves the elements in a single queue operation; copying the
+        // queue and clearing it afterwards could silently drop a job that is
+        // put between the two steps
+        this.queue.drainTo(events);
+        return events;
+    }
+}
+
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Added: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java?rev=1022923&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java (added)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java Fri Oct 15 14:02:23 2010
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.queues;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.sling.commons.scheduler.Scheduler;
+import org.apache.sling.event.impl.EnvironmentComponent;
+import org.apache.sling.event.impl.jobs.JobEvent;
+import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
+import org.apache.sling.event.jobs.JobUtil;
+
+/**
+ * This queue acts similar to the parallel job queue, except that
+ * new jobs are selected based on a round robin topic selection scheme.
+ * Failing jobs are rescheduled and put at the end of the queue.
+ *
+ * The topic list, the topic map, the topic index, the event count and the
+ * waiting flag are only modified while holding the monitor of <code>topicMap</code>.
+ */
+public final class TopicRoundRobinJobQueue extends AbstractParallelJobQueue {
+
+    /** The known topics in the order in which they were first seen. */
+    private final List<String> topics = new ArrayList<String>();
+
+    /** The waiting jobs for each known topic. */
+    private final Map<String, List<JobEvent>> topicMap = new HashMap<String, List<JobEvent>>();
+
+    /** Index into the topic list of the topic that is inspected next by take(). */
+    private int topicIndex;
+
+    /** Number of waiting jobs over all topics. */
+    private int eventCount;
+
+    /** Set while take() is blocked waiting for a new job. */
+    private boolean isWaitingForNext = false;
+
+    /**
+     * Creates a new queue; all arguments are passed on to the superclass.
+     *
+     * @param name the queue name
+     * @param config the queue configuration
+     * @param env the environment component
+     * @param scheduler the scheduler
+     */
+    public TopicRoundRobinJobQueue(final String name,
+            final InternalQueueConfiguration config,
+            final EnvironmentComponent env,
+            final Scheduler scheduler) {
+        super(name, config, env, scheduler);
+    }
+
+    /**
+     * @return the state info of the superclass extended by the event count and the waiting flag
+     */
+    @Override
+    public String getStateInfo() {
+        return super.getStateInfo() + ", eventCount=" + this.eventCount + ", isWaitingForNext=" + this.isWaitingForNext;
+    }
+
+    /**
+     * The queue can only be marked for clean up if the superclass agrees
+     * and take() is not currently waiting for a new job.
+     */
+    @Override
+    protected boolean canBeMarkedForCleanUp() {
+        boolean result = super.canBeMarkedForCleanUp();
+        if ( result ) {
+            result = !this.isWaitingForNext;
+        }
+        return result;
+    }
+
+    /**
+     * Appends the job to the list of its topic and wakes up a waiting take().
+     * A job event without an event object (a close) is ignored.
+     *
+     * @param event the job to add
+     */
+    @Override
+    protected void put(final JobEvent event) {
+        // is this a close?
+        if ( event.event == null ) {
+            return;
+        }
+        final String topic = (String)event.event.getProperty(JobUtil.PROPERTY_JOB_TOPIC);
+        synchronized ( this.topicMap ) {
+            List<JobEvent> events = this.topicMap.get(topic);
+            if ( events == null ) {
+                // first job for this topic: register the topic
+                events = new LinkedList<JobEvent>();
+                this.topicMap.put(topic, events);
+                this.topics.add(topic);
+            }
+            events.add(event);
+            this.eventCount++;
+            if ( this.isWaitingForNext ) {
+                this.isWaitingForNext = false;
+                // wake up take()
+                this.topicMap.notify();
+            }
+        }
+    }
+
+    /**
+     * Returns the next job, cycling through the known topics. If no job is
+     * waiting, the call blocks until put() signals a new job.
+     *
+     * @return the next job, or <code>null</code> if the queue has been emptied
+     *         again between the wake up and the selection of the job
+     */
+    @Override
+    protected JobEvent take() {
+        JobEvent e = null;
+        synchronized ( this.topicMap ) {
+            if ( this.eventCount == 0 ) {
+                // wait for a new event
+                this.isWaitingForNext = true;
+                // the flag is reset by put(); looping on it guards against
+                // spurious wake ups and interruptions
+                while ( this.isWaitingForNext ) {
+                    try {
+                        this.topicMap.wait();
+                    } catch (final InterruptedException ie) {
+                        this.ignoreException(ie);
+                    }
+                }
+            }
+            if ( this.eventCount > 0 ) {
+                // at least one topic has a job: advance until it is found
+                while ( e == null ) {
+                    final String topic = this.topics.get(this.topicIndex);
+                    final List<JobEvent> events = this.topicMap.get(topic);
+                    if ( events.size() > 0 ) {
+                        e = events.remove(0);
+                    }
+                    this.topicIndex++;
+                    if ( this.topicIndex == this.topics.size() ) {
+                        this.topicIndex = 0;
+                    }
+                }
+                this.eventCount--;
+            }
+        }
+        return e;
+    }
+
+    /**
+     * @return <code>true</code> if no job is waiting for any topic
+     */
+    @Override
+    protected boolean isEmpty() {
+        synchronized ( this.topicMap ) {
+            return this.eventCount == 0;
+        }
+    }
+
+    /**
+     * Removes all waiting jobs and known topics and then delegates to the superclass.
+     *
+     * @see org.apache.sling.event.jobs.Queue#clear()
+     */
+    public void clear() {
+        synchronized ( this.topicMap ) {
+            this.resetTopics();
+        }
+        super.clear();
+    }
+
+    /**
+     * Removes all waiting jobs and known topics and returns the removed jobs.
+     *
+     * @return a new collection containing the removed jobs
+     */
+    @Override
+    protected Collection<JobEvent> removeAllJobs() {
+        final List<JobEvent> events = new ArrayList<JobEvent>();
+        synchronized ( this.topicMap ) {
+            for(final List<JobEvent> l : this.topicMap.values() ) {
+                events.addAll(l);
+            }
+            this.resetTopics();
+        }
+        return events;
+    }
+
+    /**
+     * Forgets all topics and waiting jobs.
+     * The topic index is reset as well: a stale index would point past the
+     * end of the emptied topic list and make the next take() fail with an
+     * IndexOutOfBoundsException.
+     * The caller must hold the monitor of <code>topicMap</code>.
+     */
+    private void resetTopics() {
+        this.eventCount = 0;
+        this.topicIndex = 0;
+        this.topics.clear();
+        this.topicMap.clear();
+    }
+}
+
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Added: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/Environment.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/Environment.java?rev=1022923&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/Environment.java (added)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/Environment.java Fri Oct 15 14:02:23 2010
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.support;
+
+import org.apache.sling.commons.threads.ThreadPool;
+
+/**
+ * This class provides "global settings"
+ * to all services, like the application id and the thread pool.
+ * Both settings are plain static fields; they are declared volatile so
+ * that a value written by one thread is visible to all other threads.
+ * @since 3.0
+ */
+public class Environment {
+
+    /** Global application id. */
+    public static volatile String APPLICATION_ID;
+
+    /** Global thread pool. */
+    public static volatile ThreadPool THREAD_POOL;
+}
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/Environment.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/Environment.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/Environment.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties?rev=1022923&r1=1022922&r2=1022923&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties (original)
+++ sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties Fri Oct 15 14:02:23 2010
@@ -31,6 +31,66 @@ dist.events.description = Distributes lo
repository are picked up and distributed locally through the OSGi Event Admin \
Service.
+scheduler.period.name = Cleanup Interval
+scheduler.period.description = Interval in seconds in which events older than \
+ a specific age (see Event Cleanup Age) are purged from the repository. \
+ The default value is 30 minutes (1800 seconds).
+
+cleanup.period.name = Event Cleanup Age
+cleanup.period.description = The maximum age in minutes of persisted events to \
+ be purged from the repository during the cleanup run. The default is 15 \
+ minutes. Note that this setting defines the minimum time an event remains \
+ in the repository.
+
+
+#
+# Queue Configuration and Job Event Handler
+queue.name = Apache Sling Job Queue Configuration
+queue.description = The configuration of a job processing queue.
+
+queue.name.name = Name
+queue.name.description = The name of the queue. If matching is used \
+ the token \{0\} can be used to substitute the real value.
+
+queue.type.name = Type
+queue.type.description = The queue type.
+
+queue.topics.name = Topics
+queue.topics.description = This value is required and lists the topics processed by \
+ this queue. The value is a list of strings. If a string ends with a dot, \
+ all topics in exactly this package match. If the string ends with a star, \
+ all topics in this package and all subpackages match. If the string neither \
+ ends with a dot nor with a star, this is assumed to define an exact topic.
+
+queue.priority.name = Priority
+queue.priority.description = The priority for the threads from this queue. Default is norm.
+
+queue.retries.name = Maximum Retries
+queue.retries.description = The maximum number of times a failed job slated \
+ for retries is actually retried. If a job has been retried this number of \
+ times and still fails, it is not rescheduled and assumed to have failed. The \
+ default value is 10.
+
+queue.retrydelay.name = Retry Delay
+queue.retrydelay.description = The number of milliseconds to sleep between two \
+ consecutive retries of a job which failed and was set to be retried. The \
+ default value is 2 seconds. This value is only relevant if there is a single \
+ failed job in the queue. If there are multiple failed jobs, each job is \
+ retried in turn without an intervening delay.
+
+queue.maxparallel.name = Maximum Parallel Jobs
+queue.maxparallel.description = The maximum number of parallel jobs started for this queue. \
+ A value of -1 is substituted with the number of available processors.
+
+queue.runlocal.name = Run Local
+queue.runlocal.description = Jobs for this queue are only processed on the cluster node \
+ where the job has been started.
+
+queue.applicationids.name = Application Ids
+queue.applicationids.description = An optional list of application ids. If configured, \
+ jobs for this queue are only processed on those cluster nodes.
+
+
#
# Job Event Handler
job.events.name = Apache Sling Job Event Handler
@@ -40,23 +100,27 @@ job.events.description = Manages job sch
amongst the cluster nodes through repository events. The jobs are started \
locally on a single cluster node through the OSGi Event Admin.
+jobscheduler.period.name = Cleanup Interval
+jobscheduler.period.description = Interval in seconds in which unused \
+ queues are stopped. The default value is 5 minutes (300 seconds).
+
+
+#
+# Persistence Handler
+job.persistence.name = Apache Sling Job Persistence Manager
+job.persistence.description = This service persists and loads jobs from the repository.
+
+persscheduler.period.name = Event Cleanup Interval
+persscheduler.period.description = Interval in seconds in which jobs older than \
+ a specific age (see Event Cleanup Age) are purged from the repository. \
+ The default value is 5 minutes (300 seconds).
+
sleep.time.name = Retry Interval
sleep.time.description = The number of milliseconds to sleep between two \
consecutive retries of a job which failed and was set to be retried. The \
default value is 30 seconds. This value is only relevant if there is a single \
failed job in the queue. If there are multiple failed jobs, each job is \
retried in turn without an intervening delay.
-
-max.job.retries.name = Maximum Retries
-max.job.retries.description = The maximum number of times a failed job slated \
- for retries is actually retried. If a job has been retried this number of \
- times and still fails, it is not rescheduled and assumed to have failed. The \
- default value is 10.
-
-jobscheduler.period.name = Event Cleanup Internal
-jobscheduler.period.description = Interval in seconds in which jobs older than \
- a specific age (see Event Cleanup Age) are purged from the repository. \
- The default value is 5 minutes (300 seconds).
jobcleanup.period.name = Event Cleanup Age
jobcleanup.period.description = The maximum age in minutes of persisted job to \
@@ -64,16 +128,6 @@ jobcleanup.period.description = The maxi
minutes. Note that this setting defines the minimum time an event remains \
in the repository.
-wait.for.ack.name = Acknowledge Waiting Time
-wait.for.ack.description = If a service is processing a job, it acknowledges this \
- by sending a message to the Job Event Handler. If the Job Event Handler does not \
- receive such a message in the configured time, it reschedules the job. The configured \
- time is in seconds (default is 90 secs).
-
-max.parallel.jobs.name = Maximum Parallel Jobs
-max.parallel.jobs.description = The maximum number of parallel jobs started for the main \
- queue.
-
max.load.jobs.name = Max Load Jobs
max.load.jobs.description = The maximum amount of jobs being loaded from the repository on startup. \
Default is 1000 jobs.
@@ -90,44 +144,21 @@ load.checkdelay.name = Background Check
load.checkdelay.description = The background loader sleeps this time of seconds before \
checking the repository for jobs. Default value is 240 seconds.
-max.job.queues.name = Max Job Queues
-max.job.queues.description = The maximum number of job queues (default is 10). \
- If this number is exceeded all jobs for a new job queue are put into the main queue.
-
#
# Event Pool
event.pool.name = Apache Sling Event Thread Pool
-event.pool.description = This is the thread pool used by the Apache Sling eventing support.
+event.pool.description = This is the thread pool used by the Apache Sling eventing support. The \
+ threads from this pool are merely used for the job handling. By limiting this pool, it is \
+ possible to limit the maximum number of parallel processed jobs - regardless of the queue \
+ configuration.
minPoolSize.name = Min Pool Size
minPoolSize.description = The minimum pool size. The minimum pool size should be \
- higher than 20. Approx 10 threads are in use by the system, so a pool size of 20 \
- allows to process 10 events in parallel.
+ higher than 10.
maxPoolSize.name = Max Pool Size
maxPoolSize.description = The maximum pool size. The maximum pool size should be higher than \
or equal to the minimum pool size.
-queueSize.name = Queue Size
-queueSize.description = The maximum size of the thread queue if the pool is exhausted.
-
priority.name = Priority
priority.description = The priority for the threads from this pool. Default is norm.
-
-#
-# Shared labels
-scheduler.period.name = Event Cleanup Internal
-scheduler.period.description = Interval in seconds in which events older than \
- a specific age (see Event Cleanup Age) are purged from the repository. \
- The default value is 30 minutes (1800 seconds).
-
-cleanup.period.name = Event Cleanup Age
-cleanup.period.description = The maximum age in minutes of persisted events to \
- be purged from the repository during the cleanup run. The default is 15 \
- minutes. Note that this setting defines the minimum time an event remains \
- in the repository.
-
-repository.path.name = Persistent Event Location
-repository.path.description = Absolute Path of the Repository location where \
- events are persisted to be picked up by the event distribution mechanism. \
- The default value is "/sling/events".
Modified: sling/trunk/bundles/extensions/event/src/main/resources/SLING-INF/nodetypes/event.cnd
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/resources/SLING-INF/nodetypes/event.cnd?rev=1022923&r1=1022922&r2=1022923&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/resources/SLING-INF/nodetypes/event.cnd (original)
+++ sling/trunk/bundles/extensions/event/src/main/resources/SLING-INF/nodetypes/event.cnd Fri Oct 15 14:02:23 2010
@@ -1,42 +1,42 @@
-//
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-//
-
-<slingevent='http://sling.apache.org/jcr/event/1.0'>
-<nt='http://www.jcp.org/jcr/nt/1.0'>
-<mix='http://www.jcp.org/jcr/mix/1.0'>
-
-[slingevent:Event] > nt:unstructured, nt:hierarchyNode
- - slingevent:topic (string)
- - slingevent:application (string)
- - slingevent:created (date)
- - slingevent:properties (binary)
-
-[slingevent:Job] > slingevent:Event, mix:lockable
- - slingevent:processor (string)
- - slingevent:id (string)
- - slingevent:finished (date)
-
-[slingevent:TimedEvent] > slingevent:Event, mix:lockable
- - slingevent:processor (string)
- - slingevent:id (string)
- - slingevent:expression (string)
- - slingevent:date (date)
- - slingevent:period (long)
-
-
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+<slingevent='http://sling.apache.org/jcr/event/1.0'>
+<nt='http://www.jcp.org/jcr/nt/1.0'>
+<mix='http://www.jcp.org/jcr/mix/1.0'>
+
+[slingevent:Event] > nt:unstructured, nt:hierarchyNode
+ - slingevent:topic (string)
+ - slingevent:application (string)
+ - slingevent:created (date)
+ - slingevent:properties (binary)
+
+[slingevent:Job] > slingevent:Event, mix:lockable
+ - slingevent:processor (string)
+ - slingevent:id (string)
+ - slingevent:finished (date)
+
+[slingevent:TimedEvent] > slingevent:Event, mix:lockable
+ - slingevent:processor (string)
+ - slingevent:id (string)
+ - slingevent:expression (string)
+ - slingevent:date (date)
+ - slingevent:period (long)
+
+
Modified: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/EventUtilTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/EventUtilTest.java?rev=1022923&r1=1022922&r2=1022923&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/EventUtilTest.java (original)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/EventUtilTest.java Fri Oct 15 14:02:23 2010
@@ -31,7 +31,7 @@ import javax.jcr.PropertyType;
import javax.jcr.Value;
import javax.jcr.ValueFactory;
-import org.apache.sling.event.impl.EventHelper;
+import org.apache.sling.event.impl.jobs.jcr.JCRHelper;
import org.jmock.Expectations;
import org.jmock.Mockery;
import org.jmock.integration.junit4.JMock;
@@ -100,17 +100,17 @@ public class EventUtilTest {
will(returnValue(getValueOfType(PropertyType.DATE, "dateValue")));
}});
// boolean
- assertEquals(PropertyType.BOOLEAN, EventHelper.getNodePropertyValue(factory, true).getType());
- assertEquals(PropertyType.BOOLEAN, EventHelper.getNodePropertyValue(factory, false).getType());
- assertEquals(PropertyType.BOOLEAN, EventHelper.getNodePropertyValue(factory, Boolean.TRUE).getType());
- assertEquals(PropertyType.BOOLEAN, EventHelper.getNodePropertyValue(factory, Boolean.FALSE).getType());
+ assertEquals(PropertyType.BOOLEAN, JCRHelper.getNodePropertyValue(factory, true).getType());
+ assertEquals(PropertyType.BOOLEAN, JCRHelper.getNodePropertyValue(factory, false).getType());
+ assertEquals(PropertyType.BOOLEAN, JCRHelper.getNodePropertyValue(factory, Boolean.TRUE).getType());
+ assertEquals(PropertyType.BOOLEAN, JCRHelper.getNodePropertyValue(factory, Boolean.FALSE).getType());
// long
- assertEquals(PropertyType.LONG, EventHelper.getNodePropertyValue(factory, (long)5).getType());
+ assertEquals(PropertyType.LONG, JCRHelper.getNodePropertyValue(factory, (long)5).getType());
// int = not possible
- assertEquals(null, EventHelper.getNodePropertyValue(factory, 5));
+ assertEquals(null, JCRHelper.getNodePropertyValue(factory, 5));
// string
- assertEquals(PropertyType.STRING, EventHelper.getNodePropertyValue(factory, "something").getType());
+ assertEquals(PropertyType.STRING, JCRHelper.getNodePropertyValue(factory, "something").getType());
// calendar
- assertEquals(PropertyType.DATE, EventHelper.getNodePropertyValue(factory, Calendar.getInstance()).getType());
+ assertEquals(PropertyType.DATE, JCRHelper.getNodePropertyValue(factory, Calendar.getInstance()).getType());
}
}
Modified: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/AbstractRepositoryEventHandlerTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/AbstractRepositoryEventHandlerTest.java?rev=1022923&r1=1022922&r2=1022923&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/AbstractRepositoryEventHandlerTest.java (original)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/AbstractRepositoryEventHandlerTest.java Fri Oct 15 14:02:23 2010
@@ -18,123 +18,27 @@
*/
package org.apache.sling.event.impl;
-import static org.junit.Assert.assertTrue;
-
-import java.net.URL;
-import java.util.Collections;
import java.util.Dictionary;
-import java.util.Hashtable;
-import java.util.Set;
-import javax.jcr.Node;
-import javax.jcr.NodeIterator;
-import javax.jcr.RepositoryException;
-import javax.jcr.Session;
-
-import org.apache.sling.commons.classloader.DynamicClassLoaderManager;
-import org.apache.sling.commons.threads.ModifiableThreadPoolConfig;
-import org.apache.sling.commons.threads.ThreadPoolConfig;
-import org.apache.sling.jcr.api.SlingRepository;
-import org.apache.sling.settings.SlingSettingsService;
import org.jmock.Expectations;
-import org.jmock.Mockery;
import org.jmock.integration.junit4.JMock;
import org.junit.runner.RunWith;
import org.osgi.framework.BundleContext;
import org.osgi.service.component.ComponentContext;
-import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
@RunWith(JMock.class)
-public abstract class AbstractRepositoryEventHandlerTest {
+public abstract class AbstractRepositoryEventHandlerTest extends AbstractTest {
protected volatile AbstractRepositoryEventHandler handler;
- protected static final String REPO_PATH = "/test/events";
- protected static final String SLING_ID = "4711";
-
- protected static Session session;
-
- protected abstract Mockery getMockery();
-
protected abstract AbstractRepositoryEventHandler createHandler();
- protected Dictionary<String, Object> getComponentConfig() {
- final Dictionary<String, Object> config = new Hashtable<String, Object>();
- config.put(AbstractRepositoryEventHandler.CONFIG_PROPERTY_REPO_PATH, REPO_PATH);
-
- return config;
- }
-
- @org.junit.BeforeClass public static void setupRepository() throws Exception {
- RepositoryTestUtil.startRepository();
- final SlingRepository repository = RepositoryTestUtil.getSlingRepository();
- session = repository.loginAdministrative(repository.getDefaultWorkspace());
- assertTrue(RepositoryTestUtil.registerNodeType(session, DistributingEventHandler.class.getResourceAsStream("/SLING-INF/nodetypes/event.cnd")));
- assertTrue(RepositoryTestUtil.registerNodeType(session, DistributingEventHandler.class.getResourceAsStream("/SLING-INF/nodetypes/folder.cnd")));
- if ( session.itemExists(REPO_PATH) ) {
- session.getItem(REPO_PATH).remove();
- session.save();
- }
- }
-
- @org.junit.AfterClass public static void shutdownRepository() throws Exception {
- if ( session != null ) {
- session.logout();
- session = null;
- }
- RepositoryTestUtil.stopRepository();
- }
-
- @org.junit.Before public void setup() throws Exception {
- // activate
- this.activate(null);
- }
-
- int activateCount = 1;
-
- protected void activate(final EventAdmin ea) {
+ protected void activate(final EventAdmin ea) throws Throwable {
+ super.activate(ea);
this.handler = this.createHandler();
- this.handler.repository = RepositoryTestUtil.getSlingRepository();
- this.handler.classLoaderManager = new DynamicClassLoaderManager() {
-
- public ClassLoader getDynamicClassLoader() {
- return this.getClass().getClassLoader();
- }
- };
- // the event admin
- if ( ea != null ) {
- this.handler.eventAdmin = ea;
- } else {
- final EventAdmin eventAdmin = this.getMockery().mock(EventAdmin.class, "eventAdmin" + activateCount);
- this.handler.eventAdmin = eventAdmin;
- this.getMockery().checking(new Expectations() {{
- allowing(eventAdmin).postEvent(with(any(Event.class)));
- allowing(eventAdmin).sendEvent(with(any(Event.class)));
- }});
- }
-
- // sling settings service
- this.handler.settingsService = new SlingSettingsService() {
- public String getSlingId() {
- return SLING_ID;
- }
-
- public URL getSlingHome() {
- return null;
- }
-
- public String getSlingHomePath() {
- return null;
- }
-
- public Set<String> getRunModes() {
- return Collections.<String> emptySet();
- }
- };
- // we need a thread pool
- this.handler.threadPool = new ThreadPoolImpl();
+ handler.environment = this.environment;
// lets set up the bundle context
final BundleContext bundleContext = this.getMockery().mock(BundleContext.class, "beforeBundleContext" + activateCount);
@@ -161,7 +65,7 @@ public abstract class AbstractRepository
}
}
- protected void deactivate() {
+ protected void deactivate() throws Throwable {
// lets set up the bundle context with the sling id
final BundleContext bundleContext = this.getMockery().mock(BundleContext.class, "afterBundleContext" + activateCount);
@@ -172,44 +76,6 @@ public abstract class AbstractRepository
}});
this.handler.deactivate(componentContext);
this.handler = null;
- activateCount++;
- }
-
- @org.junit.After public void shutdown() throws Exception {
- final String path = this.handler.repositoryPath;
- this.deactivate();
- try {
- // delete all child nodes to get a clean repository again
- final Node rootNode = (Node) session.getItem(path);
- final NodeIterator iter = rootNode.getNodes();
- while ( iter.hasNext() ) {
- final Node child = iter.nextNode();
- child.remove();
- }
- session.save();
- } catch ( RepositoryException re) {
- // we ignore this for the test
- }
- }
-
- @org.junit.Test public void testPathCreation() throws RepositoryException {
- assertTrue(session.itemExists(REPO_PATH));
- }
-
- final class ThreadPoolImpl implements ThreadPool {
-
- public void execute(Runnable runnable) {
- final Thread t = new Thread(runnable);
- t.start();
- }
-
- public String getName() {
- return "default";
- }
-
- public ThreadPoolConfig getConfiguration() {
- return new ModifiableThreadPoolConfig();
- }
-
+ super.deactivate();
}
}
Modified: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/DistributingEventHandlerTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/DistributingEventHandlerTest.java?rev=1022923&r1=1022922&r2=1022923&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/DistributingEventHandlerTest.java (original)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/DistributingEventHandlerTest.java Fri Oct 15 14:02:23 2010
@@ -33,6 +33,8 @@ import javax.jcr.observation.EventListen
import org.apache.jackrabbit.util.ISO9075;
import org.apache.sling.event.EventUtil;
+import org.apache.sling.event.impl.jobs.jcr.JCRHelper;
+import org.apache.sling.event.impl.support.Environment;
import org.jmock.Mockery;
import org.jmock.integration.junit4.JMock;
import org.jmock.integration.junit4.JUnit4Mockery;
@@ -59,7 +61,7 @@ public class DistributingEventHandlerTes
}
@org.junit.Test public void testSetup() throws RepositoryException {
- assertEquals(this.handler.applicationId, SLING_ID);
+ assertEquals(Environment.APPLICATION_ID, SLING_ID);
assertEquals(this.handler.repositoryPath, REPO_PATH);
assertNotNull(this.handler.writerSession);
final EventListenerIterator iter = this.handler.writerSession.getWorkspace().getObservationManager().getRegisteredEventListeners();
@@ -82,9 +84,9 @@ public class DistributingEventHandlerTes
final NodeIterator iter = rootNode.getNodes();
iter.hasNext();
final Node eventNode = iter.nextNode();
- assertEquals(topic, eventNode.getProperty(EventHelper.NODE_PROPERTY_TOPIC).getString());
- assertEquals(handler.applicationId, eventNode.getProperty(EventHelper.NODE_PROPERTY_APPLICATION).getString());
- assertTrue(Calendar.getInstance().compareTo(eventNode.getProperty(EventHelper.NODE_PROPERTY_CREATED).getDate()) >= 0);
+ assertEquals(topic, eventNode.getProperty(JCRHelper.NODE_PROPERTY_TOPIC).getString());
+ assertEquals(Environment.APPLICATION_ID, eventNode.getProperty(JCRHelper.NODE_PROPERTY_APPLICATION).getString());
+ assertTrue(Calendar.getInstance().compareTo(eventNode.getProperty(JCRHelper.NODE_PROPERTY_CREATED).getDate()) >= 0);
// as a starting point we just check if the properties property exists
assertTrue(eventNode.hasProperty(ISO9075.encode("a property")));
}
@@ -100,9 +102,9 @@ public class DistributingEventHandlerTes
final NodeIterator iter = rootNode.getNodes();
iter.hasNext();
final Node eventNode = iter.nextNode();
- assertEquals(topic, eventNode.getProperty(EventHelper.NODE_PROPERTY_TOPIC).getString());
- assertEquals(handler.applicationId, eventNode.getProperty(EventHelper.NODE_PROPERTY_APPLICATION).getString());
- assertTrue(Calendar.getInstance().compareTo(eventNode.getProperty(EventHelper.NODE_PROPERTY_CREATED).getDate()) >= 0);
+ assertEquals(topic, eventNode.getProperty(JCRHelper.NODE_PROPERTY_TOPIC).getString());
+ assertEquals(Environment.APPLICATION_ID, eventNode.getProperty(JCRHelper.NODE_PROPERTY_APPLICATION).getString());
+ assertTrue(Calendar.getInstance().compareTo(eventNode.getProperty(JCRHelper.NODE_PROPERTY_CREATED).getDate()) >= 0);
// as a starting point we just check if the properties property exists
assertTrue(eventNode.hasProperty(ISO9075.encode("a property")));
}
Modified: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/SimpleEventAdmin.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/SimpleEventAdmin.java?rev=1022923&r1=1022922&r2=1022923&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/SimpleEventAdmin.java (original)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/SimpleEventAdmin.java Fri Oct 15 14:02:23 2010
@@ -52,8 +52,14 @@ public class SimpleEventAdmin implements
public void sendEvent(Event event) {
if ( topics != null ) {
for(int i=0; i<topics.length; i++) {
- if ( topics[i].equals(event.getTopic()) ) {
- handler[i].handleEvent(event);
+ if ( topics[i].endsWith("*") ) {
+ if ( event.getTopic().startsWith(topics[i].substring(0, topics[i].length() - 1)) ) {
+ handler[i].handleEvent(event);
+ }
+ } else {
+ if ( topics[i].equals(event.getTopic()) ) {
+ handler[i].handleEvent(event);
+ }
}
}
}
Added: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfigurationTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfigurationTest.java?rev=1022923&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfigurationTest.java (added)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfigurationTest.java Fri Oct 15 14:02:23 2010
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.config;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Dictionary;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.Map;
+
+import org.apache.sling.event.impl.jobs.JobEvent;
+import org.apache.sling.event.jobs.JobUtil;
+import org.osgi.service.event.Event;
+
+public class InternalQueueConfigurationTest { // Unit tests for InternalQueueConfiguration: max-parallel setting, topic matching, queue-name replacement and validity.
+ /** Builds a JobEvent stub for the given topic; the overridden callbacks do nothing or return false. */
+ private JobEvent getJobEvent(final String topic) {
+ final Dictionary<String, Object> dict = new Hashtable<String, Object>();
+ dict.put(JobUtil.PROPERTY_JOB_TOPIC, topic);
+ return new JobEvent(new Event(topic, dict), topic) {
+ public void unlock() {
+ // intentionally empty in this test stub
+ }
+ public boolean reschedule() {
+ return false;
+ }
+ public boolean remove() {
+ return false;
+ }
+ public boolean lock() {
+ return false;
+ }
+ public void finished() {
+ // intentionally empty in this test stub
+ }
+ };
+ }
+ @org.junit.Test public void testMaxParallel() {
+ final Map<String, Object> p = new HashMap<String, Object>();
+ p.put(ConfigurationConstants.PROP_MAX_PARALLEL, -1);
+ // A max-parallel value of -1 is expected to resolve to the number of available processors.
+ InternalQueueConfiguration c = InternalQueueConfiguration.fromConfiguration(p);
+ assertEquals(Runtime.getRuntime().availableProcessors(), c.getMaxParallel());
+ }
+ /** A trailing "." in the topic pattern ("a.") is expected to match exactly one sub-level: "a/b" and "a/c", but not "a" or "a/b/c". */
+ @org.junit.Test public void testTopicMatchersDot() {
+ final Map<String, Object> p = new HashMap<String, Object>();
+ p.put(ConfigurationConstants.PROP_TOPICS, new String[] {"a."});
+ p.put(ConfigurationConstants.PROP_NAME, "test");
+
+ InternalQueueConfiguration c = InternalQueueConfiguration.fromConfiguration(p);
+ assertTrue(c.isValid());
+ assertTrue(c.match(getJobEvent("a/b")));
+ assertTrue(c.match(getJobEvent("a/c")));
+ assertFalse(c.match(getJobEvent("a")));
+ assertFalse(c.match(getJobEvent("a/b/c")));
+ assertFalse(c.match(getJobEvent("t")));
+ assertFalse(c.match(getJobEvent("t/x")));
+ }
+ /** A trailing "*" in the topic pattern ("a*") is expected to match any depth below "a" ("a/b", "a/b/c"), but not "a" itself. */
+ @org.junit.Test public void testTopicMatchersStar() {
+ final Map<String, Object> p = new HashMap<String, Object>();
+ p.put(ConfigurationConstants.PROP_TOPICS, new String[] {"a*"});
+ p.put(ConfigurationConstants.PROP_NAME, "test");
+
+ InternalQueueConfiguration c = InternalQueueConfiguration.fromConfiguration(p);
+ assertTrue(c.isValid());
+ assertTrue(c.match(getJobEvent("a/b")));
+ assertTrue(c.match(getJobEvent("a/c")));
+ assertFalse(c.match(getJobEvent("a")));
+ assertTrue(c.match(getJobEvent("a/b/c")));
+ assertFalse(c.match(getJobEvent("t")));
+ assertFalse(c.match(getJobEvent("t/x")));
+ }
+ /** A plain topic without wildcard ("a") is expected to match only that exact topic. */
+ @org.junit.Test public void testTopicMatchers() {
+ final Map<String, Object> p = new HashMap<String, Object>();
+ p.put(ConfigurationConstants.PROP_TOPICS, new String[] {"a"});
+ p.put(ConfigurationConstants.PROP_NAME, "test");
+
+ InternalQueueConfiguration c = InternalQueueConfiguration.fromConfiguration(p);
+ assertTrue(c.isValid());
+ assertFalse(c.match(getJobEvent("a/b")));
+ assertFalse(c.match(getJobEvent("a/c")));
+ assertTrue(c.match(getJobEvent("a")));
+ assertFalse(c.match(getJobEvent("a/b/c")));
+ assertFalse(c.match(getJobEvent("t")));
+ assertFalse(c.match(getJobEvent("t/x")));
+ }
+ /** With name "test-queue-{0}" and pattern "a.", a successful match is expected to leave the event's queueName with {0} replaced by the matched sub-topic. */
+ @org.junit.Test public void testTopicMatcherAndReplacement() {
+ final Map<String, Object> p = new HashMap<String, Object>();
+ p.put(ConfigurationConstants.PROP_TOPICS, new String[] {"a."});
+ p.put(ConfigurationConstants.PROP_NAME, "test-queue-{0}");
+
+ InternalQueueConfiguration c = InternalQueueConfiguration.fromConfiguration(p);
+ assertTrue(c.isValid());
+ final JobEvent b = getJobEvent("a/b");
+ assertTrue(c.match(b));
+ assertEquals("test-queue-b", b.queueName);
+ final JobEvent d = getJobEvent("a/d");
+ assertTrue(c.match(d));
+ assertEquals("test-queue-d", d.queueName);
+ }
+ /** The single-level pattern written with an explicit slash ("a/.") is expected to behave like "a.": one sub-level only. */
+ @org.junit.Test public void testTopicMatchersDotAndSlash() {
+ final Map<String, Object> p = new HashMap<String, Object>();
+ p.put(ConfigurationConstants.PROP_TOPICS, new String[] {"a/."});
+ p.put(ConfigurationConstants.PROP_NAME, "test");
+
+ InternalQueueConfiguration c = InternalQueueConfiguration.fromConfiguration(p);
+ assertTrue(c.isValid());
+ assertTrue(c.match(getJobEvent("a/b")));
+ assertTrue(c.match(getJobEvent("a/c")));
+ assertFalse(c.match(getJobEvent("a")));
+ assertFalse(c.match(getJobEvent("a/b/c")));
+ assertFalse(c.match(getJobEvent("t")));
+ assertFalse(c.match(getJobEvent("t/x")));
+ }
+ /** The any-depth pattern written with an explicit slash ("a/*") is expected to behave like "a*": any depth below "a", but not "a" itself. */
+ @org.junit.Test public void testTopicMatchersStarAndSlash() {
+ final Map<String, Object> p = new HashMap<String, Object>();
+ p.put(ConfigurationConstants.PROP_TOPICS, new String[] {"a/*"});
+ p.put(ConfigurationConstants.PROP_NAME, "test");
+
+ InternalQueueConfiguration c = InternalQueueConfiguration.fromConfiguration(p);
+ assertTrue(c.isValid());
+ assertTrue(c.match(getJobEvent("a/b")));
+ assertTrue(c.match(getJobEvent("a/c")));
+ assertFalse(c.match(getJobEvent("a")));
+ assertTrue(c.match(getJobEvent("a/b/c")));
+ assertFalse(c.match(getJobEvent("t")));
+ assertFalse(c.match(getJobEvent("t/x")));
+ }
+ /** Queue-name replacement ({0} in "test-queue-{0}") checked with the slash form of the single-level pattern ("a/."). */
+ @org.junit.Test public void testTopicMatcherAndReplacementAndSlash() {
+ final Map<String, Object> p = new HashMap<String, Object>();
+ p.put(ConfigurationConstants.PROP_TOPICS, new String[] {"a/."});
+ p.put(ConfigurationConstants.PROP_NAME, "test-queue-{0}");
+
+ InternalQueueConfiguration c = InternalQueueConfiguration.fromConfiguration(p);
+ assertTrue(c.isValid());
+ final JobEvent b = getJobEvent("a/b");
+ assertTrue(c.match(b));
+ assertEquals("test-queue-b", b.queueName);
+ final JobEvent d = getJobEvent("a/d");
+ assertTrue(c.match(d));
+ assertEquals("test-queue-d", d.queueName);
+ }
+ /** A configuration that has a name but no topics is expected to be reported as invalid. */
+ @org.junit.Test public void testNoTopicMatchers() {
+ final Map<String, Object> p = new HashMap<String, Object>();
+ p.put(ConfigurationConstants.PROP_NAME, "test");
+
+ InternalQueueConfiguration c = InternalQueueConfiguration.fromConfiguration(p);
+ assertFalse(c.isValid());
+ }
+}
Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfigurationTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfigurationTest.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url