You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2010/10/11 08:54:14 UTC
svn commit: r1021247 [2/6] - in /sling/branches/eventing-3.0: ./ .settings/
src/ src/main/ src/main/java/ src/main/java/org/ src/main/java/org/apache/
src/main/java/org/apache/sling/ src/main/java/org/apache/sling/event/
src/main/java/org/apache/sling/...
Added: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/DistributingEventHandler.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/DistributingEventHandler.java?rev=1021247&view=auto
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/DistributingEventHandler.java (added)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/DistributingEventHandler.java Mon Oct 11 06:54:12 2010
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl;
+
+import java.util.Calendar;
+import java.util.Dictionary;
+
+import javax.jcr.Node;
+import javax.jcr.NodeIterator;
+import javax.jcr.RepositoryException;
+import javax.jcr.Session;
+import javax.jcr.observation.EventIterator;
+import javax.jcr.query.Query;
+import javax.jcr.query.qom.QueryObjectModelFactory;
+
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.ConfigurationPolicy;
+import org.apache.felix.scr.annotations.Properties;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.sling.commons.osgi.OsgiUtil;
+import org.apache.sling.event.EventUtil;
+import org.apache.sling.event.impl.jobs.jcr.JCRHelper;
+import org.osgi.service.component.ComponentContext;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
+
+/**
+ * This event handler distributes events across an application cluster.
+ *
+ * We schedule this event handler to run in the background and clean up
+ * obsolete events.
+ */
+@Component(label="%dist.events.name",
+ description="%dist.events.description",
+ immediate=true,
+ metatype=true,
+ policy=ConfigurationPolicy.REQUIRE)
+@Service(value=Runnable.class)
+@Properties({
+ @Property(name="event.topics",value="*",propertyPrivate=true),
+ @Property(name="event.filter",value="(event.distribute=*)",propertyPrivate=true),
+ @Property(name="repository.path",value="/var/eventing/distribution",propertyPrivate=true),
+ @Property(name="scheduler.period", longValue=1800),
+ @Property(name="scheduler.concurrent", boolValue=false, propertyPrivate=true)
+})
+public class DistributingEventHandler
+ extends AbstractRepositoryEventHandler
+ implements Runnable {
+
+ /** Default clean up time is 15 minutes. */
+ private static final int DEFAULT_CLEANUP_PERIOD = 15;
+
+ @Property(intValue=DEFAULT_CLEANUP_PERIOD)
+ private static final String CONFIG_PROPERTY_CLEANUP_PERIOD = "cleanup.period";
+
+ /** We remove everything which is older than 15min by default. */
+ private int cleanupPeriod = DEFAULT_CLEANUP_PERIOD;
+
+ /**
+ * @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#activate(org.osgi.service.component.ComponentContext)
+ */
+ protected void activate(ComponentContext context) {
+ @SuppressWarnings("unchecked")
+ final Dictionary<String, Object> props = context.getProperties();
+ this.cleanupPeriod = OsgiUtil.toInteger(props.get(CONFIG_PROPERTY_CLEANUP_PERIOD), DEFAULT_CLEANUP_PERIOD);
+ super.activate(context);
+ }
+
+ /**
+ * Return the query for the clean up.
+ */
+ protected Query getCleanUpQuery(final Session s)
+ throws RepositoryException {
+ final String selectorName = "nodetype";
+ final Calendar deleteBefore = Calendar.getInstance();
+ deleteBefore.add(Calendar.MINUTE, -this.cleanupPeriod);
+
+ final QueryObjectModelFactory qomf = s.getWorkspace().getQueryManager().getQOMFactory();
+
+ final Query q = qomf.createQuery(
+ qomf.selector(getEventNodeType(), selectorName),
+ qomf.and(qomf.descendantNode(selectorName, this.repositoryPath),
+ qomf.comparison(qomf.propertyValue(selectorName, JCRHelper.NODE_PROPERTY_CREATED),
+ QueryObjectModelFactory.JCR_OPERATOR_LESS_THAN,
+ qomf.literal(s.getValueFactory().createValue(deleteBefore)))),
+ null,
+ null
+ );
+ return q;
+ }
+
+ /**
+ * This method is invoked periodically.
+ * @see java.lang.Runnable#run()
+ */
+ public void run() {
+ if ( this.cleanupPeriod > 0 ) {
+ this.logger.debug("Cleaning up repository, removing all entries older than {} minutes.", this.cleanupPeriod);
+
+ // we create an own session for concurrency issues
+ Session s = null;
+ try {
+ s = this.environment.createAdminSession();
+ final Query q = this.getCleanUpQuery(s);
+ if ( logger.isDebugEnabled() ) {
+ logger.debug("Executing query {}", q.getStatement());
+ }
+ final NodeIterator iter = q.execute().getNodes();
+ int count = 0;
+ while ( iter.hasNext() ) {
+ final Node eventNode = iter.nextNode();
+ eventNode.remove();
+ count++;
+ }
+ s.save();
+ logger.debug("Removed {} entries from the repository.", count);
+
+ } catch (RepositoryException e) {
+ // in the case of an error, we just log this as a warning
+ this.logger.warn("Exception during repository cleanup.", e);
+ } finally {
+ if ( s != null ) {
+ s.logout();
+ }
+ }
+ }
+ }
+
+ /**
+ * @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#processWriteQueue()
+ */
+ protected void processWriteQueue() {
+ while ( this.running ) {
+ // so let's wait/get the next job from the queue
+ Event event = null;
+ try {
+ event = this.writeQueue.take();
+ } catch (InterruptedException e) {
+ // we ignore this
+ this.ignoreException(e);
+ }
+ if ( event != null && this.running ) {
+ try {
+ this.writeEvent(event, null);
+ } catch (Exception e) {
+ this.logger.error("Exception during writing the event to the repository.", e);
+ }
+ }
+ }
+ }
+
+ /**
+ * @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#runInBackground()
+ */
+ protected void runInBackground() {
+ while ( this.running ) {
+ // so let's wait/get the next job from the queue
+ EventInfo info = null;
+ try {
+ info = this.queue.take();
+ } catch (InterruptedException e) {
+ // we ignore this
+ this.ignoreException(e);
+ }
+ if ( info != null && this.running ) {
+ if ( info.nodePath != null) {
+ Session session = null;
+ try {
+ session = this.environment.createAdminSession();
+ final Node eventNode = (Node)session.getItem(info.nodePath);
+ if ( eventNode.isNodeType(this.getEventNodeType()) ) {
+ final EventAdmin localEA = this.environment.getEventAdmin();
+ if ( localEA != null ) {
+ localEA.postEvent(this.readEvent(eventNode));
+ } else {
+ this.logger.error("Unable to post event as no event admin is available.");
+ }
+ }
+ } catch (Exception ex) {
+ this.logger.error("Exception during reading the event from the repository.", ex);
+ } finally {
+ if ( session != null ) {
+ session.logout();
+ }
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * @see org.osgi.service.event.EventHandler#handleEvent(org.osgi.service.event.Event)
+ */
+ public void handleEvent(final Event event) {
+ try {
+ this.writeQueue.put(event);
+ } catch (InterruptedException ex) {
+ // we ignore this
+ this.ignoreException(ex);
+ }
+ }
+
+ /**
+ * @see javax.jcr.observation.EventListener#onEvent(javax.jcr.observation.EventIterator)
+ */
+ public void onEvent(final EventIterator iterator) {
+ while ( iterator.hasNext() ) {
+ final javax.jcr.observation.Event event = iterator.nextEvent();
+ try {
+ final EventInfo info = new EventInfo();
+ info.nodePath = event.getPath();
+ this.queue.put(info);
+ } catch (InterruptedException ex) {
+ // we ignore this
+ this.ignoreException(ex);
+ } catch (RepositoryException ex) {
+ this.logger.error("Exception during reading the event from the repository.", ex);
+ }
+ }
+ }
+
+ /**
+ * @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#addEventProperties(javax.jcr.Node, java.util.Dictionary)
+ */
+ protected void addEventProperties(Node eventNode, Dictionary<String, Object> properties)
+ throws RepositoryException {
+ super.addEventProperties(eventNode, properties);
+ properties.put(EventUtil.PROPERTY_APPLICATION, eventNode.getProperty(JCRHelper.NODE_PROPERTY_APPLICATION).getString());
+ }
+
+
+ /**
+ * @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#startWriterSession()
+ */
+ protected void startWriterSession() throws RepositoryException {
+ super.startWriterSession();
+ this.writerSession.getWorkspace().getObservationManager()
+ .addEventListener(this, javax.jcr.observation.Event.NODE_ADDED, this.repositoryPath, true, null, null, true);
+ }
+}
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/DistributingEventHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/DistributingEventHandler.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/DistributingEventHandler.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/EnvironmentComponent.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/EnvironmentComponent.java?rev=1021247&view=auto
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/EnvironmentComponent.java (added)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/EnvironmentComponent.java Mon Oct 11 06:54:12 2010
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl;
+
+import javax.jcr.RepositoryException;
+import javax.jcr.Session;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.ReferencePolicy;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.sling.commons.classloader.DynamicClassLoaderManager;
+import org.apache.sling.commons.threads.ThreadPool;
+import org.apache.sling.event.impl.support.Environment;
+import org.apache.sling.jcr.api.SlingRepository;
+import org.apache.sling.settings.SlingSettingsService;
+import org.osgi.service.event.EventAdmin;
+
+/**
+ * Environment component. This component provides "global settings"
+ * to all services, like the application id and the thread pool.
+ * @since 3.0
+ */
+@Component()
+@Service(value=EnvironmentComponent.class)
+public class EnvironmentComponent {
+
+ @Reference
+ private SlingRepository repository;
+
+ @Reference
+ private EventAdmin eventAdmin;
+
+ @Reference(policy=ReferencePolicy.DYNAMIC,cardinality=ReferenceCardinality.OPTIONAL_UNARY)
+ private DynamicClassLoaderManager classLoaderManager;
+
+ /**
+ * Our thread pool.
+ */
+ @Reference(referenceInterface=EventingThreadPool.class)
+ private ThreadPool threadPool;
+
+ /** Sling settings service. */
+ @Reference
+ private SlingSettingsService settingsService;
+
+ /**
+ * Activate this component.
+ */
+ @Activate
+ protected void activate() {
+ // Set the application id and the thread pool
+ Environment.APPLICATION_ID = this.settingsService.getSlingId();
+ Environment.THREAD_POOL = this.threadPool;
+ }
+
+ /**
+ * Dectivate this component.
+ */
+ @Deactivate
+ protected void deactivate() {
+ // Unset the thread pool
+ if ( Environment.THREAD_POOL == this.threadPool ) {
+ Environment.THREAD_POOL = null;
+ }
+ }
+
+ /**
+ * Return the dynamic classloader for loading events from the repository.
+ */
+ public ClassLoader getDynamicClassLoader() {
+ final DynamicClassLoaderManager dclm = this.classLoaderManager;
+ if ( dclm != null ) {
+ return dclm.getDynamicClassLoader();
+ }
+ // if we don't have a dynamic classloader, we return our classloader
+ return this.getClass().getClassLoader();
+ }
+
+ /**
+ * Create a new admin session.
+ * @return A new admin session.
+ * @throws RepositoryException
+ */
+ public Session createAdminSession()
+ throws RepositoryException {
+ final SlingRepository repo = this.repository;
+ if ( repo == null ) {
+ // as the repo is a hard dependency for this service, the repo
+ // is always available, but we check for null anyway!
+ throw new RepositoryException("Repository is currently not available.");
+ }
+ return repo.loginAdministrative(null);
+ }
+
+ /**
+ * Return the event admin.
+ */
+ public EventAdmin getEventAdmin() {
+ return this.eventAdmin;
+ }
+
+ protected void bindThreadPool(final EventingThreadPool etp) {
+ this.threadPool = etp;
+ }
+
+ protected void unbindThreadPool(final EventingThreadPool etp) {
+ if ( this.threadPool == etp ) {
+ this.threadPool = null;
+ }
+ }
+}
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/EnvironmentComponent.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/EnvironmentComponent.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/EnvironmentComponent.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/EventingThreadPool.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/EventingThreadPool.java?rev=1021247&view=auto
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/EventingThreadPool.java (added)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/EventingThreadPool.java Mon Oct 11 06:54:12 2010
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl;
+
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.PropertyOption;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.sling.commons.osgi.OsgiUtil;
+import org.apache.sling.commons.threads.ModifiableThreadPoolConfig;
+import org.apache.sling.commons.threads.ThreadPool;
+import org.apache.sling.commons.threads.ThreadPoolConfig;
+import org.apache.sling.commons.threads.ThreadPoolConfig.ThreadPriority;
+import org.apache.sling.commons.threads.ThreadPoolManager;
+import org.osgi.service.component.ComponentContext;
+
+
+/**
+ * The configurable eventing thread pool.
+ */
+@Component(label="%event.pool.name",
+ description="%event.pool.description",
+ metatype=true)
+@Service(value=EventingThreadPool.class)
+public class EventingThreadPool implements ThreadPool {
+
+ @Reference
+ protected ThreadPoolManager threadPoolManager;
+
+ /** The real thread pool used. */
+ private org.apache.sling.commons.threads.ThreadPool threadPool;
+
+ private static final int DEFAULT_MIN_POOL_SIZE = 35; // this is sufficient for all threads + approx 25 job queues
+ private static final int DEFAULT_MAX_POOL_SIZE = 50;
+
+ @Property(intValue=DEFAULT_MIN_POOL_SIZE)
+ private static final String PROPERTY_MIN_POOL_SIZE = "minPoolSize";
+ @Property(intValue=DEFAULT_MAX_POOL_SIZE)
+ private static final String PROPERTY_MAX_POOL_SIZE = "maxPoolSize";
+
+ @Property(value="NORM",
+ options={@PropertyOption(name="NORM",value="Norm"),
+ @PropertyOption(name="MIN",value="Min"),
+ @PropertyOption(name="MAX",value="Max")})
+ private static final String PROPERTY_PRIORITY = "priority";
+
+ /**
+ * Activate this component.
+ * @param context
+ */
+ protected void activate(final ComponentContext ctx) {
+ final ModifiableThreadPoolConfig config = new ModifiableThreadPoolConfig();
+ config.setMinPoolSize(OsgiUtil.toInteger(ctx.getProperties().get(PROPERTY_MIN_POOL_SIZE), DEFAULT_MIN_POOL_SIZE));
+ config.setMaxPoolSize(OsgiUtil.toInteger(ctx.getProperties().get(PROPERTY_MAX_POOL_SIZE), DEFAULT_MAX_POOL_SIZE));
+ config.setQueueSize(-1); // unlimited
+ config.setShutdownGraceful(true);
+ config.setPriority(ThreadPriority.valueOf(OsgiUtil.toString(ctx.getProperties().get(PROPERTY_PRIORITY), "NORM")));
+ config.setDaemon(true);
+ this.threadPool = threadPoolManager.create(config, "Apache Sling Eventing Thread Pool");
+ }
+
+ /**
+ * Deactivate this component.
+ * @param context
+ */
+ protected void deactivate(final ComponentContext context) {
+ this.threadPoolManager.release(this.threadPool);
+ }
+
+ /**
+ * @see org.apache.sling.commons.threads.ThreadPool#execute(java.lang.Runnable)
+ */
+ public void execute(Runnable runnable) {
+ threadPool.execute(runnable);
+ }
+
+ /**
+ * @see org.apache.sling.commons.threads.ThreadPool#getConfiguration()
+ */
+ public ThreadPoolConfig getConfiguration() {
+ return threadPool.getConfiguration();
+ }
+
+ /**
+ * @see org.apache.sling.commons.threads.ThreadPool#getName()
+ */
+ public String getName() {
+ return threadPool.getName();
+ }
+}
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/EventingThreadPool.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/EventingThreadPool.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/EventingThreadPool.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/TimedJobHandler.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/TimedJobHandler.java?rev=1021247&view=auto
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/TimedJobHandler.java (added)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/TimedJobHandler.java Mon Oct 11 06:54:12 2010
@@ -0,0 +1,809 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Collection;
+import java.util.Date;
+import java.util.Dictionary;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Hashtable;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+
+import javax.jcr.Item;
+import javax.jcr.Node;
+import javax.jcr.NodeIterator;
+import javax.jcr.RepositoryException;
+import javax.jcr.Session;
+import javax.jcr.Value;
+import javax.jcr.lock.Lock;
+import javax.jcr.lock.LockException;
+import javax.jcr.observation.EventIterator;
+import javax.jcr.query.Query;
+import javax.jcr.query.QueryManager;
+import javax.jcr.query.qom.Comparison;
+import javax.jcr.query.qom.Constraint;
+import javax.jcr.query.qom.QueryObjectModelFactory;
+
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Properties;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.sling.commons.scheduler.Job;
+import org.apache.sling.commons.scheduler.JobContext;
+import org.apache.sling.commons.scheduler.Scheduler;
+import org.apache.sling.event.EventUtil;
+import org.apache.sling.event.TimedEventStatusProvider;
+import org.apache.sling.event.impl.jobs.Utility;
+import org.apache.sling.event.impl.jobs.jcr.JCRHelper;
+import org.apache.sling.event.impl.support.Environment;
+import org.apache.sling.event.jobs.JobUtil;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
+
+
+/**
+ * An event handler for timed events.
+ *
+ */
+@Component(immediate=true)
+@Service(value=TimedEventStatusProvider.class)
+@Properties({
+ @Property(name="event.topics",propertyPrivate=true,
+ value={"org/osgi/framework/BundleEvent/UPDATED",
+ "org/osgi/framework/BundleEvent/STARTED",
+ EventUtil.TOPIC_TIMED_EVENT}),
+ @Property(name="repository.path",value="/var/eventing/timed-jobs",propertyPrivate=true)
+})
+public class TimedJobHandler
+ extends AbstractRepositoryEventHandler
+ implements Job, TimedEventStatusProvider {
+
+ private static final String JOB_TOPIC = "topic";
+
+ private static final String JOB_CONFIG = "config";
+
+ private static final String JOB_SCHEDULE_INFO = "info";
+
+ @Reference
+ private Scheduler scheduler;
+
+ /** Unloaded events. */
+ private Set<String>unloadedEvents = new HashSet<String>();
+
+ /**
+ * @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#startWriterSession()
+ */
+ protected void startWriterSession() throws RepositoryException {
+ super.startWriterSession();
+ // load timed events from repository
+ this.loadEvents();
+ this.writerSession.getWorkspace().getObservationManager()
+ .addEventListener(this, javax.jcr.observation.Event.PROPERTY_CHANGED|javax.jcr.observation.Event.PROPERTY_REMOVED, this.repositoryPath, true, null, null, true);
+ }
+
+ /**
+ * @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#processWriteQueue()
+ */
+ protected void processWriteQueue() {
+ while ( this.running ) {
+ Event event = null;
+ try {
+ event = this.writeQueue.take();
+ } catch (InterruptedException e) {
+ // we ignore this
+ this.ignoreException(e);
+ }
+ if ( this.running && event != null ) {
+ ScheduleInfo scheduleInfo = null;
+ try {
+ scheduleInfo = new ScheduleInfo(event);
+ } catch (IllegalArgumentException iae) {
+ this.logger.error(iae.getMessage());
+ }
+ if ( scheduleInfo != null ) {
+ final EventInfo info = new EventInfo();
+ info.event = event;
+
+ // write event and update path
+ // if something went wrong we get the node path and reschedule
+ synchronized ( this.writeLock ) {
+ info.nodePath = this.persistEvent(info.event, scheduleInfo);
+ }
+ if ( info.nodePath != null ) {
+ try {
+ this.queue.put(info);
+ } catch (InterruptedException e) {
+ // this should never happen, so we ignore it
+ this.ignoreException(e);
+ }
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#runInBackground()
+ */
+ protected void runInBackground() {
+ while ( this.running ) {
+ // so let's wait/get the next info from the queue
+ EventInfo info = null;
+ try {
+ info = this.queue.take();
+ } catch (InterruptedException e) {
+ // we ignore this
+ this.ignoreException(e);
+ }
+ if ( info != null && this.running ) {
+ synchronized ( this.writeLock ) {
+ ScheduleInfo scheduleInfo = null;
+ try {
+ scheduleInfo = new ScheduleInfo(info.event);
+ } catch (IllegalArgumentException iae) {
+ this.logger.error(iae.getMessage());
+ }
+ if ( scheduleInfo != null ) {
+ try {
+ this.writerSession.refresh(true);
+ if ( this.writerSession.itemExists(info.nodePath) ) {
+ final Node eventNode = (Node) this.writerSession.getItem(info.nodePath);
+ if ( !eventNode.isLocked() ) {
+ // lock node
+ Lock lock = null;
+ try {
+ lock = eventNode.getSession().getWorkspace().getLockManager().lock(info.nodePath, false, true, Long.MAX_VALUE, "TimedJobHandler");
+ } catch (RepositoryException re) {
+ // lock failed which means that the node is locked by someone else, so we don't have to requeue
+ }
+ if ( lock != null ) {
+ // if something went wrong, we reschedule
+ if ( !this.processEvent(info.event, scheduleInfo) ) {
+ try {
+ this.queue.put(info);
+ } catch (InterruptedException e) {
+ // this should never happen, so we ignore it
+ this.ignoreException(e);
+ }
+ }
+ }
+ }
+ }
+ } catch (RepositoryException e) {
+ // ignore
+ this.ignoreException(e);
+ }
+ }
+ }
+ }
+ }
+ }
+
+ protected String persistEvent(final Event event, final ScheduleInfo scheduleInfo) {
+ try {
+ // get parent node
+ final Node parentNode = this.getWriterRootNode();
+ final String nodeName = scheduleInfo.jobId;
+ // is there already a node?
+ final Node foundNode = parentNode.hasNode(nodeName) ? parentNode.getNode(nodeName) : null;
+ Lock lock = null;
+ if ( scheduleInfo.isStopEvent() ) {
+ // if this is a stop event, we should remove the node from the repository
+ // if there is no node someone else was faster and we can ignore this
+ if ( foundNode != null ) {
+ try {
+ foundNode.remove();
+ writerSession.save();
+ } catch (LockException le) {
+ // if someone else has the lock this is fine
+ }
+ }
+ // stop the scheduler
+ processEvent(event, scheduleInfo);
+ } else {
+ // if there is already a node, it means we must handle an update
+ if ( foundNode != null ) {
+ try {
+ foundNode.remove();
+ writerSession.save();
+ } catch (LockException le) {
+ // if someone else has the lock this is fine
+ }
+ // create a stop event
+ processEvent(event, scheduleInfo.getStopInfo());
+ }
+ // we only write the event if this is a local one
+ if ( EventUtil.isLocal(event) ) {
+
+ // write event to repository, lock it and schedule the event
+ final Node eventNode = writeEvent(event, nodeName);
+ lock = eventNode.getSession().getWorkspace().getLockManager().lock(eventNode.getPath(), false, true, Long.MAX_VALUE, "TimedJobHandler");
+ }
+ }
+
+ if ( lock != null ) {
+ // if something went wrong, we reschedule
+ if ( !this.processEvent(event, scheduleInfo) ) {
+ final String path = lock.getNode().getPath();
+ writerSession.getWorkspace().getLockManager().unlock(path);
+ return path;
+ }
+ }
+ } catch (RepositoryException re ) {
+ // something went wrong, so let's log it
+ this.logger.error("Exception during writing new job to repository.", re);
+ }
+ return null;
+ }
+
+ /**
+ * Process the event.
+ * If a scheduler is available, a job is scheduled or stopped.
+ * @param event The incomming event.
+ * @return
+ */
+ protected boolean processEvent(final Event event, final ScheduleInfo scheduleInfo) {
+ final Scheduler localScheduler = this.scheduler;
+ if ( localScheduler != null ) {
+ // is this a stop event?
+ if ( scheduleInfo.isStopEvent() ) {
+ if ( this.logger.isDebugEnabled() ) {
+ this.logger.debug("Stopping timed event " + event.getProperty(EventUtil.PROPERTY_TIMED_EVENT_TOPIC) + "(" + scheduleInfo.jobId + ")");
+ }
+ try {
+ localScheduler.removeJob(scheduleInfo.jobId);
+ } catch (NoSuchElementException nsee) {
+ // this can happen if the job is scheduled on another node
+ // so we can just ignore this
+ }
+ return true;
+ }
+ // we ignore remote job events
+ if ( !EventUtil.isLocal(event) ) {
+ return true;
+ }
+
+ // Create configuration for scheduled job
+ final Map<String, Serializable> config = new HashMap<String, Serializable>();
+ // copy properties
+ final Hashtable<String, Object> properties = new Hashtable<String, Object>();
+ config.put(JOB_TOPIC, (String)event.getProperty(EventUtil.PROPERTY_TIMED_EVENT_TOPIC));
+ final String[] names = event.getPropertyNames();
+ if ( names != null ) {
+ for(int i=0; i<names.length; i++) {
+ properties.put(names[i], event.getProperty(names[i]));
+ }
+ }
+ config.put(JOB_CONFIG, properties);
+ config.put(JOB_SCHEDULE_INFO, scheduleInfo);
+
+ try {
+ if ( scheduleInfo.expression != null ) {
+ if ( this.logger.isDebugEnabled() ) {
+ this.logger.debug("Adding timed event " + config.get(JOB_TOPIC) + "(" + scheduleInfo.jobId + ")" + " with cron expression " + scheduleInfo.expression);
+ }
+ localScheduler.addJob(scheduleInfo.jobId, this, config, scheduleInfo.expression, false);
+ } else if ( scheduleInfo.period != null ) {
+ if ( this.logger.isDebugEnabled() ) {
+ this.logger.debug("Adding timed event " + config.get(JOB_TOPIC) + "(" + scheduleInfo.jobId + ")" + " with period " + scheduleInfo.period);
+ }
+ localScheduler.addPeriodicJob(scheduleInfo.jobId, this, config, scheduleInfo.period, false);
+ } else {
+ // then it must be date
+ if ( this.logger.isDebugEnabled() ) {
+ this.logger.debug("Adding timed event " + config.get(JOB_TOPIC) + "(" + scheduleInfo.jobId + ")" + " with date " + scheduleInfo.date);
+ }
+ localScheduler.fireJobAt(scheduleInfo.jobId, this, config, scheduleInfo.date);
+ }
+ return true;
+ } catch (Exception e) {
+ this.ignoreException(e);
+ }
+ } else {
+ this.logger.error("No scheduler available to start timed event " + event);
+ }
+ return false;
+ }
+
+ /**
+ * @see javax.jcr.observation.EventListener#onEvent(javax.jcr.observation.EventIterator)
+ */
+ public void onEvent(EventIterator iter) {
+ // we create an own session here
+ Session s = null;
+ try {
+ while ( iter.hasNext() ) {
+ final javax.jcr.observation.Event event = iter.nextEvent();
+ if ( event.getType() == javax.jcr.observation.Event.PROPERTY_CHANGED
+ || event.getType() == javax.jcr.observation.Event.PROPERTY_REMOVED) {
+
+ final String propPath = event.getPath();
+ int pos = propPath.lastIndexOf('/');
+ final String nodePath = propPath.substring(0, pos);
+ final String propertyName = propPath.substring(pos+1);
+ // we are only interested in unlocks
+ if ( "jcr:lockOwner".equals(propertyName) ) {
+ try {
+ if ( s == null ) {
+ s = this.environment.createAdminSession();
+ }
+ final Node eventNode = (Node) s.getItem(nodePath);
+ if ( !eventNode.isLocked() ) {
+ try {
+ final EventInfo info = new EventInfo();
+ info.event = this.readEvent(eventNode);
+ info.nodePath =nodePath;
+ try {
+ this.queue.put(info);
+ } catch (InterruptedException e) {
+ // we ignore this exception as this should never occur
+ this.ignoreException(e);
+ }
+ } catch (ClassNotFoundException cnfe) {
+ // add it to the unloaded set
+ synchronized (unloadedEvents) {
+ this.unloadedEvents.add(nodePath);
+ }
+ this.ignoreException(cnfe);
+ }
+ }
+ } catch (RepositoryException re) {
+ this.logger.error("Exception during jcr event processing.", re);
+ }
+ }
+ }
+ }
+ } catch (RepositoryException re) {
+ this.logger.error("Unable to create a session.", re);
+ } finally {
+ if ( s != null ) {
+ s.logout();
+ }
+ }
+ }
+
+ /**
+ * @see org.osgi.service.event.EventHandler#handleEvent(org.osgi.service.event.Event)
+ */
+ public void handleEvent(Event event) {
+ if ( event.getTopic().equals(EventUtil.TOPIC_TIMED_EVENT) ) {
+ // queue the event in order to respond quickly
+ try {
+ this.writeQueue.put(event);
+ } catch (InterruptedException e) {
+ // this should never happen
+ this.ignoreException(e);
+ }
+ } else {
+ // bundle event started or updated
+ boolean doIt = false;
+ synchronized ( this.unloadedEvents ) {
+ if ( this.unloadedEvents.size() > 0 ) {
+ doIt = true;
+ }
+ }
+ if ( doIt ) {
+ final Runnable t = new Runnable() {
+
+ public void run() {
+ synchronized (unloadedEvents) {
+ Session s = null;
+ final Set<String> newUnloadedEvents = new HashSet<String>();
+ newUnloadedEvents.addAll(unloadedEvents);
+ try {
+ s = environment.createAdminSession();
+ for(String path : unloadedEvents ) {
+ newUnloadedEvents.remove(path);
+ try {
+ if ( s.itemExists(path) ) {
+ final Node eventNode = (Node) s.getItem(path);
+ if ( !eventNode.isLocked() ) {
+ try {
+ final EventInfo info = new EventInfo();
+ info.event = readEvent(eventNode);
+ info.nodePath = path;
+ try {
+ queue.put(info);
+ } catch (InterruptedException e) {
+ // we ignore this exception as this should never occur
+ ignoreException(e);
+ }
+ } catch (ClassNotFoundException cnfe) {
+ newUnloadedEvents.add(path);
+ ignoreException(cnfe);
+ }
+ }
+ }
+ } catch (RepositoryException re) {
+ // we ignore this and readd
+ newUnloadedEvents.add(path);
+ ignoreException(re);
+ }
+ }
+ } catch (RepositoryException re) {
+ // unable to create session, so we try it again next time
+ ignoreException(re);
+ } finally {
+ if ( s != null ) {
+ s.logout();
+ }
+ unloadedEvents.clear();
+ unloadedEvents.addAll(newUnloadedEvents);
+ }
+ }
+ }
+
+ };
+ Environment.THREAD_POOL.execute(t);
+ }
+ }
+ }
+
+ /**
+ * @see org.apache.sling.commons.scheduler.Job#execute(org.apache.sling.commons.scheduler.JobContext)
+ */
+ public void execute(JobContext context) {
+ final String topic = (String) context.getConfiguration().get(JOB_TOPIC);
+ @SuppressWarnings("unchecked")
+ final Dictionary<Object, Object> properties = (Dictionary<Object, Object>) context.getConfiguration().get(JOB_CONFIG);
+ final EventAdmin ea = this.environment.getEventAdmin();
+ if ( ea != null ) {
+ try {
+ ea.postEvent(new Event(topic, properties));
+ } catch (IllegalArgumentException iae) {
+ this.logger.error("Scheduled event has illegal topic: " + topic, iae);
+ }
+ } else {
+ this.logger.warn("Unable to send timed event as no event admin service is available.");
+ }
+ final ScheduleInfo info = (ScheduleInfo) context.getConfiguration().get(JOB_SCHEDULE_INFO);
+ // is this job scheduled for a specific date?
+ if ( info.date != null ) {
+ // we can remove it from the repository
+ // we create an own session here
+ Session s = null;
+ try {
+ s = this.environment.createAdminSession();
+ if ( s.itemExists(this.repositoryPath) ) {
+ final Node parentNode = (Node)s.getItem(this.repositoryPath);
+ final String nodeName = info.jobId;
+ final Node eventNode = parentNode.hasNode(nodeName) ? parentNode.getNode(nodeName) : null;
+ if ( eventNode != null ) {
+ try {
+ eventNode.remove();
+ s.save();
+ } catch (RepositoryException re) {
+ // we ignore the exception if removing fails
+ ignoreException(re);
+ }
+ }
+ }
+ } catch (RepositoryException re) {
+ this.logger.error("Unable to create a session.", re);
+ } finally {
+ if ( s != null ) {
+ s.logout();
+ }
+ }
+ }
+ }
+
+ /**
+ * Load all active timed events from the repository.
+ * @throws RepositoryException
+ */
+ protected void loadEvents() {
+ try {
+ final QueryManager qManager = this.writerSession.getWorkspace().getQueryManager();
+ final String selectorName = "nodetype";
+
+ final QueryObjectModelFactory qomf = qManager.getQOMFactory();
+
+ final Query q = qomf.createQuery(
+ qomf.selector(getEventNodeType(), selectorName),
+ qomf.descendantNode(selectorName, this.repositoryPath),
+ null,
+ null
+ );
+ final NodeIterator result = q.execute().getNodes();
+ while ( result.hasNext() ) {
+ final Node eventNode = result.nextNode();
+ if ( !eventNode.isLocked() ) {
+ final String nodePath = eventNode.getPath();
+ try {
+ final Event event = this.readEvent(eventNode);
+ final EventInfo info = new EventInfo();
+ info.event = event;
+ info.nodePath = nodePath;
+ try {
+ this.queue.put(info);
+ } catch (InterruptedException e) {
+ // we ignore this exception as this should never occur
+ this.ignoreException(e);
+ }
+ } catch (ClassNotFoundException cnfe) {
+ // add it to the unloaded set
+ synchronized (unloadedEvents) {
+ this.unloadedEvents.add(nodePath);
+ }
+ this.ignoreException(cnfe);
+ } catch (RepositoryException re) {
+ // if reading an event fails, we ignore this
+ this.ignoreException(re);
+ }
+ }
+ }
+ } catch (RepositoryException re) {
+ this.logger.error("Exception during initial loading of stored timed events.", re);
+ }
+ }
+
+ /**
+ * @see org.apache.sling.engine.event.impl.JobPersistenceHandler#addNodeProperties(javax.jcr.Node, org.osgi.service.event.Event)
+ */
+ protected void addNodeProperties(Node eventNode, Event event)
+ throws RepositoryException {
+ super.addNodeProperties(eventNode, event);
+ eventNode.setProperty(JCRHelper.NODE_PROPERTY_TOPIC, (String)event.getProperty(EventUtil.PROPERTY_TIMED_EVENT_TOPIC));
+ final ScheduleInfo info = new ScheduleInfo(event);
+ if ( info.date != null ) {
+ final Calendar c = Calendar.getInstance();
+ c.setTime(info.date);
+ eventNode.setProperty(JCRHelper.NODE_PROPERTY_TE_DATE, c);
+ }
+ if ( info.expression != null ) {
+ eventNode.setProperty(JCRHelper.NODE_PROPERTY_TE_EXPRESSION, info.expression);
+ }
+ if ( info.period != null ) {
+ eventNode.setProperty(JCRHelper.NODE_PROPERTY_TE_PERIOD, info.period.longValue());
+ }
+ }
+
+ /**
+ * @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#getEventNodeType()
+ */
+ protected String getEventNodeType() {
+ return JCRHelper.TIMED_EVENT_NODE_TYPE;
+ }
+
+ protected static final class ScheduleInfo implements Serializable {
+
+ private static final long serialVersionUID = 8667701700547811142L;
+
+ public final String expression;
+ public final Long period;
+ public final Date date;
+ public final String jobId;
+
+ public ScheduleInfo(final Event event)
+ throws IllegalArgumentException {
+ // let's see if a schedule information is specified or if the job should be stopped
+ this.expression = (String) event.getProperty(EventUtil.PROPERTY_TIMED_EVENT_SCHEDULE);
+ this.period = (Long) event.getProperty(EventUtil.PROPERTY_TIMED_EVENT_PERIOD);
+ this.date = (Date) event.getProperty(EventUtil.PROPERTY_TIMED_EVENT_DATE);
+ int count = 0;
+ if ( this.expression != null) {
+ count++;
+ }
+ if ( this.period != null ) {
+ count++;
+ }
+ if ( this.date != null ) {
+ count++;
+ }
+ if ( count > 1 ) {
+ throw new IllegalArgumentException("Only one configuration property from " + EventUtil.PROPERTY_TIMED_EVENT_SCHEDULE +
+ ", " + EventUtil.PROPERTY_TIMED_EVENT_PERIOD +
+ ", or " + EventUtil.PROPERTY_TIMED_EVENT_DATE + " should be used.");
+ }
+ // we create a job id consisting of the real event topic and an (optional) id
+ // if the event contains a timed event id or a job id we'll append that to the name
+ String topic = (String)event.getProperty(EventUtil.PROPERTY_TIMED_EVENT_TOPIC);
+ if ( topic == null ) {
+ throw new IllegalArgumentException("Timed event does not contain required property " + EventUtil.PROPERTY_TIMED_EVENT_TOPIC);
+ }
+ String id = (String)event.getProperty(EventUtil.PROPERTY_TIMED_EVENT_ID);
+ String jId = (String)event.getProperty(JobUtil.PROPERTY_JOB_NAME);
+
+ //this.jobId = getJobId(topic, id, jId);
+ this.jobId = getJobId(topic, id, jId);
+ }
+
+ private ScheduleInfo(String jobId) {
+ this.expression = null;
+ this.period = null;
+ this.date = null;
+ this.jobId = jobId;
+ }
+
+ public ScheduleInfo getStopInfo() {
+ return new ScheduleInfo(this.jobId);
+ }
+
+ public boolean isStopEvent() {
+ return this.expression == null && this.period == null && this.date == null;
+ }
+
+ public static String getJobId(String topic, String timedEventId, String jobId) {
+ return topic.replace('/', '.') + "/TimedEvent " + (timedEventId != null ? Utility.filter(timedEventId) : "") + '_' + (jobId != null ? Utility.filter(jobId) : "");
+ }
+ }
+
+ /**
+ * @see org.apache.sling.event.TimedEventStatusProvider#getScheduledEvent(java.lang.String, java.lang.String, java.lang.String)
+ */
+ public Event getScheduledEvent(String topic, String eventId, String jobId) {
+ Session s = null;
+ try {
+ s = this.environment.createAdminSession();
+ if ( s.itemExists(this.repositoryPath) ) {
+ final Node parentNode = (Node)s.getItem(this.repositoryPath);
+ final String nodeName = ScheduleInfo.getJobId(topic, eventId, jobId);
+ final Node eventNode = parentNode.hasNode(nodeName) ? parentNode.getNode(nodeName) : null;
+ if ( eventNode != null ) {
+ return this.readEvent(eventNode);
+ }
+ }
+ } catch (RepositoryException re) {
+ this.logger.error("Unable to create a session.", re);
+ } catch (ClassNotFoundException e) {
+ this.ignoreException(e);
+ } finally {
+ if ( s != null ) {
+ s.logout();
+ }
+ }
+ return null;
+ }
+
+ /**
+ * @see org.apache.sling.event.TimedEventStatusProvider#getScheduledEvents(java.lang.String, java.util.Map...)
+ */
+ public Collection<Event> getScheduledEvents(String topic, Map<String, Object>... filterProps) {
+ // we create a new session
+ Session s = null;
+ final List<Event> jobs = new ArrayList<Event>();
+ try {
+ s = this.environment.createAdminSession();
+ final QueryManager qManager = s.getWorkspace().getQueryManager();
+ final String selectorName = "nodetype";
+
+ final QueryObjectModelFactory qomf = qManager.getQOMFactory();
+
+ final String path;
+ if ( topic == null ) {
+ path = this.repositoryPath;
+ } else {
+ path = this.repositoryPath + '/' + topic.replace('/', '.');
+ }
+ Constraint constraint = qomf.descendantNode(selectorName, path);
+ if ( filterProps != null && filterProps.length > 0 ) {
+ Constraint orConstraint = null;
+ for (Map<String,Object> template : filterProps) {
+ Constraint comp = null;
+ final Iterator<Map.Entry<String, Object>> i = template.entrySet().iterator();
+ while ( i.hasNext() ) {
+ final Map.Entry<String, Object> current = i.next();
+ // check prop name first
+ final String propName = JCRHelper.getNodePropertyName(current.getKey());
+ if ( propName != null ) {
+ // check value
+ final Value value = JCRHelper.getNodePropertyValue(s.getValueFactory(), current.getValue());
+ if ( value != null ) {
+ final Comparison newComp = qomf.comparison(qomf.propertyValue(selectorName, propName),
+ QueryObjectModelFactory.JCR_OPERATOR_EQUAL_TO,
+ qomf.literal(value));
+ if ( comp == null ) {
+ comp = newComp;
+ } else {
+ comp = qomf.and(comp, newComp);
+ }
+ }
+ }
+ }
+ if ( comp != null ) {
+ if ( orConstraint == null ) {
+ orConstraint = comp;
+ } else {
+ orConstraint = qomf.or(orConstraint, comp);
+ }
+ }
+ }
+ if ( orConstraint != null ) {
+ constraint = qomf.and(constraint, orConstraint);
+ }
+ }
+ final Query q = qomf.createQuery(
+ qomf.selector(getEventNodeType(), selectorName),
+ constraint,
+ null,
+ null
+ );
+ if ( logger.isDebugEnabled() ) {
+ logger.debug("Executing job query {}.", q.getStatement());
+ }
+
+ final NodeIterator iter = q.execute().getNodes();
+ while ( iter.hasNext() ) {
+ final Node eventNode = iter.nextNode();
+ try {
+ final Event event = this.readEvent(eventNode);
+ jobs.add(event);
+ } catch (ClassNotFoundException cnfe) {
+ // in the case of a class not found exception we just ignore the exception
+ this.ignoreException(cnfe);
+ }
+ }
+ } catch (RepositoryException e) {
+ // in the case of an error, we return an empty list
+ this.ignoreException(e);
+ } finally {
+ if ( s != null) {
+ s.logout();
+ }
+ }
+ return jobs;
+ }
+
+ /**
+ * @see org.apache.sling.event.TimedEventStatusProvider#cancelTimedEvent(java.lang.String)
+ */
+ public void cancelTimedEvent(String jobId) {
+ synchronized ( this.writeLock ) {
+ try {
+ // is there a node?
+ final Item foundNode = this.writerSession.itemExists(jobId) ? this.writerSession.getItem(jobId) : null;
+ // we should remove the node from the repository
+ // if there is no node someone else was faster and we can ignore this
+ if ( foundNode != null ) {
+ try {
+ foundNode.remove();
+ writerSession.save();
+ } catch (LockException le) {
+ // if someone else has the lock this is fine
+ }
+ }
+ } catch ( RepositoryException re) {
+ this.logger.error("Unable to cancel timed event: " + jobId, re);
+ }
+ // stop the scheduler
+ if ( this.logger.isDebugEnabled() ) {
+ this.logger.debug("Stopping timed event " + jobId);
+ }
+ final Scheduler localScheduler = this.scheduler;
+ if ( localScheduler != null ) {
+ try {
+ localScheduler.removeJob(jobId);
+ } catch (NoSuchElementException nsee) {
+ // this can happen if the job is scheduled on another node
+ // so we can just ignore this
+ }
+ }
+ }
+ }
+}
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/TimedJobHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/TimedJobHandler.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/TimedJobHandler.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/DefaultJobManager.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/DefaultJobManager.java?rev=1021247&view=auto
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/DefaultJobManager.java (added)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/DefaultJobManager.java Mon Oct 11 06:54:12 2010
@@ -0,0 +1,625 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Modified;
+import org.apache.felix.scr.annotations.Properties;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.PropertyOption;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.felix.scr.annotations.Services;
+import org.apache.sling.commons.osgi.OsgiUtil;
+import org.apache.sling.commons.scheduler.Scheduler;
+import org.apache.sling.event.EventUtil;
+import org.apache.sling.event.impl.EnvironmentComponent;
+import org.apache.sling.event.impl.jobs.config.ConfigurationConstants;
+import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
+import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager;
+import org.apache.sling.event.impl.jobs.queues.AbstractJobQueue;
+import org.apache.sling.event.impl.jobs.queues.OrderedJobQueue;
+import org.apache.sling.event.impl.jobs.queues.ParallelJobQueue;
+import org.apache.sling.event.impl.jobs.queues.TopicRoundRobinJobQueue;
+import org.apache.sling.event.impl.support.Environment;
+import org.apache.sling.event.jobs.JobManager;
+import org.apache.sling.event.jobs.JobUtil;
+import org.apache.sling.event.jobs.JobsIterator;
+import org.apache.sling.event.jobs.Queue;
+import org.apache.sling.event.jobs.Statistics;
+import org.osgi.service.event.Event;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * An event handler for special job events.
+ *
+ * We schedule this event handler to run in the background
+ * and clean up obsolete queues.
+ *
+ */
+@Component(label="%job.events.name",
+ description="%job.events.description",
+ metatype=true,immediate=true)
+@Services({
+ @Service(value=Runnable.class),
+ @Service(value=JobManager.class)
+})
+@Properties({
+ @Property(name="scheduler.period", longValue=300,
+ label="%jobscheduler.period.name",
+ description="%jobscheduler.period.description"),
+ @Property(name="scheduler.concurrent", boolValue=false, propertyPrivate=true),
+ @Property(name=ConfigurationConstants.PROP_PRIORITY,
+ value=ConfigurationConstants.DEFAULT_PRIORITY,
+ options={@PropertyOption(name="NORM",value="Norm"),
+ @PropertyOption(name="MIN",value="Min"),
+ @PropertyOption(name="MAX",value="Max")}),
+ @Property(name=ConfigurationConstants.PROP_RETRIES,
+ intValue=ConfigurationConstants.DEFAULT_RETRIES),
+ @Property(name=ConfigurationConstants.PROP_RETRY_DELAY,
+ longValue=ConfigurationConstants.DEFAULT_RETRY_DELAY),
+ @Property(name=ConfigurationConstants.PROP_MAX_PARALLEL,
+ intValue=ConfigurationConstants.DEFAULT_MAX_PARALLEL)
+})
+public class DefaultJobManager
+ implements Runnable, JobManager {
+
+ /** Default logger. */
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+ /** The environment component. */
+ @Reference
+ private EnvironmentComponent environment;
+
+ /** The configuration manager. */
+ @Reference
+ private QueueConfigurationManager configManager;
+
+ /** The scheduler service. */
+ @Reference
+ private Scheduler scheduler;
+
+ /** All active queues. */
+ private final Map<String, AbstractJobQueue> queues = new ConcurrentHashMap<String, AbstractJobQueue>();
+
+ /** Main configuration. */
+ private InternalQueueConfiguration mainConfiguration;
+
+ /** Base statistics. */
+ private final StatisticsImpl baseStatistics = new StatisticsImpl();
+
+ /** Current statistics. */
+ private StatisticsImpl currentStatistics;
+
+ /** Last update for current statistics. */
+ private long lastUpdatedStatistics;
+
+ /** All existing events. */
+ private final Map<String, JobEvent> allEvents = new HashMap<String, JobEvent>();
+
+ /** All existing events by topic. */
+ private final Map<String, List<JobEvent>> allEventsByTopic = new HashMap<String, List<JobEvent>>();
+
+ /**
+ * Activate this component.
+ * @param props Configuration properties
+ */
+ @Activate
+ protected void activate(final Map<String, Object> props) {
+ this.update(props);
+ logger.info("Apache Sling Job Event Handler started on instance {}", Environment.APPLICATION_ID);
+ }
+
+ /**
+ * Configure this component.
+ * @param props Configuration properties
+ */
+ @Modified
+ protected void update(final Map<String, Object> props) {
+ // create a new dictionary with the missing info and do some sanety puts
+ final Map<String, Object> queueProps = new HashMap<String, Object>(props);
+ queueProps.remove(ConfigurationConstants.PROP_APP_IDS);
+ queueProps.put(ConfigurationConstants.PROP_TOPICS, "*");
+ queueProps.put(ConfigurationConstants.PROP_NAME, "<main queue>");
+ queueProps.put(ConfigurationConstants.PROP_RUN_LOCAL, false);
+ queueProps.put(ConfigurationConstants.PROP_TYPE, InternalQueueConfiguration.Type.UNORDERED);
+
+ // check max parallel - this should never be lower than 2!
+ final int maxParallel = OsgiUtil.toInteger(queueProps.get(ConfigurationConstants.PROP_MAX_PARALLEL),
+ ConfigurationConstants.DEFAULT_MAX_PARALLEL);
+ if ( maxParallel < 2 ) {
+ this.logger.debug("Ignoring invalid setting of {} for {}. Setting to minimum value: 2",
+ maxParallel, ConfigurationConstants.PROP_MAX_PARALLEL);
+ queueProps.put(ConfigurationConstants.PROP_MAX_PARALLEL, 2);
+ }
+ this.mainConfiguration = InternalQueueConfiguration.fromConfiguration(queueProps);
+ }
+
+ /**
+ * Dectivate this component.
+ */
+ @Deactivate
+ protected void deactivate() {
+ final Iterator<AbstractJobQueue> i = this.queues.values().iterator();
+ while ( i.hasNext() ) {
+ final AbstractJobQueue jbq = i.next();
+ jbq.close();
+ }
+ this.queues.clear();
+ logger.info("Apache Sling Job Event Handler stopped on instance {}", Environment.APPLICATION_ID);
+ }
+
+ /**
+ * This method is invoked periodically by the scheduler.
+ * It searches for idle queues and stops them after a timeout. If a queue
+ * is idle for two consecutive clean up calls, it is removed.
+ * @see java.lang.Runnable#run()
+ */
+ public void cleanup() {
+ // check for idle queue
+ // we synchronize to avoid creating a queue which is about to be removed during cleanup
+ synchronized ( this ) {
+ final Iterator<Map.Entry<String, AbstractJobQueue>> i = this.queues.entrySet().iterator();
+ while ( i.hasNext() ) {
+ final Map.Entry<String, AbstractJobQueue> current = i.next();
+ final AbstractJobQueue jbq = current.getValue();
+ if ( jbq.isMarkedForCleanUp() ) {
+ // close
+ jbq.close();
+ // copy statistics
+ this.baseStatistics.add(jbq);
+ // remove
+ i.remove();
+ } else {
+ // mark to be removed during next cycle
+ jbq.markForCleanUp();
+ }
+ }
+ }
+ }
+
+ /**
+ * Process a new job event.
+ * This method first searches the corresponding queue - if such a queue
+ * does not exist yet, it is created and started.
+ * @param event The job event
+ */
+ public void process(final JobEvent event) {
+ // get the queue configuration
+ InternalQueueConfiguration config = configManager.getQueueConfiguration(event);
+
+ // if no queue config is found, we either create a new queue or use the main queue
+ if ( config == null ) {
+ final String customQueueName = (String)event.event.getProperty(JobUtil.PROPERTY_JOB_QUEUE_NAME);
+ if ( customQueueName != null ) {
+ synchronized ( this ) {
+ final AbstractJobQueue queue = this.queues.get(customQueueName);
+ if ( queue != null ) {
+ config = queue.getConfiguration();
+ } else {
+ config = new InternalQueueConfiguration(event.event);
+ }
+ event.queueName = customQueueName;
+ }
+ } else {
+ config = this.mainConfiguration;
+ event.queueName = this.mainConfiguration.getName();
+ }
+ }
+
+ // get the queue name
+ final String queueName = event.queueName;
+
+ if ( config.isSkipped(event) ) {
+ if ( logger.isDebugEnabled() ) {
+ logger.debug("Ignoring event due to configuration of queue {} : {}", queueName, EventUtil.toString(event.event));
+ }
+ return;
+ }
+
+ // get or create queue
+ AbstractJobQueue queue = null;
+ // we synchronize to avoid creating a queue which is about to be removed during cleanup
+ synchronized ( this ) {
+ queue = this.queues.get(queueName);
+ // check for reconfiguration, we really do an identity check here(!)
+ if ( queue != null && queue.getConfiguration() != config ) {
+ // remove the queue with the old name
+ this.queues.remove(queueName);
+ // check if we can close or have to rename
+ queue.markForCleanUp();
+ if ( queue.isMarkedForCleanUp() ) {
+ // close
+ queue.close();
+ // copy statistics
+ this.baseStatistics.add(queue);
+ } else {
+ // notify queue
+ queue.rename(queueName + "<outdated>");
+ // readd with new name
+ this.queues.put(queue.getName(), queue);
+ }
+ // we use a new queue with the configuration
+ queue = null;
+ }
+ if ( queue == null ) {
+ if ( config.getType() == InternalQueueConfiguration.Type.ORDERED ) {
+ queue = new OrderedJobQueue(queueName, config, this.environment);
+ } else if ( config.getType() == InternalQueueConfiguration.Type.UNORDERED ) {
+ queue = new ParallelJobQueue(queueName, config, this.environment, this.scheduler);
+ } else if ( config.getType() == InternalQueueConfiguration.Type.TOPIC_ROUND_ROBIN ) {
+ queue = new TopicRoundRobinJobQueue(queueName, config, this.environment, this.scheduler);
+ }
+ if ( queue == null ) {
+ // this is just a sanety check, actually we can never get here
+ logger.warn("Ignoring event due to unknown queue type of queue {} : {}", queueName, EventUtil.toString(event.event));
+ return;
+ }
+ queues.put(queueName, queue);
+ queue.start();
+ }
+ }
+
+ // and put event
+ queue.process(event);
+ }
+
+ /**
+ * This method is invoked periodically by the scheduler.
+ * @see java.lang.Runnable#run()
+ */
+ public void run() {
+ this.cleanup();
+ }
+
+ /**
+ * Return our internal statistics object.
+ * We recalculate this every 1.5sec (if requested)
+ *
+ * @see org.apache.sling.event.jobs.JobManager#getStatistics()
+ */
+ public synchronized Statistics getStatistics() {
+ final long now = System.currentTimeMillis();
+ if ( this.currentStatistics == null || this.lastUpdatedStatistics + 1500 < now ) {
+ this.currentStatistics = this.baseStatistics.copy();
+ for(final AbstractJobQueue jq : this.queues.values() ) {
+ this.currentStatistics.add(jq);
+ }
+ }
+ return this.currentStatistics;
+ }
+
+ /**
+ * @see org.apache.sling.event.jobs.JobManager#getQueue(java.lang.String)
+ */
+ public Queue getQueue(final String name) {
+ return this.queues.get(name);
+ }
+
+ /**
+ * @see org.apache.sling.event.jobs.JobManager#getQueues()
+ */
+ public Iterable<Queue> getQueues() {
+ final Iterator<AbstractJobQueue> jqI = this.queues.values().iterator();
+ return new Iterable<Queue>() {
+
+ public Iterator<Queue> iterator() {
+ return new Iterator<Queue>() {
+
+ public boolean hasNext() {
+ return jqI.hasNext();
+ }
+
+ public Queue next() {
+ return jqI.next();
+ }
+
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+ };
+ }
+
+ public InternalQueueConfiguration getMainQueueConfiguration() {
+ return this.mainConfiguration;
+ }
+
+ /**
+ * Add a job to all jobs.
+ */
+ public void notifyAddJob(final JobEvent job) {
+ final String key = job.uniqueId;
+ final String topic = (String)job.event.getProperty(JobUtil.PROPERTY_JOB_TOPIC);
+ final JobEvent oldJob;
+ synchronized ( this.allEvents ) {
+ oldJob = this.allEvents.put(key, job);
+ }
+ List<JobEvent> l;
+ synchronized ( this.allEventsByTopic ) {
+ l = this.allEventsByTopic.get(topic);
+ if ( l == null ) {
+ l = new ArrayList<JobEvent>();
+ this.allEventsByTopic.put(topic, l);
+ }
+ }
+ synchronized ( l ) {
+ if ( oldJob != null ) {
+ l.remove(oldJob);
+ }
+ l.add(job);
+ }
+ }
+
+ /**
+ * Remove a job from all jobs.
+ */
+ public void notifyRemoveJob(final String key) {
+ final JobEvent oldJob;
+ synchronized ( this.allEvents ) {
+ oldJob = this.allEvents.remove(key);
+ }
+ if ( oldJob != null ) {
+ final String topic = (String)oldJob.event.getProperty(JobUtil.PROPERTY_JOB_TOPIC);
+ final List<JobEvent> l;
+ synchronized ( this.allEventsByTopic ) {
+ l = this.allEventsByTopic.get(topic);
+ }
+ if ( l != null ) {
+ synchronized ( l ) {
+ l.remove(oldJob);
+ }
+ }
+ }
+ }
+
+ /**
+ * Job started
+ */
+ public void notifyActiveJob(final String key) {
+ final JobEvent job;
+ synchronized ( this.allEvents ) {
+ job = this.allEvents.get(key);
+ }
+ if ( job != null ) {
+ job.started = 1;
+ }
+ }
+
+ /**
+ * Check the requested job type
+ */
+ private boolean checkType(final QueryType type, final JobEvent event) {
+ if ( type == QueryType.ALL ) {
+ return true;
+ }
+ if ( type == QueryType.ACTIVE && event.started == 1 ) {
+ return true;
+ }
+ if ( type == QueryType.QUEUED && event.started == -1 ) {
+ return true;
+ }
+ return false;
+ }
+
+ private enum Operation {
+ LESS,
+ LESS_OR_EQUALS,
+ EQUALS,
+ GREATER_OR_EQUALS,
+ GREATER
+ }
+ /**
+ * Check if the job matches the template
+ */
+ private boolean match(final JobEvent job, final Map<String, Object> template) {
+ if ( template != null ) {
+ for(final Map.Entry<String, Object> current : template.entrySet()) {
+ final String key = current.getKey();
+ final char firstChar = key.length() > 0 ? key.charAt(0) : 0;
+ final String propName;
+ final Operation op;
+ if ( firstChar == '=' ) {
+ propName = key.substring(1);
+ op = Operation.EQUALS;
+ } else if ( firstChar == '<' ) {
+ final char secondChar = key.length() > 1 ? key.charAt(1) : 0;
+ if ( secondChar == '=' ) {
+ op = Operation.LESS_OR_EQUALS;
+ propName = key.substring(2);
+ } else {
+ op = Operation.LESS;
+ propName = key.substring(1);
+ }
+ } else if ( firstChar == '>' ) {
+ final char secondChar = key.length() > 1 ? key.charAt(1) : 0;
+ if ( secondChar == '=' ) {
+ op = Operation.GREATER_OR_EQUALS;
+ propName = key.substring(2);
+ } else {
+ op = Operation.GREATER;
+ propName = key.substring(1);
+ }
+ } else {
+ propName = key;
+ op = Operation.EQUALS;
+ }
+ final Object value = current.getValue();
+
+ if ( op == Operation.EQUALS ) {
+ if ( !value.equals(job.event.getProperty(propName)) ) {
+ return false;
+ }
+ } else {
+ if ( value instanceof Comparable ) {
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ final int result = ((Comparable)value).compareTo(job.event.getProperty(propName));
+ if ( op == Operation.LESS && result != -1 ) {
+ return false;
+ } else if ( op == Operation.LESS_OR_EQUALS && result == 1 ) {
+ return false;
+ } else if ( op == Operation.GREATER_OR_EQUALS && result == -1 ) {
+ return false;
+ } else if ( op == Operation.GREATER && result != 1 ) {
+ return false;
+ }
+ } else {
+ // if the value is not comparable we simply don't match
+ return false;
+ }
+ }
+ }
+ }
+ return true;
+ }
+
+ private void queryCollection(
+ final List<Event> result,
+ final QueryType type,
+ final Collection<JobEvent> collection,
+ final Map<String, Object>... filterProps) {
+ synchronized ( collection ) {
+ final Iterator<JobEvent> iter = collection.iterator();
+ while ( iter.hasNext() ) {
+ final JobEvent job = iter.next();
+ boolean add = checkType(type, job);
+ if ( add && filterProps != null && filterProps.length != 0 ) {
+ add = false;
+ for (Map<String,Object> template : filterProps) {
+ add = this.match(job, template);
+ if ( add ) {
+ break;
+ }
+ }
+ }
+ if ( add ) {
+ result.add(job.event);
+ }
+ }
+ }
+ }
+ /**
+ * @see org.apache.sling.event.jobs.JobManager#queryJobs(QueryType, java.lang.String, java.util.Map...)
+ */
+ public JobsIterator queryJobs(final QueryType type,
+ final String topic,
+ final Map<String, Object>... filterProps) {
+ final List<Event> result = new ArrayList<Event>();
+ if ( topic != null ) {
+ final List<JobEvent> l;
+ synchronized ( this.allEventsByTopic ) {
+ l = this.allEventsByTopic.get(topic);
+ }
+ if ( l != null ) {
+ queryCollection(result, type, l, filterProps);
+ }
+ } else {
+ final Set<Collection<JobEvent>> topics;
+ synchronized ( this.allEventsByTopic ) {
+ topics = new HashSet<Collection<JobEvent>>(this.allEventsByTopic.values());
+ }
+ for(final Collection<JobEvent> l : topics) {
+ queryCollection(result, type, l, filterProps);
+ }
+ }
+ return new JobsIteratorImpl(result);
+ }
+
+ /**
+ * @see org.apache.sling.event.jobs.JobManager#findJob(java.lang.String, java.util.Map)
+ */
+ public Event findJob(final String topic, final Map<String, Object> template) {
+ Event result = null;
+ if ( topic != null ) {
+ final List<JobEvent> l;
+ synchronized ( this.allEventsByTopic ) {
+ l = this.allEventsByTopic.get(topic);
+ }
+ if ( l != null ) {
+ synchronized ( l ) {
+ final Iterator<JobEvent> iter = l.iterator();
+ while ( result == null && iter.hasNext() ) {
+ final JobEvent job = iter.next();
+ if ( match(job, template) ) {
+ result = job.event;
+ }
+ }
+ }
+ }
+ }
+ return result;
+ }
+
+ /**
+ * @see org.apache.sling.event.jobs.JobManager#removeJob(java.lang.String)
+ */
+ public boolean removeJob(final String jobId) {
+ final JobEvent job;
+ synchronized ( this.allEvents ) {
+ job = this.allEvents.get(jobId);
+ }
+ boolean result = true;
+ if ( job != null ) {
+ result = job.remove();
+ if ( result ) {
+ this.notifyRemoveJob(jobId);
+ }
+ }
+ return result;
+ }
+
+ /**
+ * @see org.apache.sling.event.jobs.JobManager#forceRemoveJob(java.lang.String)
+ */
+ public void forceRemoveJob(final String jobId) {
+ while ( !this.removeJob(jobId) ) {
+ // instead of using complicated syncs, waits and notifies we simply poll
+ try {
+ Thread.sleep(80);
+ } catch (final InterruptedException ignore) {
+ this.ignoreException(ignore);
+ }
+ }
+ }
+
+ /**
+ * Helper method which just logs the exception in debug mode.
+ * @param e
+ */
+ private void ignoreException(final Exception e) {
+ if ( this.logger.isDebugEnabled() ) {
+ this.logger.debug("Ignored exception " + e.getMessage(), e);
+ }
+ }
+}
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/DefaultJobManager.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/DefaultJobManager.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/DefaultJobManager.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/JobEvent.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/JobEvent.java?rev=1021247&view=auto
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/JobEvent.java (added)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/JobEvent.java Mon Oct 11 06:54:12 2010
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs;
+
+import org.osgi.service.event.Event;
+
+/**
+ * This object encapsulates all information about a job.
+ */
+public abstract class JobEvent {
+
+ public Event event;
+ public final String uniqueId;
+
+ public String queueName;
+
+ public long queued = -1;
+ public long started = -1;
+
+ public JobEvent(final Event e, final String uniqueId) {
+ this.event = e;
+ this.uniqueId = uniqueId;
+ }
+
+ public abstract boolean lock();
+ public abstract void unlock();
+ public abstract void finished();
+ public abstract boolean reschedule();
+ public abstract boolean remove();
+
+ @Override
+ public int hashCode() {
+ return this.uniqueId.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if ( ! (obj instanceof JobEvent) ) {
+ return false;
+ }
+ return this.uniqueId.equals(((JobEvent)obj).uniqueId);
+ }
+}
\ No newline at end of file
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/JobEvent.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/JobEvent.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/JobEvent.java
------------------------------------------------------------------------------
svn:mime-type = text/plain