You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2010/10/11 08:54:14 UTC
svn commit: r1021247 [4/6] - in /sling/branches/eventing-3.0: ./ .settings/
src/ src/main/ src/main/java/ src/main/java/org/ src/main/java/org/apache/
src/main/java/org/apache/sling/ src/main/java/org/apache/sling/event/
src/main/java/org/apache/sling/...
Added: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/JobStatusProviderImpl.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/JobStatusProviderImpl.java?rev=1021247&view=auto
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/JobStatusProviderImpl.java (added)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/JobStatusProviderImpl.java Mon Oct 11 06:54:12 2010
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.jcr;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.sling.event.JobStatusProvider;
+import org.apache.sling.event.JobsIterator;
+import org.apache.sling.event.jobs.JobManager;
+import org.apache.sling.event.jobs.JobManager.QueryType;
+import org.apache.sling.event.jobs.JobUtil;
+import org.apache.sling.event.jobs.Queue;
+import org.osgi.service.event.Event;
+
+
+/**
+ * An service to query jobs.
+ *
+ * @deprecated
+ */
+@Deprecated
+@Component
+@Service(value=JobStatusProvider.class)
+public class JobStatusProviderImpl
+ implements JobStatusProvider {
+
+ @Reference
+ private JobManager jobManager;
+
+ /**
+ * @see org.apache.sling.event.JobStatusProvider#removeJob(java.lang.String, java.lang.String)
+ */
+ public boolean removeJob(final String topic, final String jobId) {
+ if ( jobId != null && topic != null ) {
+ final Event job = this.jobManager.findJob(topic,
+ Collections.singletonMap(JobUtil.PROPERTY_JOB_NAME, (Object)jobId));
+ if ( job != null ) {
+ return this.removeJob((String)job.getProperty(JobUtil.JOB_ID));
+ }
+ }
+ return true;
+ }
+
+ /**
+ * @see org.apache.sling.event.JobStatusProvider#removeJob(java.lang.String)
+ */
+ public boolean removeJob(String jobId) {
+ return this.jobManager.removeJob(jobId);
+ }
+
+
+ /**
+ * @see org.apache.sling.event.JobStatusProvider#forceRemoveJob(java.lang.String, java.lang.String)
+ */
+ public void forceRemoveJob(final String topic, final String jobId) {
+ if ( jobId != null && topic != null ) {
+ final Event job = this.jobManager.findJob(topic,
+ Collections.singletonMap(JobUtil.PROPERTY_JOB_NAME, (Object)jobId));
+ if ( job != null ) {
+ this.forceRemoveJob((String)job.getProperty(JobUtil.JOB_ID));
+ }
+ }
+ }
+
+ /**
+ * @see org.apache.sling.event.JobStatusProvider#forceRemoveJob(java.lang.String)
+ */
+ public void forceRemoveJob(final String jobId) {
+ this.jobManager.forceRemoveJob(jobId);
+ }
+
+ /**
+ * @see org.apache.sling.event.JobStatusProvider#wakeUpJobQueue(java.lang.String)
+ */
+ public void wakeUpJobQueue(final String jobQueueName) {
+ final Queue q = this.jobManager.getQueue(jobQueueName);
+ if ( q != null ) {
+ q.resume();
+ }
+ }
+
+ /**
+ * @see org.apache.sling.event.JobStatusProvider#queryAllJobs(String, Map...)
+ */
+ public JobsIterator queryAllJobs(final String topic, final Map<String, Object>... filterProps) {
+ return new JobsIteratorImpl(this.jobManager.queryJobs(QueryType.ALL, topic, filterProps));
+ }
+
+ /**
+ * @see org.apache.sling.event.JobStatusProvider#queryCurrentJobs(String, Map...)
+ */
+ public JobsIterator queryCurrentJobs(final String topic, final Map<String, Object>... filterProps) {
+ return new JobsIteratorImpl(this.jobManager.queryJobs(QueryType.ACTIVE, topic, filterProps));
+ }
+
+ /**
+ * @see org.apache.sling.event.JobStatusProvider#queryScheduledJobs(String, Map...)
+ */
+ public JobsIterator queryScheduledJobs(final String topic, final Map<String, Object>... filterProps) {
+ return new JobsIteratorImpl(this.jobManager.queryJobs(QueryType.QUEUED, topic, filterProps));
+ }
+
+ /**
+ * @see org.apache.sling.event.JobStatusProvider#getCurrentJobs(java.lang.String)
+ */
+ @Deprecated
+ public Collection<Event> getCurrentJobs(String topic) {
+ return this.getCurrentJobs(topic, (Map<String, Object>[])null);
+ }
+
+ /**
+ * @see org.apache.sling.event.JobStatusProvider#getScheduledJobs(java.lang.String)
+ */
+ @Deprecated
+ public Collection<Event> getScheduledJobs(String topic) {
+ return this.getScheduledJobs(topic, (Map<String, Object>[])null);
+ }
+
+ private Collection<Event> asList(final JobsIterator ji) {
+ final List<Event> jobs = new ArrayList<Event>();
+ while ( ji.hasNext() ) {
+ jobs.add(ji.next());
+ }
+ return jobs;
+ }
+
+ /**
+ * @see org.apache.sling.event.JobStatusProvider#getCurrentJobs(java.lang.String, java.util.Map...)
+ */
+ @Deprecated
+ public Collection<Event> getCurrentJobs(String topic, Map<String, Object>... filterProps) {
+ return this.asList(this.queryCurrentJobs(topic, filterProps));
+ }
+
+ /**
+ * @see org.apache.sling.event.JobStatusProvider#getScheduledJobs(java.lang.String, java.util.Map...)
+ */
+ @Deprecated
+ public Collection<Event> getScheduledJobs(String topic, Map<String, Object>... filterProps) {
+ return this.asList(this.queryScheduledJobs(topic, filterProps));
+ }
+
+ /**
+ * @see org.apache.sling.event.JobStatusProvider#getAllJobs(java.lang.String, java.util.Map...)
+ */
+ @Deprecated
+ public Collection<Event> getAllJobs(String topic, Map<String, Object>... filterProps) {
+ return this.asList(this.queryAllJobs(topic, filterProps));
+ }
+
+ /**
+ * @see org.apache.sling.event.JobStatusProvider#cancelJob(java.lang.String, java.lang.String)
+ */
+ @Deprecated
+ public void cancelJob(String topic, String jobId) {
+ this.removeJob(topic, jobId);
+ }
+
+ /**
+ * @see org.apache.sling.event.JobStatusProvider#cancelJob(java.lang.String)
+ */
+ @Deprecated
+ public void cancelJob(String jobId) {
+ this.removeJob(jobId);
+ }
+}
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/JobStatusProviderImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/JobStatusProviderImpl.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/JobStatusProviderImpl.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/JobsIteratorImpl.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/JobsIteratorImpl.java?rev=1021247&view=auto
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/JobsIteratorImpl.java (added)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/JobsIteratorImpl.java Mon Oct 11 06:54:12 2010
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.jcr;
+
+import java.util.Iterator;
+
+import org.apache.sling.event.JobsIterator;
+import org.osgi.service.event.Event;
+
+/**
+ * JCR Based Implementation of the jobs iterator
+ * @deprecated
+ */
+@Deprecated
+public class JobsIteratorImpl implements JobsIterator {
+
+ private final org.apache.sling.event.jobs.JobsIterator delegatee;
+
+ public JobsIteratorImpl(final org.apache.sling.event.jobs.JobsIterator i) {
+ this.delegatee = i;
+ }
+
+ /**
+ * @see org.apache.sling.event.JobsIterator#close()
+ * @deprecated
+ */
+ @Deprecated
+ public void close() {
+ // nothing to do
+ }
+
+ /**
+ * @see org.apache.sling.event.jobs.JobsIterator#getPosition()
+ */
+ public long getPosition() {
+ return this.delegatee.getPosition();
+ }
+
+ /**
+ * @see org.apache.sling.event.jobs.JobsIterator#getSize()
+ */
+ public long getSize() {
+ return this.delegatee.getSize();
+ }
+
+ /**
+ * @see org.apache.sling.event.jobs.JobsIterator#skip(long)
+ */
+ public void skip(long skipNum) {
+ this.delegatee.skip(skipNum);
+ }
+
+ /**
+ * @see java.util.Iterator#hasNext()
+ */
+ public boolean hasNext() {
+ return this.delegatee.hasNext();
+ }
+
+ /**
+ * @see java.util.Iterator#next()
+ */
+ public Event next() {
+ return this.delegatee.next();
+ }
+
+ /**
+ * @see java.util.Iterator#remove()
+ */
+ public void remove() {
+ this.delegatee.remove();
+ }
+
+ /**
+ * @see java.lang.Iterable#iterator()
+ */
+ public Iterator<Event> iterator() {
+ return this.delegatee.iterator();
+ }
+}
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/JobsIteratorImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/JobsIteratorImpl.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/JobsIteratorImpl.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java?rev=1021247&view=auto
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java (added)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java Mon Oct 11 06:54:12 2010
@@ -0,0 +1,1074 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.jcr;
+
+import java.util.Calendar;
+import java.util.Dictionary;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.StringTokenizer;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import javax.jcr.ItemExistsException;
+import javax.jcr.Node;
+import javax.jcr.NodeIterator;
+import javax.jcr.RepositoryException;
+import javax.jcr.Session;
+import javax.jcr.ValueFactory;
+import javax.jcr.observation.EventIterator;
+import javax.jcr.observation.EventListener;
+import javax.jcr.query.Query;
+import javax.jcr.query.QueryManager;
+import javax.jcr.query.qom.Constraint;
+import javax.jcr.query.qom.Ordering;
+import javax.jcr.query.qom.QueryObjectModelFactory;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Modified;
+import org.apache.felix.scr.annotations.Properties;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.felix.scr.annotations.Services;
+import org.apache.sling.commons.osgi.OsgiUtil;
+import org.apache.sling.event.EventUtil;
+import org.apache.sling.event.impl.EnvironmentComponent;
+import org.apache.sling.event.impl.jobs.DefaultJobManager;
+import org.apache.sling.event.impl.jobs.JobEvent;
+import org.apache.sling.event.impl.jobs.Utility;
+import org.apache.sling.event.impl.support.Environment;
+import org.apache.sling.event.jobs.JobManager;
+import org.apache.sling.event.jobs.JobUtil;
+import org.osgi.service.component.ComponentContext;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Persistence handler for the jobs
+ *
+ */
+@Component(label="%job.persistence.name",
+ description="%job.persistence.description",
+ metatype=true,immediate=true)
+@Services({
+ @Service(value=PersistenceHandler.class),
+ @Service(value=EventHandler.class),
+ @Service(value=Runnable.class)
+})
+@Properties({
+ @Property(name="event.topics",propertyPrivate=true,
+ value={"org/osgi/framework/BundleEvent/UPDATED",
+ "org/osgi/framework/BundleEvent/STARTED",
+ JobUtil.TOPIC_JOB}),
+ @Property(name="scheduler.period", longValue=300,label="%persscheduler.period.name",description="%persscheduler.period.description"),
+ @Property(name="scheduler.concurrent", boolValue=false, propertyPrivate=true)
+})
+public class PersistenceHandler implements EventListener, Runnable, EventHandler {
+
+ /** Default repository path. */
+ private static final String DEFAULT_REPOSITORY_PATH = "/var/eventing/jobs";
+
+ /** The path where all jobs are stored. */
+ @Property(value=DEFAULT_REPOSITORY_PATH, propertyPrivate=true)
+ private static final String CONFIG_PROPERTY_REPOSITORY_PATH = "repository.path";
+
+ /** Default clean up time is 5 minutes. */
+ private static final int DEFAULT_CLEANUP_PERIOD = 5;
+
+ @Property(intValue=DEFAULT_CLEANUP_PERIOD,label="%jobcleanup.period.name",description="%jobcleanup.period.description")
+ private static final String CONFIG_PROPERTY_CLEANUP_PERIOD = "cleanup.period";
+
+ /** Default maximum load jobs. */
+ private static final long DEFAULT_MAXIMUM_LOAD_JOBS = 1000;
+
+ /** Number of jobs to load from the repository on startup in one go. */
+ @Property(longValue=DEFAULT_MAXIMUM_LOAD_JOBS)
+ private static final String CONFIG_PROPERTY_MAX_LOAD_JOBS = "max.load.jobs";
+
+ /** Default load threshold. */
+ private static final long DEFAULT_LOAD_THRESHOLD = 400;
+
+ /** Threshold - if the queue is lower than this threshold the repository is checked for events. */
+ @Property(longValue=DEFAULT_LOAD_THRESHOLD)
+ private static final String CONFIG_PROPERTY_LOAD_THREASHOLD = "load.threshold";
+
+ /** Default background load delay. */
+ private static final long DEFAULT_BACKGROUND_LOAD_DELAY = 30;
+
+ /** The background loader waits this time of seconds after startup before loading events from the repository. (in secs) */
+ @Property(longValue=DEFAULT_BACKGROUND_LOAD_DELAY)
+ private static final String CONFIG_PROPERTY_BACKGROUND_LOAD_DELAY = "load.delay";
+
+ /** Default background check delay. */
+ private static final long DEFAULT_BACKGROUND_CHECK_DELAY = 240;
+
+ /** The background loader waits this time of seconds between loads from the repository. (in secs) */
+ @Property(longValue=DEFAULT_BACKGROUND_CHECK_DELAY)
+ private static final String CONFIG_PROPERTY_BACKGROUND_CHECK_DELAY = "load.checkdelay";
+
+ /** We remove everything which is older than 5 min by default. */
+ private int cleanupPeriod;
+
+ /** Default logger. */
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+ /** The repository path. */
+ protected String repositoryPath;
+
+ /** Is the background task still running? */
+ private volatile boolean running;
+
+ /** Unloaded jobs. */
+ private Set<String>unloadedJobs = new HashSet<String>();
+
+ /** A local queue for writing received events into the repository. */
+ private final BlockingQueue<Event> writeQueue = new LinkedBlockingQueue<Event>();
+
+ /** Lock for the background session. */
+ private final Object backgroundLock = new Object();
+
+ /** Background session for all reading, locking etc. */
+ private Session backgroundSession;
+
+ @Reference
+ private EnvironmentComponent environment;
+
+ @Reference
+ private JobManager jobManager;
+
+ /**
+ * Activate this component.
+ * @param context The component context.
+ */
+ @Activate
+ protected void activate(final ComponentContext context) throws RepositoryException {
+ @SuppressWarnings("unchecked")
+ final Dictionary<String, Object> props = context.getProperties();
+ this.cleanupPeriod = OsgiUtil.toInteger(props.get(CONFIG_PROPERTY_CLEANUP_PERIOD), DEFAULT_CLEANUP_PERIOD);
+ this.repositoryPath = OsgiUtil.toString(props.get(CONFIG_PROPERTY_REPOSITORY_PATH), DEFAULT_REPOSITORY_PATH);
+ this.running = true;
+
+ // start writer background thread
+ final Thread writerThread = new Thread(new Runnable() {
+ public void run() {
+ persistJobs();
+ }
+ }, "Apache Sling Job Writer");
+ writerThread.setDaemon(true);
+ writerThread.start();
+
+ // start background thread which loads jobs from the repository
+ final long loadThreshold = OsgiUtil.toLong(props.get(CONFIG_PROPERTY_LOAD_THREASHOLD), DEFAULT_LOAD_THRESHOLD);
+ final long backgroundLoadDelay = OsgiUtil.toLong(props.get(CONFIG_PROPERTY_BACKGROUND_LOAD_DELAY), DEFAULT_BACKGROUND_LOAD_DELAY);
+ final long backgroundCheckDelay = OsgiUtil.toLong(props.get(CONFIG_PROPERTY_BACKGROUND_CHECK_DELAY), DEFAULT_BACKGROUND_CHECK_DELAY);
+ final long maxLoadJobs = OsgiUtil.toLong(props.get(CONFIG_PROPERTY_MAX_LOAD_JOBS), DEFAULT_MAXIMUM_LOAD_JOBS);
+ final Thread loaderThread = new Thread(new Runnable() {
+ public void run() {
+ loadJobsInTheBackground(backgroundLoadDelay, backgroundCheckDelay, loadThreshold, maxLoadJobs);
+ }
+ }, "Apache Sling Job Background Loader");
+ loaderThread.setDaemon(true);
+ loaderThread.start();
+
+ // open background session for observation
+ // create the background session and register a listener
+ this.backgroundSession = this.environment.createAdminSession();
+ this.backgroundSession.getWorkspace().getObservationManager()
+ .addEventListener(this,
+ javax.jcr.observation.Event.PROPERTY_REMOVED
+ |javax.jcr.observation.Event.PROPERTY_ADDED
+ |javax.jcr.observation.Event.NODE_REMOVED
+ |javax.jcr.observation.Event.NODE_ADDED,
+ this.repositoryPath,
+ true,
+ null,
+ null,
+ true);
+ }
+
+ /**
+ * Deactivate this component.
+ * @param context The component context.
+ */
+ @Deactivate
+ protected void deactivate(final ComponentContext context) {
+ this.running = false;
+ // stop write queue
+ try {
+ this.writeQueue.put(new Event("some", (Dictionary<String, Object>)null));
+ } catch (InterruptedException e) {
+ this.ignoreException(e);
+ }
+ if ( this.backgroundSession != null ) {
+ synchronized ( this.backgroundLock ) {
+ this.logger.debug("Shutting down background session.");
+ try {
+ this.backgroundSession.getWorkspace().getObservationManager().removeEventListener(this);
+ } catch (RepositoryException e) {
+ // we just ignore it
+ this.logger.warn("Unable to remove event listener.", e);
+ }
+ this.backgroundSession.logout();
+ this.backgroundSession = null;
+ }
+ }
+ logger.debug("Apache Sling Job Persistence Handler stopped on instance {}", Environment.APPLICATION_ID);
+ }
+
+ @Modified
+ protected void update(final ComponentContext context) {
+ // we don't need to do anything as the config values are only used for initial loading!
+ }
+
+ /**
+ * @see javax.jcr.observation.EventListener#onEvent(javax.jcr.observation.EventIterator)
+ */
+ public void onEvent(EventIterator iter) {
+ // we create an own session here - this is done lazy
+ Session s = null;
+ try {
+ while ( iter.hasNext() ) {
+ final javax.jcr.observation.Event event = iter.nextEvent();
+
+ try {
+ final String path = event.getPath();
+ String loadNodePath = null;
+
+ if ( event.getType() == javax.jcr.observation.Event.NODE_ADDED) {
+ loadNodePath = path;
+ } else if ( event.getType() == javax.jcr.observation.Event.PROPERTY_REMOVED) {
+ final int pos = path.lastIndexOf('/');
+ final String propertyName = path.substring(pos+1);
+
+ // we are only interested in unlocks
+ if ( "jcr:lockOwner".equals(propertyName) ) {
+ loadNodePath = path.substring(0, pos);
+ }
+ } else if ( event.getType() == javax.jcr.observation.Event.PROPERTY_ADDED ) {
+ final int pos = path.lastIndexOf('/');
+ final String propertyName = path.substring(pos+1);
+
+ // we are only interested in locks
+ if ( "jcr:lockOwner".equals(propertyName) ) {
+ ((DefaultJobManager)this.jobManager).notifyActiveJob(path.substring(this.repositoryPath.length() + 1));
+ }
+
+ } else if ( event.getType() == javax.jcr.observation.Event.NODE_REMOVED) {
+ synchronized (unloadedJobs) {
+ this.unloadedJobs.remove(path);
+ }
+ ((DefaultJobManager)this.jobManager).notifyRemoveJob(path.substring(this.repositoryPath.length() + 1));
+ }
+ if ( loadNodePath != null ) {
+ if ( s == null ) {
+ s = this.environment.createAdminSession();
+ }
+ // we do a sanity check if the node exists first
+ if ( s.itemExists(loadNodePath) ) {
+ final Node eventNode = (Node) s.getItem(loadNodePath);
+ if ( eventNode.isNodeType(JCRHelper.JOB_NODE_TYPE) ) {
+ if ( event.getType() == javax.jcr.observation.Event.NODE_ADDED ) {
+ logger.debug("New job has been added. Trying to load from {}", loadNodePath);
+ } else {
+ logger.debug("Job execution failed by someone else. Trying to load from {}", loadNodePath);
+ }
+ tryToLoadJob(eventNode, this.unloadedJobs);
+ }
+ }
+ }
+ } catch (RepositoryException re) {
+ this.logger.error("Exception during jcr event processing.", re);
+ }
+
+ }
+ } finally {
+ if ( s != null ) {
+ s.logout();
+ }
+ }
+ }
+
+ /**
+ * Return the query for the clean up.
+ */
+ private Query getCleanUpQuery(final Session s)
+ throws RepositoryException {
+ final String selectorName = "nodetype";
+ final Calendar deleteBefore = Calendar.getInstance();
+ deleteBefore.add(Calendar.MINUTE, -this.cleanupPeriod);
+
+ final QueryObjectModelFactory qomf = s.getWorkspace().getQueryManager().getQOMFactory();
+
+ final Query q = qomf.createQuery(
+ qomf.selector(JCRHelper.JOB_NODE_TYPE, selectorName),
+ qomf.and(qomf.descendantNode(selectorName, this.repositoryPath),
+ qomf.comparison(qomf.propertyValue(selectorName, JCRHelper.NODE_PROPERTY_FINISHED),
+ QueryObjectModelFactory.JCR_OPERATOR_LESS_THAN,
+ qomf.literal(s.getValueFactory().createValue(deleteBefore)))),
+ null,
+ null
+ );
+ return q;
+ }
+
+ /**
+ * This method is invoked periodically.
+ * @see java.lang.Runnable#run()
+ */
+ public void cleanup() {
+ // remove obsolete jobs from the repository
+ if ( this.running && this.cleanupPeriod > 0 ) {
+ this.logger.debug("Cleaning up repository, removing all finished jobs older than {} minutes.", this.cleanupPeriod);
+
+ // we create an own session for concurrency issues
+ Session s = null;
+ try {
+ s = this.environment.createAdminSession();
+ final Query q = this.getCleanUpQuery(s);
+ if ( logger.isDebugEnabled() ) {
+ logger.debug("Executing query {}", q.getStatement());
+ }
+ final NodeIterator iter = q.execute().getNodes();
+ int count = 0;
+ while ( iter.hasNext() ) {
+ final Node eventNode = iter.nextNode();
+ eventNode.remove();
+ count++;
+ }
+ s.save();
+ logger.debug("Removed {} entries from the repository.", count);
+
+ } catch (RepositoryException e) {
+ // in the case of an error, we just log this as a warning
+ this.logger.warn("Exception during repository cleanup.", e);
+ } finally {
+ if ( s != null ) {
+ s.logout();
+ }
+ }
+ }
+ }
+
+ /**
+ * Load all active jobs from the repository.
+ */
+ private void loadJobsInTheBackground(final long backgroundLoadDelay,
+ final long backgroundCheckDelay,
+ final long loadThreshold,
+ final long maxLoadJobs) {
+ final long startTime = System.currentTimeMillis();
+ // give the system some time to start
+ try {
+ Thread.sleep(1000 * backgroundLoadDelay); // default is 30 seconds
+ } catch (InterruptedException e) {
+ this.ignoreException(e);
+ }
+ // are we still running?
+ if ( this.running ) {
+ logger.debug("Starting background loading.");
+ long loadSince = -1;
+ do {
+ loadSince = this.loadJobs(loadSince, startTime, maxLoadJobs);
+ if ( this.running && loadSince > -1 ) {
+ do {
+ try {
+ Thread.sleep(1000 * backgroundCheckDelay); // default is 240 seconds
+ } catch (InterruptedException e) {
+ this.ignoreException(e);
+ }
+ } while ( this.running && this.jobManager.getStatistics().getNumberOfJobs() > loadThreshold );
+ }
+ } while (this.running && loadSince > -1);
+ logger.debug("Finished background loading.");
+ }
+ }
+
+ /**
+ * Load a batch of active jobs from the repository.
+ */
+ private long loadJobs(final long since, final long startTime, final long maxLoadJobs) {
+ long eventCreated = since;
+ final long maxLoad = (since == -1 ? maxLoadJobs : maxLoadJobs - this.jobManager.getStatistics().getNumberOfJobs());
+ // sanity check
+ if ( maxLoad > 0 ) {
+ logger.debug("Loading from repository since {} and max {}", since, maxLoad);
+ Session session = null;
+ try {
+ session = this.environment.createAdminSession();
+ final QueryManager qManager = session.getWorkspace().getQueryManager();
+ final ValueFactory vf = session.getValueFactory();
+ final String selectorName = "nodetype";
+ final Calendar startDate = Calendar.getInstance();
+ startDate.setTimeInMillis(startTime);
+
+ final QueryObjectModelFactory qomf = qManager.getQOMFactory();
+
+ Constraint constraint = qomf.and(
+ qomf.descendantNode(selectorName, this.repositoryPath),
+ qomf.not(qomf.propertyExistence(selectorName, JCRHelper.NODE_PROPERTY_FINISHED)));
+ constraint = qomf.and(constraint,
+ qomf.comparison(qomf.propertyValue(selectorName, JCRHelper.NODE_PROPERTY_CREATED),
+ QueryObjectModelFactory.JCR_OPERATOR_LESS_THAN,
+ qomf.literal(vf.createValue(startDate))));
+ if ( since != -1 ) {
+ final Calendar beforeDate = Calendar.getInstance();
+ beforeDate.setTimeInMillis(since);
+ constraint = qomf.and(constraint,
+ qomf.comparison(qomf.propertyValue(selectorName, JCRHelper.NODE_PROPERTY_CREATED),
+ QueryObjectModelFactory.JCR_OPERATOR_GREATER_THAN,
+ qomf.literal(vf.createValue(beforeDate))));
+ }
+ final Query q = qomf.createQuery(
+ qomf.selector(JCRHelper.JOB_NODE_TYPE, selectorName),
+ constraint,
+ new Ordering[] {qomf.ascending(qomf.propertyValue(selectorName, JCRHelper.NODE_PROPERTY_CREATED))},
+ null
+ );
+ final NodeIterator result = q.execute().getNodes();
+ long count = 0;
+ while ( result.hasNext() && count < maxLoad ) {
+ final Node eventNode = result.nextNode();
+ final String propPath = eventNode.getPath() + '/' + JCRHelper.NODE_PROPERTY_CREATED;
+ if ( session.itemExists(propPath) ) {
+ eventCreated = eventNode.getProperty(JCRHelper.NODE_PROPERTY_CREATED).getLong();
+ if ( tryToLoadJob(eventNode, this.unloadedJobs) ) {
+ count++;
+ }
+ }
+ }
+ // now we have to add all jobs with the same created time!
+ boolean done = false;
+ while ( result.hasNext() && !done ) {
+ final Node eventNode = result.nextNode();
+ final String propPath = eventNode.getPath() + '/' + JCRHelper.NODE_PROPERTY_CREATED;
+ if ( session.itemExists(propPath) ) {
+ final long created = eventNode.getProperty(JCRHelper.NODE_PROPERTY_CREATED).getLong();
+ if ( created == eventCreated ) {
+ if ( tryToLoadJob(eventNode, this.unloadedJobs) ) {
+ count++;
+ }
+ } else {
+ done = true;
+ }
+ }
+ }
+ // have we processed all jobs?
+ if ( !done && !result.hasNext() ) {
+ eventCreated = -1;
+ }
+ logger.debug("Loaded {} jobs and new since {}", count, eventCreated);
+ } catch (RepositoryException re) {
+ this.logger.error("Exception during initial loading of stored jobs.", re);
+ } finally {
+ if ( session != null ) {
+ session.logout();
+ }
+ }
+ }
+ return eventCreated;
+ }
+
+ /**
+ * Try to load a job from an event node in the repository.
+ * @param eventNode The node to read the event from
+ * @param unloadedJobSet The set of unloaded jobs - if loading fails, the node path is added here
+ * @return <code>true</code> If the job can be loaded.
+ */
+ private boolean tryToLoadJob(final Node eventNode, final Set<String> unloadedJobSet) {
+ try {
+ final String nodePath = eventNode.getPath();
+ // first check: job should not be finished
+ if ( !eventNode.hasProperty(JCRHelper.NODE_PROPERTY_FINISHED)) {
+ boolean shouldProcess = true;
+ // second check: is this a job that should only run on the instance that it was created on?
+ if ( eventNode.hasProperty(JobUtil.PROPERTY_JOB_RUN_LOCAL) &&
+ !eventNode.getProperty(JCRHelper.NODE_PROPERTY_APPLICATION).getString().equals(Environment.APPLICATION_ID)) {
+ shouldProcess = false;
+ if ( logger.isDebugEnabled() ) {
+ logger.debug("Discarding job at {} : local job for a different application node.", nodePath);
+ }
+ }
+ Event event = null;
+ try {
+ event = this.readEvent(eventNode, false);
+ if ( shouldProcess ) {
+ this.process(event);
+ }
+
+ } catch (ClassNotFoundException cnfe) {
+ // store path for lazy loading
+ synchronized ( unloadedJobSet ) {
+ unloadedJobSet.add(nodePath);
+ }
+ this.ignoreException(cnfe);
+ } catch (RepositoryException re) {
+ this.logger.error("Unable to load stored job from " + nodePath, re);
+ }
+
+ if ( event == null ) {
+ try {
+ event = this.readEvent(eventNode, true);
+ } catch (ClassNotFoundException cnfe) {
+ // this can't occur
+ } catch (RepositoryException re) {
+ this.logger.error("Unable to load stored job from " + nodePath, re);
+ }
+ }
+ if ( event != null ) {
+ ((DefaultJobManager)this.jobManager).notifyAddJob(new JCRJobEvent(event, this));
+ }
+ return shouldProcess && event != null;
+
+ }
+ // if the node is finished, this is usually an unlock event
+ ((DefaultJobManager)this.jobManager).notifyRemoveJob(nodePath.substring(this.repositoryPath.length() + 1));
+ } catch (RepositoryException re) {
+ this.logger.error("Unable to load stored job from " + eventNode, re);
+ }
+ return false;
+ }
+
+ /**
+ * Background thread for writing jobs to the repository
+ */
+ private void persistJobs() {
+ logger.debug("Apache Sling Job Persistence Handler started on instance {}", Environment.APPLICATION_ID);
+ Session writerSession = null;
+ Node rootNode = null;
+
+ try {
+ writerSession = this.environment.createAdminSession();
+ rootNode = this.createPath(writerSession.getRootNode(),
+ this.repositoryPath.substring(1),
+ JCRHelper.NODETYPE_ORDERED_FOLDER);
+ writerSession.save();
+
+ try {
+ this.processWriteQueue(rootNode);
+ } catch (Throwable t) { //NOSONAR
+ logger.error("Writer thread stopped with exception: " + t.getMessage(), t);
+ running = false;
+ }
+ } catch (RepositoryException e) {
+ // there is nothing we can do except log!
+ logger.error("Error during session starting.", e);
+ running = false;
+ } finally {
+ if ( writerSession != null ) {
+ writerSession.logout();
+ }
+ }
+ }
+
+ /**
+ * The writer queue. One job is written on each run.
+ */
+ private void processWriteQueue(final Node rootNode) {
+ while ( this.running ) {
+ // so let's wait/get the next job from the queue
+ Event event = null;
+ try {
+ event = this.writeQueue.take();
+ } catch (InterruptedException e) {
+ // we ignore this
+ this.ignoreException(e);
+ }
+ if ( event != null && this.running ) {
+ if ( logger.isDebugEnabled() ) {
+ logger.debug("Persisting job {}", EventUtil.toString(event));
+ }
+ final String jobId = (String)event.getProperty(JobUtil.PROPERTY_JOB_NAME);
+ final String jobTopic = (String)event.getProperty(JobUtil.PROPERTY_JOB_TOPIC);
+ final String nodePath = Utility.getUniquePath(jobTopic, jobId);
+
+ // if the job has no job id, we can just write the job to the repo and don't
+ // need locking
+ if ( jobId == null ) {
+ try {
+ this.writeEvent(rootNode, event, nodePath);
+ } catch (RepositoryException re ) {
+ // something went wrong, so let's log it
+ this.logger.error("Exception during writing new job '" + EventUtil.toString(event) + "' to repository at " + nodePath, re);
+ }
+ } else {
+ try {
+ // let's first search for an existing node with the same id
+ Node foundNode = null;
+ if ( rootNode.hasNode(nodePath) ) {
+ foundNode = rootNode.getNode(nodePath);
+ }
+ if ( foundNode == null ) {
+ // We now write the event into the repository
+ try {
+ this.writeEvent(rootNode, event, nodePath);
+ } catch (ItemExistsException iee) {
+ // someone else did already write this node in the meantime
+ // nothing to do for us
+ }
+ }
+ } catch (RepositoryException re ) {
+ // something went wrong, so let's log it
+ this.logger.error("Exception during writing new job '" + EventUtil.toString(event) + "' to repository at " + nodePath, re);
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Return the repository path.
+ */
+ public String getRepositoryPath() {
+ return this.repositoryPath;
+ }
+
+ /**
+ * Write an event to the repository.
+ * @param rootNode The root node for all jobs
+ * @param e The event
+ * @param suggestedName A suggested name/path for the node.
+ * @throws RepositoryException
+ */
+ private void writeEvent(final Node rootNode, final Event e, final String path)
+ throws RepositoryException {
+ // create new node with name of topic
+ final Node eventNode = this.createPath(rootNode,
+ path,
+ JCRHelper.JOB_NODE_TYPE);
+ JCRHelper.writeEventProperties(eventNode, e);
+
+ eventNode.setProperty(JCRHelper.NODE_PROPERTY_CREATED, Calendar.getInstance());
+ eventNode.setProperty(JCRHelper.NODE_PROPERTY_APPLICATION, Environment.APPLICATION_ID);
+
+ // job topic
+ eventNode.setProperty(JCRHelper.NODE_PROPERTY_TOPIC, (String)e.getProperty(JobUtil.PROPERTY_JOB_TOPIC));
+ // job id
+ final String jobId = (String)e.getProperty(JobUtil.PROPERTY_JOB_NAME);
+ if ( jobId != null ) {
+ eventNode.setProperty(JCRHelper.NODE_PROPERTY_JOBID, jobId);
+ }
+ rootNode.getSession().save();
+ }
+
+ /**
+ * Read an event from the repository.
+ * This method is similar as {@link #readEvent(Node, boolean)} with the exception
+ * that it even loads the event if classes are missing
+ * @throws RepositoryException
+ */
+ public Event forceReadEvent(Node eventNode)
+ throws RepositoryException {
+ try {
+ return this.readEvent(eventNode, false);
+ } catch (ClassNotFoundException cnfe) {
+ this.ignoreException(cnfe);
+ }
+ // we try it again and set the force load flag
+ try {
+ return this.readEvent(eventNode, true);
+ } catch (ClassNotFoundException cnfe) {
+ // this can never happen but we catch it anyway and rethrow
+ this.ignoreException(cnfe);
+ throw new RepositoryException(cnfe);
+ }
+ }
+
+ /**
+ * Read an event from the repository.
+ * @throws RepositoryException
+ * @throws ClassNotFoundException
+ */
+ private Event readEvent(Node eventNode, final boolean forceLoad)
+ throws RepositoryException, ClassNotFoundException {
+ final String topic = eventNode.getProperty(JCRHelper.NODE_PROPERTY_TOPIC).getString();
+ final ClassLoader cl = this.environment.getDynamicClassLoader();
+ final Dictionary<String, Object> eventProps = JCRHelper.readEventProperties(eventNode, cl, forceLoad);
+
+ eventProps.put(JobUtil.JOB_ID, eventNode.getPath().substring(this.repositoryPath.length() + 1));
+ // convert to integers (jcr only supports long)
+ if ( eventProps.get(JobUtil.PROPERTY_JOB_RETRIES) != null ) {
+ eventProps.put(JobUtil.PROPERTY_JOB_RETRIES, Integer.valueOf(eventProps.get(JobUtil.PROPERTY_JOB_RETRIES).toString()));
+ }
+ if ( eventProps.get(JobUtil.PROPERTY_JOB_RETRY_COUNT) != null ) {
+ eventProps.put(JobUtil.PROPERTY_JOB_RETRY_COUNT, Integer.valueOf(eventProps.get(JobUtil.PROPERTY_JOB_RETRY_COUNT).toString()));
+ }
+ // add application id
+ eventProps.put(EventUtil.PROPERTY_APPLICATION, eventNode.getProperty(JCRHelper.NODE_PROPERTY_APPLICATION).getString());
+ // and created
+ eventProps.put(JCRHelper.NODE_PROPERTY_CREATED, eventNode.getProperty(JCRHelper.NODE_PROPERTY_CREATED).getDate());
+ try {
+ final Event event = new Event(topic, eventProps);
+ return event;
+ } catch (IllegalArgumentException iae) {
+ // this exception occurs if the topic is not correct (it should never happen,
+ // but you never know)
+ throw new RepositoryException("Unable to read event: " + iae.getMessage(), iae);
+ }
+ }
+
+ /**
+ * Helper method which just logs the exception in debug mode.
+ * @param e
+ */
+ private void ignoreException(final Exception e) {
+ if ( this.logger.isDebugEnabled() ) {
+ this.logger.debug("Ignored exception " + e.getMessage(), e);
+ }
+ }
+
+ /**
+ * Check the job topic of an event
+ */
+ private boolean checkJobTopic(final Event job) {
+ final String jobTopic = (String)job.getProperty(JobUtil.PROPERTY_JOB_TOPIC);
+ boolean topicIsCorrect = false;
+ if ( jobTopic != null ) {
+ try {
+ @SuppressWarnings("unused")
+ final Event testEvent = new Event(jobTopic, (Dictionary<String, Object>)null);
+ topicIsCorrect = true;
+ } catch (IllegalArgumentException iae) {
+ // we just have to catch it
+ }
+ if ( !topicIsCorrect ) {
+ logger.warn("Discarding job {} : job has an illegal job topic {}", EventUtil.toString(job), jobTopic);
+ }
+ } else {
+ logger.warn("Discarding job {} : job topic is missing", EventUtil.toString(job));
+ }
+ return topicIsCorrect;
+ }
+
+ /**
+ * Store an event in the repository by putting it in the write queue.
+ */
+ private void store(final Event event) {
+ if ( logger.isDebugEnabled() ) {
+ logger.debug("Handling local job {}", EventUtil.toString(event));
+ }
+ // check job topic
+ if ( this.checkJobTopic(event) ) {
+ try {
+ this.writeQueue.put(event);
+ } catch (InterruptedException e) {
+ // this should never happen
+ this.ignoreException(e);
+ }
+ }
+ }
+
+ /**
+ * Try to reload unloaded jobs - this method is invoked if bundles have been added etc.
+ */
+ private void tryToReloadUnloadedJobs() {
+ // bundle event started or updated
+ boolean doIt = false;
+ synchronized ( this.unloadedJobs ) {
+ if ( this.unloadedJobs.size() > 0 ) {
+ doIt = true;
+ }
+ }
+ if ( doIt ) {
+ final Runnable t = new Runnable() {
+
+ public void run() {
+ synchronized (unloadedJobs) {
+ Session s = null;
+ final Set<String> newUnloadedJobs = new HashSet<String>();
+ newUnloadedJobs.addAll(unloadedJobs);
+ try {
+ s = environment.createAdminSession();
+ for(String path : unloadedJobs ) {
+ newUnloadedJobs.remove(path);
+ try {
+ if ( s.itemExists(path) ) {
+ final Node eventNode = (Node) s.getItem(path);
+ tryToLoadJob(eventNode, newUnloadedJobs);
+ }
+ } catch (RepositoryException re) {
+ // we ignore this and readd
+ newUnloadedJobs.add(path);
+ ignoreException(re);
+ }
+ }
+ } catch (RepositoryException re) {
+ // unable to create session, so we try it again next time
+ ignoreException(re);
+ } finally {
+ if ( s != null ) {
+ s.logout();
+ }
+ unloadedJobs.clear();
+ unloadedJobs.addAll(newUnloadedJobs);
+ }
+ }
+ }
+
+ };
+ Environment.THREAD_POOL.execute(t);
+ }
+ }
+
+ /**
+ * Process the event and pass it on to the queue manager.
+ * Check topic and local flag first!
+ */
+ private void process(final Event event) {
+ if ( !checkJobTopic(event) ) {
+ return;
+ }
+ if ( logger.isDebugEnabled() ) {
+ logger.debug("Received new job {}", EventUtil.toString(event));
+ }
+ // check for local only jobs and remove them from the queue if they're meant
+ // for another application node
+ final String appId = (String)event.getProperty(EventUtil.PROPERTY_APPLICATION);
+ if ( event.getProperty(JobUtil.PROPERTY_JOB_RUN_LOCAL) != null
+ && appId != null && !Environment.APPLICATION_ID.equals(appId) ) {
+ if ( logger.isDebugEnabled() ) {
+ logger.debug("Discarding job {} : local job for a different application node.", EventUtil.toString(event));
+ }
+ } else {
+ final JobEvent info = new JCRJobEvent(event, this);
+ ((DefaultJobManager)this.jobManager).process(info);
+ }
+ }
+
+ /**
+ * Try to lock the node in the repository.
+ * Locking might fail if:
+ * - the node has been removed
+ * - the job has alreay been processed
+ * - someone else locked it already
+ *
+ * @param info The job event
+ * @return <code>true</code> if the node could be locked
+ */
+ public boolean lock(final JobEvent info) {
+ final String path = this.getNodePath(info.uniqueId);
+ synchronized ( this.backgroundLock ) {
+ try {
+ // check if the node still exists
+ if ( this.backgroundSession.itemExists(path)
+ && !this.backgroundSession.itemExists(path + '/' + JCRHelper.NODE_PROPERTY_FINISHED)) {
+
+ final Node eventNode = (Node) this.backgroundSession.getItem(path);
+ if ( !eventNode.isLocked() ) {
+ // lock node
+ try {
+ this.backgroundSession.getWorkspace().getLockManager().lock(path, false, true, Long.MAX_VALUE, "JobEventHandler:" + Environment.APPLICATION_ID);
+ } catch (RepositoryException re) {
+ // lock failed which means that the node is locked by someone else, so we don't have to requeue
+ return false;
+ }
+ return true;
+ }
+ }
+ } catch (RepositoryException re) {
+ this.ignoreException(re);
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Unlock the node for the event
+ */
+ public void unlock(final JobEvent info) {
+ final String path = this.getNodePath(info.uniqueId);
+ synchronized ( this.backgroundLock ) {
+ try {
+ this.backgroundSession.getWorkspace().getLockManager().unlock(path);
+ } catch (RepositoryException re) {
+ // there is nothing we can do
+ this.ignoreException(re);
+ }
+ }
+ }
+
+ /**
+ * Finish the job
+ */
+ public void finished(final JobEvent info) {
+ final String jobId = (String)info.event.getProperty(JobUtil.PROPERTY_JOB_NAME);
+ final String path = this.getNodePath(info.uniqueId);
+ synchronized ( this.backgroundLock ) {
+ try {
+ if ( this.backgroundSession.itemExists(path) ) {
+ final Node eventNode = (Node)this.backgroundSession.getItem(path);
+ if ( jobId == null ) {
+ // simply remove the node
+ eventNode.remove();
+ } else {
+ eventNode.setProperty(JCRHelper.NODE_PROPERTY_FINISHED, Calendar.getInstance());
+ eventNode.setProperty(JCRHelper.NODE_PROPERTY_PROCESSOR, Environment.APPLICATION_ID);
+ }
+ this.backgroundSession.save();
+ // and unlock
+ if ( jobId != null ) {
+ this.backgroundSession.getWorkspace().getLockManager().unlock(path);
+ }
+ }
+ } catch (RepositoryException re) {
+ // there is nothing we can do
+ this.ignoreException(re);
+ }
+ }
+ }
+
+ private String getNodePath(final String jobId) {
+ return this.repositoryPath + '/' + jobId;
+ }
+
+ /**
+ * Remove the job - if not currently in processing.
+ */
+ public boolean remove(final String jobId) {
+ if ( this.backgroundSession != null && jobId != null ) {
+ final String path = this.getNodePath(jobId);
+ synchronized ( this.backgroundLock ) {
+ try {
+ if ( this.backgroundSession.itemExists(path) ) {
+ final Node eventNode = (Node) this.backgroundSession.getItem(path);
+ if ( eventNode.isLocked() ) {
+ this.logger.debug("Attempted to cancel a running job at {}", path);
+ return false;
+ }
+ // try to load job to send notification
+ try {
+ final Event job = this.forceReadEvent(eventNode);
+ Utility.sendNotification(this.environment, JobUtil.TOPIC_JOB_CANCELLED, job);
+ } catch (RepositoryException ignore) {
+ this.ignoreException(ignore);
+ }
+ eventNode.remove();
+ this.backgroundSession.save();
+ }
+ } catch (RepositoryException e) {
+ this.logger.error("Error during cancelling job at " + path, e);
+ }
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Reschedule the job
+ */
+ public boolean reschedule(final JobEvent info) {
+ final String path = this.getNodePath(info.uniqueId);
+ synchronized ( this.backgroundLock ) {
+ try {
+ if ( this.backgroundSession.itemExists(path) ) {
+ final Node eventNode = (Node)this.backgroundSession.getItem(path);
+ eventNode.setProperty(JobUtil.PROPERTY_JOB_RETRIES, (Integer)info.event.getProperty(JobUtil.PROPERTY_JOB_RETRIES));
+ eventNode.setProperty(JobUtil.PROPERTY_JOB_RETRY_COUNT, (Integer)info.event.getProperty(JobUtil.PROPERTY_JOB_RETRY_COUNT));
+ eventNode.setProperty(JCRHelper.NODE_PROPERTY_PROCESSOR, Environment.APPLICATION_ID);
+ this.backgroundSession.save();
+
+ // and unlock
+ this.backgroundSession.getWorkspace().getLockManager().unlock(path);
+ return true;
+ }
+ } catch (RepositoryException re) {
+ // there is nothing we can do
+ this.ignoreException(re);
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Creates or gets the {@link javax.jcr.Node Node} at the given Path.
+ * In case it has to create the Node all non-existent intermediate path-elements
+ * will be create with the given intermediate node type and the returned node
+ * will be created with the given nodeType
+ *
+ * @param parentNode starting node
+ * @param relativePath to create
+ * @param intermediateNodeType to use for creation of intermediate nodes (or null)
+ * @param nodeType to use for creation of the final node (or null)
+ * @param autoSave Should save be called when a new node is created?
+ * @return the Node at path
+ * @throws RepositoryException in case of exception accessing the Repository
+ */
+ private Node createPath(Node parentNode,
+ String relativePath,
+ String nodeType)
+ throws RepositoryException {
+ if (!parentNode.hasNode(relativePath)) {
+ Node node = parentNode;
+ int pos = relativePath.lastIndexOf('/');
+ if ( pos != -1 ) {
+ final StringTokenizer st = new StringTokenizer(relativePath.substring(0, pos), "/");
+ while ( st.hasMoreTokens() ) {
+ final String token = st.nextToken();
+ if ( !node.hasNode(token) ) {
+ try {
+ node.addNode(token, JCRHelper.NODETYPE_FOLDER);
+ node.getSession().save();
+ } catch (RepositoryException re) {
+ // we ignore this as this folder might be created from a different task
+ node.refresh(false);
+ }
+ }
+ node = node.getNode(token);
+ }
+ relativePath = relativePath.substring(pos + 1);
+ }
+ if ( !node.hasNode(relativePath) ) {
+ node.addNode(relativePath, nodeType);
+ }
+ return node.getNode(relativePath);
+ }
+ return parentNode.getNode(relativePath);
+ }
+
+ /**
+ * This method is invoked periodically by the scheduler.
+ * @see java.lang.Runnable#run()
+ */
+ public void run() {
+ this.cleanup();
+ }
+
+ /**
+ * @see org.osgi.service.event.EventHandler#handleEvent(org.osgi.service.event.Event)
+ */
+ public void handleEvent(final Event event) {
+ if ( logger.isDebugEnabled() ) {
+ logger.debug("Receiving event {}", EventUtil.toString(event));
+ }
+ // we ignore remote job events
+ if ( EventUtil.isLocal(event) ) {
+ // check for bundle event
+ if ( event.getTopic().equals(JobUtil.TOPIC_JOB)) {
+ this.store(event);
+ } else {
+ // bundle event started or updated
+ this.tryToReloadUnloadedJobs();
+ }
+ }
+ }
+}
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java?rev=1021247&view=auto
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java (added)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java Mon Oct 11 06:54:12 2010
@@ -0,0 +1,579 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.queues;
+
+import java.util.ArrayList;
+import java.util.Dictionary;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.sling.event.EventPropertiesMap;
+import org.apache.sling.event.EventUtil;
+import org.apache.sling.event.impl.EnvironmentComponent;
+import org.apache.sling.event.impl.jobs.JobEvent;
+import org.apache.sling.event.impl.jobs.JobStatusNotifier;
+import org.apache.sling.event.impl.jobs.StatisticsImpl;
+import org.apache.sling.event.impl.jobs.Utility;
+import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
+import org.apache.sling.event.jobs.JobUtil;
+import org.apache.sling.event.jobs.Queue;
+import org.apache.sling.event.jobs.Statistics;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The job blocking queue extends the blocking queue by some
+ * functionality for the job event handling.
+ */
+public abstract class AbstractJobQueue
+ extends StatisticsImpl
+ implements JobStatusNotifier, Queue {
+
+ /** Default number of seconds to wait for an ack. */
+ private static final long DEFAULT_WAIT_FOR_ACK_IN_MS = 60 * 1000; // by default we wait 60 secs
+
+ /** Default timeout for suspend. */
+ private static final long MAX_SUSPEND_TIME = 1000 * 60 * 60; // 60 mins
+
+ /** The logger. */
+ protected final Logger logger;
+
+ /** Configuration. */
+ protected final InternalQueueConfiguration configuration;
+
+ /** The environment component. */
+ private final EnvironmentComponent environment;
+
+ /** The queue name. */
+ protected volatile String queueName;
+
+ /** Are we still running? */
+ protected volatile boolean running;
+
+ /** Are we marked for cleanup */
+ private volatile boolean markedForCleanUp = false;
+
+ /** Is the queue currently waiting(sleeping) */
+ protected volatile boolean isWaiting = false;
+
+ /** The map of events we're have started (send). */
+ private final Map<String, JobEvent> startedJobsLists = new HashMap<String, JobEvent>();
+
+ /** The map of events we're processing. */
+ private final Map<String, JobEvent> processsingJobsLists = new HashMap<String, JobEvent>();
+
+ /** Suspended since. */
+ private final AtomicLong suspendedSince = new AtomicLong(-1);
+
+ /** Suspend lock. */
+ private final Object suspendLock = new Object();
+
+ /**
+ * Start this queue
+ * @param name The queue name
+ * @param config The queue configuration
+ * @param environment The environment component
+ */
+ public AbstractJobQueue(final String name,
+ final InternalQueueConfiguration config,
+ final EnvironmentComponent environment) {
+ this.queueName = name;
+ this.configuration = config;
+ this.logger = LoggerFactory.getLogger(this.getClass().getName() + '.' + name);
+ this.running = true;
+ this.environment = environment;
+ }
+
+ public String getStatusInfo() {
+ return "isWaiting=" + this.isWaiting + ", markedForCleanUp=" + this.markedForCleanUp + ", suspendedSince=" + this.suspendedSince.longValue();
+ }
+
+ /**
+ * Start the job queue.
+ */
+ public void start() {
+ final Thread queueThread = new Thread(new Runnable() {
+
+ public void run() {
+ while ( running ) {
+ logger.info("Starting job queue {}", queueName);
+ logger.debug("Configuration for job queue={}", configuration);
+
+ try {
+ runJobQueue();
+ } catch (Throwable t) { //NOSONAR
+ logger.error("Job queue " + queueName + " stopped with exception: " + t.getMessage() + ". Restarting.", t);
+ }
+ }
+ }
+
+ }, "Apache Sling Job Queue " + queueName);
+ queueThread.setDaemon(true);
+ queueThread.start();
+ }
+
+ /**
+ * Return the queue configuration
+ */
+ public InternalQueueConfiguration getConfiguration() {
+ return this.configuration;
+ }
+
+ /**
+ * Close this queue.
+ */
+ public void close() {
+ this.running = false;
+ this.logger.debug("Shutting down job queue {}", queueName);
+ this.logger.debug("Waking up sleeping queue {}", queueName);
+ this.resume();
+ if ( this.isWaiting ) {
+ this.logger.debug("Waking up waiting queue {}", this.queueName);
+ this.notifyFinished(null);
+ }
+ // continue queue processing to stop the queue
+ this.put(new JobEvent(null, null) {
+ public boolean lock() { return false; }
+ public void unlock() {
+ // dummy impl
+ }
+ public void finished() {
+ // dummy impl
+ }
+ public boolean remove() { return true; }
+ public boolean reschedule() { return false; }
+ });
+
+ this.processsingJobsLists.clear();
+ this.startedJobsLists.clear();
+ this.logger.info("Stopped job queue {}", this.queueName);
+ }
+
+ /**
+ * Periodically cleanup.
+ */
+ public void cleanup() {
+ if ( this.running ) {
+ // check for jobs that were started but never got an aknowledge
+ final long tooOld = System.currentTimeMillis() - DEFAULT_WAIT_FOR_ACK_IN_MS;
+ // to keep the synchronized block as fast as possible we just store the
+ // jobs to be removed in a new list and process this list afterwards
+ final List<JobEvent> restartJobs = new ArrayList<JobEvent>();
+ synchronized ( this.startedJobsLists ) {
+ final Iterator<Map.Entry<String, JobEvent>> i = this.startedJobsLists.entrySet().iterator();
+ while ( i.hasNext() ) {
+ final Map.Entry<String, JobEvent> entry = i.next();
+ if ( entry.getValue().started <= tooOld ) {
+ restartJobs.add(entry.getValue());
+ }
+ }
+ }
+
+ // restart jobs is now a list of potential candidates, we now have to check
+ // each candidate separately again!
+ if ( restartJobs.size() > 0 ) {
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ // we just ignore this
+ this.ignoreException(e);
+ }
+ }
+ final Iterator<JobEvent> jobIter = restartJobs.iterator();
+ while ( jobIter.hasNext() ) {
+ final JobEvent info = jobIter.next();
+ boolean process = false;
+ synchronized ( this.startedJobsLists ) {
+ process = this.startedJobsLists.remove(info.uniqueId) != null;
+ }
+ if ( process ) {
+ this.logger.info("No acknowledge received for job {} stored at {}. Requeueing job.", EventUtil.toString(info.event), info.uniqueId);
+ this.finishedJob(info.event, true);
+ }
+ }
+ }
+ }
+
+ /**
+ * @see org.apache.sling.event.impl.jobs.JobStatusNotifier#sendAcknowledge(org.osgi.service.event.Event)
+ */
+ public boolean sendAcknowledge(final Event job) {
+ final String location = (String)job.getProperty(JobUtil.JOB_ID);
+ final JobEvent ack;
+ synchronized ( this.startedJobsLists ) {
+ ack = this.startedJobsLists.remove(location);
+ }
+ // if the event is still in the processing list, we confirm the ack
+ if ( ack != null ) {
+ if ( logger.isDebugEnabled() ) {
+ logger.debug("Received ack for job {}", EventUtil.toString(job));
+ }
+ this.addActive(ack.started - ack.queued);
+ Utility.sendNotification(this.environment, JobUtil.TOPIC_JOB_STARTED, job);
+ synchronized ( this.processsingJobsLists ) {
+ this.processsingJobsLists.put(location, ack);
+ }
+ } else {
+ this.decQueued();
+ }
+ return ack != null;
+ }
+
+ private boolean handleReschedule(final JobEvent jobEvent, final boolean shouldReschedule) {
+ boolean reschedule = shouldReschedule;
+ if ( shouldReschedule ) {
+ // check if we exceeded the number of retries
+ int retries = this.configuration.getMaxRetries();
+ if ( jobEvent.event.getProperty(JobUtil.PROPERTY_JOB_RETRIES) != null ) {
+ retries = (Integer) jobEvent.event.getProperty(JobUtil.PROPERTY_JOB_RETRIES);
+ }
+ int retryCount = 0;
+ if ( jobEvent.event.getProperty(JobUtil.PROPERTY_JOB_RETRY_COUNT) != null ) {
+ retryCount = (Integer)jobEvent.event.getProperty(JobUtil.PROPERTY_JOB_RETRY_COUNT);
+ }
+ retryCount++;
+ if ( retries != -1 && retryCount > retries ) {
+ reschedule = false;
+ }
+ if ( reschedule ) {
+ // update event with retry count and retries
+ final Dictionary<String, Object> newProperties = new EventPropertiesMap(jobEvent.event);
+ newProperties.put(JobUtil.PROPERTY_JOB_RETRY_COUNT, retryCount);
+ newProperties.put(JobUtil.PROPERTY_JOB_RETRIES, retries);
+ jobEvent.event = new Event(jobEvent.event.getTopic(), newProperties);
+ if ( this.logger.isDebugEnabled() ) {
+ this.logger.debug("Failed job {}", EventUtil.toString(jobEvent.event));
+ }
+ this.failedJob();
+ jobEvent.queued = System.currentTimeMillis();
+ Utility.sendNotification(this.environment, JobUtil.TOPIC_JOB_FAILED, jobEvent.event);
+ } else {
+ if ( this.logger.isDebugEnabled() ) {
+ this.logger.debug("Cancelled job {}", EventUtil.toString(jobEvent.event));
+ }
+ this.cancelledJob();
+ Utility.sendNotification(this.environment, JobUtil.TOPIC_JOB_CANCELLED, jobEvent.event);
+ }
+ } else {
+ if ( this.logger.isDebugEnabled() ) {
+ this.logger.debug("Finished job {}", EventUtil.toString(jobEvent.event));
+ }
+ this.finishedJob(System.currentTimeMillis() - jobEvent.started);
+ Utility.sendNotification(this.environment, JobUtil.TOPIC_JOB_FINISHED, jobEvent.event);
+ }
+
+ return reschedule;
+ }
+
+ /**
+ * @see org.apache.sling.event.impl.jobs.JobStatusNotifier#finishedJob(org.osgi.service.event.Event, boolean)
+ */
+ public boolean finishedJob(final Event job, final boolean shouldReschedule) {
+ if ( this.logger.isDebugEnabled() ) {
+ this.logger.debug("Received finish for job {}, shouldReschedule={}", EventUtil.toString(job), shouldReschedule);
+ }
+ if ( !this.running ) {
+ if ( this.logger.isDebugEnabled() ) {
+ this.logger.debug("Queue is not running anymore. Discarding finish for {}", EventUtil.toString(job));
+ }
+ return false;
+ }
+ final String location = (String)job.getProperty(JobUtil.JOB_ID);
+ // let's remove the event from our processing list
+ // this is just a sanity check, as usually the job should have been
+ // removed during sendAcknowledge.
+ synchronized ( this.startedJobsLists ) {
+ this.startedJobsLists.remove(location);
+ }
+
+ // get job event
+ final JobEvent info;
+ synchronized ( this.processsingJobsLists ) {
+ info = this.processsingJobsLists.remove(location);
+ }
+ if ( info == null ) {
+ if ( this.logger.isDebugEnabled() ) {
+ this.logger.debug("This job has never been started by this queue: {}", EventUtil.toString(job));
+ }
+ return false;
+ }
+
+ // handle the reschedule, a new job might be returned with updated reschedule info!
+ final boolean reschedule = this.handleReschedule(info, shouldReschedule);
+
+ // if this is set after the synchronized block we have an error
+ final boolean finishSuccessful;
+
+ if ( !reschedule ) {
+ info.finished();
+ finishSuccessful = true;
+ } else {
+ finishSuccessful = info.reschedule();
+ }
+
+ if ( !finishSuccessful || !reschedule ) {
+ checkForNotify(null);
+ return false;
+ }
+ checkForNotify(info);
+ return true;
+ }
+
+ private void checkForNotify(final JobEvent info) {
+ JobEvent reprocessInfo = null;
+ if ( info != null ) {
+ reprocessInfo = this.reschedule(info);
+ }
+ notifyFinished(reprocessInfo);
+ }
+
+ protected boolean canBeMarkedForCleanUp() {
+ return this.isEmpty() && !this.isWaiting;
+ }
+ /**
+ * Mark this queue for cleanup.
+ */
+ public void markForCleanUp() {
+ if ( this.canBeMarkedForCleanUp() ) {
+ this.markedForCleanUp = true;
+ }
+ }
+
+ /**
+ * Check if this queue is marked for cleanup
+ */
+ public boolean isMarkedForCleanUp() {
+ return this.markedForCleanUp && this.canBeMarkedForCleanUp();
+ }
+
+ /**
+ * Get the name of the job queue.
+ */
+ public String getName() {
+ return this.queueName;
+ }
+
+
+ /**
+ * Add a new job to the queue.
+ */
+ public void process(final JobEvent event) {
+ this.put(event);
+ event.queued = System.currentTimeMillis();
+ this.incQueued();
+ }
+
+ /**
+ * Execute the qeue
+ */
+ private void runJobQueue() {
+ JobEvent info = null;
+ while ( this.running ) {
+ while ( this.suspendedSince.longValue() != -1 ) {
+ synchronized ( this.suspendLock ) {
+ try {
+ this.suspendLock.wait(MAX_SUSPEND_TIME);
+ } catch (final InterruptedException ignore) {
+ this.ignoreException(ignore);
+ }
+ if ( System.currentTimeMillis() > this.suspendedSince.longValue() + MAX_SUSPEND_TIME ) {
+ this.suspendedSince.set(-1);
+ }
+ }
+ }
+ if ( info == null ) {
+ // so let's wait/get the next job from the queue
+ info = this.take();
+ }
+
+ if ( info != null && this.running ) {
+ info = this.start(info);
+ }
+ }
+ }
+
+ /**
+ * Process a job
+ */
+ protected boolean executeJob(final JobEvent info) {
+ if ( logger.isDebugEnabled() ) {
+ logger.debug("Executing job {}.", EventUtil.toString(info.event));
+ }
+ if ( info.lock() ) {
+ if ( logger.isDebugEnabled() ) {
+ logger.debug("Starting job {}", EventUtil.toString(info.event));
+ }
+ boolean unlock = true;
+ try {
+ final Event jobEvent = this.getJobEvent(info);
+ final EventAdmin localEA = this.environment.getEventAdmin();
+ info.started = System.currentTimeMillis();
+ // let's add the event to our processing list
+ synchronized ( this.startedJobsLists ) {
+ this.startedJobsLists.put(info.uniqueId, info);
+ }
+
+ // we need async delivery, otherwise we might create a deadlock
+ // as this method runs inside a synchronized block and the finishedJob
+ // method as well!
+ localEA.postEvent(jobEvent);
+ // do not unlock if sending was successful
+ unlock = false;
+
+ return true;
+
+ } catch (final Exception re) {
+ // if an exception occurs, we just log
+ this.logger.error("Exception during job processing.", re);
+ } finally {
+ if ( unlock ) {
+ info.unlock();
+ }
+ }
+ }
+
+ this.decQueued();
+ return false;
+ }
+
+ /**
+ * Create the real job event.
+ * This generates a new event object with the same properties, but with the
+ * {@link EventUtil#PROPERTY_JOB_TOPIC} topic.
+ * @param info The job event.
+ * @return The real job event.
+ */
+ private Event getJobEvent(final JobEvent info) {
+ final String eventTopic = (String)info.event.getProperty(JobUtil.PROPERTY_JOB_TOPIC);
+ final Dictionary<String, Object> properties = new EventPropertiesMap(info.event);
+ // put properties for finished job callback
+ properties.put(JobStatusNotifier.CONTEXT_PROPERTY_NAME,
+ new JobStatusNotifier.NotifierContext(this));
+ // remove app id and distributable flag
+ properties.remove(EventUtil.PROPERTY_DISTRIBUTE);
+ properties.remove(EventUtil.PROPERTY_APPLICATION);
+
+ // set priority from configuration
+ if ( properties.get(JobUtil.PROPERTY_JOB_PRIORITY) == null ) {
+ properties.put(JobUtil.PROPERTY_JOB_PRIORITY, this.configuration.getPriority());
+ }
+ // set queue name
+ properties.put(JobUtil.PROPERTY_JOB_QUEUE_NAME, info.queueName);
+
+ return new Event(eventTopic, properties);
+ }
+
+ /**
+ * Helper method which just logs the exception in debug mode.
+ * @param e
+ */
+ protected void ignoreException(Exception e) {
+ if ( this.logger.isDebugEnabled() ) {
+ this.logger.debug("Ignored exception " + e.getMessage(), e);
+ }
+ }
+
+ /**
+ * Rename this queue.
+ */
+ public void rename(final String name) {
+ this.logger.info("Queue reconfiguration: old queue {} is renamed to {}.", this.queueName, name);
+ this.queueName = name;
+ }
+
+ protected abstract JobEvent start(final JobEvent event);
+
+ protected abstract void put(final JobEvent event);
+
+ protected abstract JobEvent take();
+
+ protected abstract boolean isEmpty();
+
+ protected abstract void notifyFinished(final JobEvent rescheduleInfo);
+
+ /**
+ * Reschedule a job.
+ */
+ protected abstract JobEvent reschedule(final JobEvent info);
+
+ /**
+ * @see org.apache.sling.event.jobs.Queue#getStatistics()
+ */
+ public Statistics getStatistics() {
+ return this;
+ }
+
+ /**
+ * @see org.apache.sling.event.jobs.Queue#resume()
+ */
+ public void resume() {
+ if ( this.isSuspended() ) {
+ synchronized ( this.suspendLock ) {
+ this.suspendLock.notify();
+ }
+ }
+ this.suspendedSince.set(-1);
+ }
+
+ /**
+ * @see org.apache.sling.event.jobs.Queue#suspend()
+ */
+ public void suspend() {
+ this.suspendedSince.compareAndSet(-1, System.currentTimeMillis());
+ }
+
+ /**
+ * @see org.apache.sling.event.jobs.Queue#isSuspended()
+ */
+ public boolean isSuspended() {
+ return this.suspendedSince.longValue() != -1;
+ }
+
+
+ /**
+ * @see org.apache.sling.event.jobs.Queue#removeAll()
+ */
+ public void removeAll() {
+ final boolean wasSuspended = this.isSuspended();
+ this.suspend();
+ while ( !this.isEmpty() ) {
+ final JobEvent event = this.take();
+ if ( event != null ) {
+ event.remove();
+ }
+ }
+ this.clearQueued();
+ if ( !wasSuspended ) {
+ this.resume();
+ }
+ }
+
+ /**
+ * @see org.apache.sling.event.jobs.Queue#clear()
+ */
+ public void clear() {
+ this.clearQueued();
+ }
+}
+
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractParallelJobQueue.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractParallelJobQueue.java?rev=1021247&view=auto
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractParallelJobQueue.java (added)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractParallelJobQueue.java Mon Oct 11 06:54:12 2010
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.queues;
+
+import java.util.Date;
+
+import org.apache.sling.commons.scheduler.Scheduler;
+import org.apache.sling.event.impl.EnvironmentComponent;
+import org.apache.sling.event.impl.jobs.JobEvent;
+import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
+import org.apache.sling.event.jobs.JobUtil;
+
+/**
+ * Abstract base class for a parallel processing job queue.
+ */
+public abstract class AbstractParallelJobQueue extends AbstractJobQueue {
+
+ protected volatile int jobCount;
+
+ /** The scheduler for rescheduling. */
+ private final Scheduler scheduler;
+
+ public AbstractParallelJobQueue(final String name,
+ final InternalQueueConfiguration config,
+ final EnvironmentComponent env,
+ final Scheduler scheduler) {
+ super(name, config, env);
+ this.scheduler = scheduler;
+ }
+
+ @Override
+ public String getStatusInfo() {
+ return super.getStatusInfo() + ", jobCount=" + this.jobCount;
+ }
+
+ @Override
+ protected JobEvent start(final JobEvent processInfo) {
+ // acquire a slot
+ this.acquireSlot();
+
+ if ( !this.executeJob(processInfo) ) {
+ this.freeSlot();
+ }
+ return null;
+ }
+
+ /**
+ * Acquire a processing slot.
+ * This method is called if the queue is not ordered.
+ */
+ private void acquireSlot() {
+ synchronized ( this ) {
+ if ( jobCount >= this.configuration.getMaxParallel() ) {
+ this.isWaiting = true;
+ this.logger.debug("Job queue {} is processing {} jobs - waiting for a free slot.", this.queueName, jobCount);
+ while ( this.isWaiting ) {
+ try {
+ this.wait();
+ } catch (final InterruptedException e) {
+ this.ignoreException(e);
+ }
+ }
+ this.logger.debug("Job queue {} is continuing.", this.queueName);
+ }
+ jobCount++;
+ }
+ }
+
+ /**
+ * Free a slot when a job processing is finished.
+ */
+ private void freeSlot() {
+ synchronized ( this ) {
+ jobCount--;
+ if ( this.isWaiting ) {
+ this.logger.debug("Notifying job queue {} to continue processing.", this.queueName);
+ this.isWaiting = false;
+ this.notify();
+ }
+ }
+ }
+
+ @Override
+ protected boolean canBeMarkedForCleanUp() {
+ boolean result = super.canBeMarkedForCleanUp();
+ if ( result ) {
+ result = this.jobCount == 0;
+ }
+ return result;
+ }
+
+ @Override
+ protected void notifyFinished(final JobEvent rescheduleInfo) {
+ this.freeSlot();
+ }
+
+ @Override
+ protected JobEvent reschedule(final JobEvent info) {
+ // we just sleep for the delay time - if none, we continue and retry
+ // this job again
+ long delay = this.configuration.getRetryDelayInMs();
+ if ( info.event.getProperty(JobUtil.PROPERTY_JOB_RETRY_DELAY) != null ) {
+ delay = (Long)info.event.getProperty(JobUtil.PROPERTY_JOB_RETRY_DELAY);
+ }
+ if ( delay > 0 ) {
+ final Date fireDate = new Date();
+ fireDate.setTime(System.currentTimeMillis() + delay);
+
+ final String jobName = "Waiting:" + queueName + ":" + info.hashCode();
+ final Runnable t = new Runnable() {
+ public void run() {
+ put(info);
+ }
+ };
+ try {
+ scheduler.fireJobAt(jobName, t, null, fireDate);
+ } catch (Exception e) {
+ // we ignore the exception and just put back the job in the queue
+ ignoreException(e);
+ t.run();
+ }
+ } else {
+ // put directly into queue
+ put(info);
+ }
+ return null;
+ }
+}
+
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractParallelJobQueue.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractParallelJobQueue.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractParallelJobQueue.java
------------------------------------------------------------------------------
svn:mime-type = text/plain