You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2013/04/22 13:42:55 UTC
svn commit: r1470462 [2/7] - in /sling/trunk/bundles/extensions/event: ./
src/main/java/org/apache/sling/event/
src/main/java/org/apache/sling/event/impl/
src/main/java/org/apache/sling/event/impl/dea/
src/main/java/org/apache/sling/event/impl/jobs/ sr...
Added: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java?rev=1470462&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java (added)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java Mon Apr 22 11:42:53 2013
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Dictionary;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Modified;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.PropertyUnbounded;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.ReferencePolicy;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.sling.commons.osgi.PropertiesUtil;
+import org.apache.sling.discovery.PropertyProvider;
+import org.apache.sling.event.impl.support.TopicMatcher;
+import org.apache.sling.event.impl.support.TopicMatcherHelper;
+import org.apache.sling.event.jobs.JobConsumer;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.Constants;
+import org.osgi.framework.ServiceRegistration;
+
+/**
+ * This component manages/keeps track of all job consumer services.
+ *
+ * Consumers are bound dynamically and indexed by the topics listed in their
+ * {@link JobConsumer#PROPERTY_TOPICS} service property. While the configured
+ * whitelist/blacklist allows it, a {@link PropertyProvider} service is
+ * registered which exposes the sorted, comma separated list of accepted
+ * topics under the property name {@link JobConsumer#PROPERTY_TOPICS}.
+ */
+@Component(label="%job.consumermanager.name",
+           description="%job.consumermanager.description",
+           metatype=true)
+@Service(value=JobConsumerManager.class)
+@Reference(referenceInterface=JobConsumer.class,
+           cardinality=ReferenceCardinality.OPTIONAL_MULTIPLE,
+           policy=ReferencePolicy.DYNAMIC)
+@Property(name="org.apache.sling.installer.configuration.persist", boolValue=false, propertyPrivate=true)
+public class JobConsumerManager {
+
+    @Property(unbounded=PropertyUnbounded.ARRAY, value = "*")
+    private static final String PROPERTY_WHITELIST = "job.consumermanager.whitelist";
+
+    @Property(unbounded=PropertyUnbounded.ARRAY)
+    private static final String PROPERTY_BLACKLIST = "job.consumermanager.blacklist";
+
+    /**
+     * The map with the consumers, keyed by topic, sorted by service ranking.
+     * This map is also the lock guarding the matchers, the enabled flag and
+     * every write to {@link #topics} and {@link #supportsBridgedEvents}.
+     */
+    private final Map<String, List<ConsumerInfo>> topicToConsumerMap = new HashMap<String, List<ConsumerInfo>>();
+
+    /**
+     * Marker if this instance supports bridged events.
+     * Volatile because it is read by {@link #supportsBridgedEvents()} without holding the lock.
+     */
+    private volatile boolean supportsBridgedEvents;
+
+    /** ServiceRegistration for propagation. */
+    private ServiceRegistration propagationService;
+
+    /**
+     * Whether topics are currently propagated; decided in {@link #modified(BundleContext, Map)}.
+     * Kept separately from {@link #propagationService} so that the topics can be
+     * calculated before the service gets registered.
+     */
+    private boolean propagationEnabled;
+
+    /**
+     * The comma separated list of propagated topics or <code>null</code> if disabled.
+     * Volatile because it is read by {@link #getTopics()} and by the property provider
+     * without holding the lock.
+     */
+    private volatile String topics;
+
+    private TopicMatcher[] whitelistMatchers;
+
+    private TopicMatcher[] blacklistMatchers;
+
+    private volatile long changeCount;
+
+    /**
+     * Create the service properties for the property provider registration.
+     * @return A new dictionary containing the provided property name and a change counter.
+     */
+    private Dictionary<String, Object> getRegistrationProperties() {
+        final Dictionary<String, Object> serviceProps = new Hashtable<String, Object>();
+        serviceProps.put(PropertyProvider.PROPERTY_PROPERTIES, JobConsumer.PROPERTY_TOPICS);
+        // we add a changing property to the service registration
+        // to make sure a modification event is really sent
+        synchronized ( this ) {
+            serviceProps.put("changeCount", this.changeCount++);
+        }
+        return serviceProps;
+    }
+
+    /**
+     * Activate this component - delegates to {@link #modified(BundleContext, Map)}.
+     * @param bc The bundle context
+     * @param props The component configuration
+     */
+    @Activate
+    protected void activate(final BundleContext bc, final Map<String, Object> props) {
+        this.modified(bc, props);
+    }
+
+    /**
+     * Apply a (new) configuration.
+     *
+     * The topics are always recalculated with the new matchers <em>before</em> the
+     * property provider is registered, so the provider never hands out a stale or
+     * <code>null</code> value at registration time. If propagation stays enabled but the
+     * filters change the resulting topics, the registration properties are updated to
+     * trigger a modification event.
+     *
+     * @param bc The bundle context used to register the property provider
+     * @param props The component configuration
+     */
+    @Modified
+    protected void modified(final BundleContext bc, final Map<String, Object> props) {
+        final boolean wasEnabled = this.propagationService != null;
+        final TopicMatcher[] whitelist = TopicMatcherHelper.buildMatchers(PropertiesUtil.toStringArray(props.get(PROPERTY_WHITELIST)));
+        final TopicMatcher[] blacklist = TopicMatcherHelper.buildMatchers(PropertiesUtil.toStringArray(props.get(PROPERTY_BLACKLIST)));
+
+        final boolean enable = whitelist != null && blacklist != TopicMatcherHelper.MATCH_ALL;
+
+        final String previousTopics;
+        final String currentTopics;
+        synchronized ( this.topicToConsumerMap ) {
+            previousTopics = this.topics;
+            this.whitelistMatchers = whitelist;
+            this.blacklistMatchers = blacklist;
+            this.propagationEnabled = enable;
+            this.calculateTopics();
+            currentTopics = this.topics;
+        }
+
+        if ( wasEnabled != enable ) {
+            if ( enable ) {
+                this.propagationService = bc.registerService(PropertyProvider.class.getName(),
+                        new PropertyProvider() {
+
+                            @Override
+                            public String getProperty(final String name) {
+                                if ( JobConsumer.PROPERTY_TOPICS.equals(name) ) {
+                                    return topics;
+                                }
+                                return null;
+                            }
+                        }, this.getRegistrationProperties());
+            } else {
+                this.propagationService.unregister();
+                this.propagationService = null;
+            }
+        } else if ( enable && !currentTopics.equals(previousTopics) ) {
+            // still enabled, but the changed filters produce a different topic list
+            this.propagationService.setProperties(this.getRegistrationProperties());
+        }
+    }
+
+    /**
+     * Deactivate this component and unregister the property provider (if registered).
+     */
+    @Deactivate
+    protected void deactivate() {
+        synchronized ( this.topicToConsumerMap ) {
+            this.propagationEnabled = false;
+        }
+        if ( this.propagationService != null ) {
+            this.propagationService.unregister();
+            this.propagationService = null;
+        }
+    }
+
+    /**
+     * Get the consumer for the topic.
+     * An exact topic registration is preferred; otherwise a registration for the
+     * parent category (<code>parent/*</code>) is used.
+     * @param topic The job topic
+     * @return A consumer or <code>null</code>
+     */
+    public JobConsumer getConsumer(final String topic) {
+        synchronized ( this.topicToConsumerMap ) {
+            final List<ConsumerInfo> consumers = this.topicToConsumerMap.get(topic);
+            if ( consumers != null ) {
+                // lists are kept sorted and removed once empty, the first entry wins
+                return consumers.get(0).consumer;
+            }
+            final int pos = topic.lastIndexOf('/');
+            if ( pos > 0 ) {
+                final String category = topic.substring(0, pos + 1).concat("*");
+                final List<ConsumerInfo> categoryConsumers = this.topicToConsumerMap.get(category);
+                if ( categoryConsumers != null ) {
+                    return categoryConsumers.get(0).consumer;
+                }
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Return the topics information of this instance.
+     * @return The sorted, comma separated topics or <code>null</code> if propagation is disabled.
+     */
+    public String getTopics() {
+        return this.topics;
+    }
+
+    /**
+     * Does this instance supports bridged events?
+     * @return <code>true</code> if a consumer is registered for the topic <code>/</code>
+     */
+    public boolean supportsBridgedEvents() {
+        return supportsBridgedEvents;
+    }
+
+    /**
+     * Bind a new consumer
+     * @param consumer The new consumer.
+     * @param properties The service properties.
+     */
+    protected void bindJobConsumer(final JobConsumer consumer, final Map<String, Object> properties) {
+        final String[] topics = PropertiesUtil.toStringArray(properties.get(JobConsumer.PROPERTY_TOPICS));
+        if ( topics != null && topics.length > 0 ) {
+            final ConsumerInfo info = new ConsumerInfo(consumer, properties);
+            // only a newly appearing topic changes the propagated topic list
+            boolean changed = false;
+            synchronized ( this.topicToConsumerMap ) {
+                for(final String t : topics) {
+                    if ( t != null ) {
+                        final String topic = t.trim();
+                        if ( topic.length() > 0 ) {
+                            List<ConsumerInfo> consumers = this.topicToConsumerMap.get(topic);
+                            if ( consumers == null ) {
+                                consumers = new ArrayList<JobConsumerManager.ConsumerInfo>();
+                                this.topicToConsumerMap.put(topic, consumers);
+                                changed = true;
+                            }
+                            consumers.add(info);
+                            Collections.sort(consumers);
+                        }
+                    }
+                }
+                this.supportsBridgedEvents = this.topicToConsumerMap.containsKey("/");
+                if ( changed ) {
+                    this.calculateTopics();
+                }
+            }
+            if ( changed && this.propagationService != null ) {
+                this.propagationService.setProperties(this.getRegistrationProperties());
+            }
+        }
+    }
+
+    /**
+     * Unbind a consumer
+     * @param consumer The old consumer.
+     * @param properties The service properties.
+     */
+    protected void unbindJobConsumer(final JobConsumer consumer, final Map<String, Object> properties) {
+        final String[] topics = PropertiesUtil.toStringArray(properties.get(JobConsumer.PROPERTY_TOPICS));
+        if ( topics != null && topics.length > 0 ) {
+            // equality of ConsumerInfo is based on the service id, so a fresh instance finds the bound one
+            final ConsumerInfo info = new ConsumerInfo(consumer, properties);
+            // only a disappearing topic changes the propagated topic list
+            boolean changed = false;
+            synchronized ( this.topicToConsumerMap ) {
+                for(final String t : topics) {
+                    if ( t != null ) {
+                        final String topic = t.trim();
+                        if ( topic.length() > 0 ) {
+                            final List<ConsumerInfo> consumers = this.topicToConsumerMap.get(topic);
+                            if ( consumers != null ) { // sanity check
+                                consumers.remove(info);
+                                if ( consumers.size() == 0 ) {
+                                    this.topicToConsumerMap.remove(topic);
+                                    changed = true;
+                                }
+                            }
+                        }
+                    }
+                }
+                this.supportsBridgedEvents = this.topicToConsumerMap.containsKey("/");
+                if ( changed ) {
+                    this.calculateTopics();
+                }
+            }
+            if ( changed && this.propagationService != null ) {
+                this.propagationService.setProperties(this.getRegistrationProperties());
+            }
+        }
+    }
+
+    /**
+     * Check whether at least one of the matchers accepts the topic.
+     * @param topic The topic to check
+     * @param matchers The matchers - must not be <code>null</code>
+     * @return <code>true</code> if a matcher returns a non-null match
+     */
+    private boolean match(final String topic, final TopicMatcher[] matchers) {
+        for(final TopicMatcher m : matchers) {
+            if ( m.match(topic) != null ) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Recalculate the propagated topics string.
+     * Must be called while holding the lock on {@link #topicToConsumerMap}.
+     */
+    private void calculateTopics() {
+        if ( this.propagationEnabled ) {
+            // create a sorted list - this ensures that the property value
+            // is always the same for the same topics.
+            final List<String> topicList = new ArrayList<String>();
+            for(final String topic : this.topicToConsumerMap.keySet() ) {
+                // check whitelist
+                if ( this.match(topic, this.whitelistMatchers) ) {
+                    // and blacklist
+                    if ( this.blacklistMatchers == null || !this.match(topic, this.blacklistMatchers) ) {
+                        topicList.add(topic);
+                    }
+                }
+            }
+            Collections.sort(topicList);
+
+            final StringBuilder sb = new StringBuilder();
+            boolean first = true;
+            for(final String topic : topicList ) {
+                if ( first ) {
+                    first = false;
+                } else {
+                    sb.append(',');
+                }
+                sb.append(topic);
+            }
+            this.topics = sb.toString();
+        } else {
+            this.topics = null;
+        }
+    }
+
+    /**
+     * Internal class caching some consumer infos like service id and ranking.
+     * Identity (equals/hashCode) is defined by the service id only.
+     */
+    private final static class ConsumerInfo implements Comparable<ConsumerInfo> {
+
+        public final JobConsumer consumer;
+        public final int ranking;
+        public final long serviceId;
+
+        /**
+         * @param consumer The consumer service
+         * @param properties The service properties, expected to contain {@link Constants#SERVICE_ID}
+         */
+        public ConsumerInfo(final JobConsumer consumer, final Map<String, Object> properties) {
+            this.consumer = consumer;
+            final Object sr = properties.get(Constants.SERVICE_RANKING);
+            if ( sr == null || !(sr instanceof Integer)) {
+                // a missing or non-Integer ranking is treated as the default ranking
+                this.ranking = 0;
+            } else {
+                this.ranking = (Integer)sr;
+            }
+            this.serviceId = (Long)properties.get(Constants.SERVICE_ID);
+        }
+
+        /**
+         * Order by ranking (highest first), then by service id (lowest first).
+         * Returns 0 for the same service id and ranking to honour the
+         * {@link Comparable} contract.
+         */
+        @Override
+        public int compareTo(final ConsumerInfo o) {
+            if ( this.ranking < o.ranking ) {
+                return 1;
+            } else if (this.ranking > o.ranking ) {
+                return -1;
+            }
+            // If ranks are equal, then sort by service id in ascending order.
+            if ( this.serviceId < o.serviceId ) {
+                return -1;
+            } else if ( this.serviceId > o.serviceId ) {
+                return 1;
+            }
+            return 0;
+        }
+
+        @Override
+        public boolean equals(final Object obj) {
+            if ( obj instanceof ConsumerInfo ) {
+                return ((ConsumerInfo)obj).serviceId == this.serviceId;
+            }
+            return false;
+        }
+
+        @Override
+        public int hashCode() {
+            // hash the same value equals() compares
+            return (int)(this.serviceId ^ (this.serviceId >>> 32));
+        }
+    }
+}
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java?rev=1470462&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java (added)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java Mon Apr 22 11:42:53 2013
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs;
+
+
+/**
+ * This object adds actions to a {@link JobImpl}.
+ *
+ * Every action is forwarded to the {@link JobManagerImpl} this handler was
+ * created with. Two handlers are equal if their jobs have the same id.
+ */
+public class JobHandler {
+
+    /** Time stamp field, initialised to -1; not written by this class. */
+    public long queued = -1;
+
+    /** Time stamp field, initialised to -1; not written by this class. */
+    public long started = -1;
+
+    /** The wrapped job. */
+    private final JobImpl job;
+
+    /** The manager performing the actions. */
+    private final JobManagerImpl jobManager;
+
+    /**
+     * Create a new handler.
+     * @param job The job to handle
+     * @param jobManager The job manager all actions are delegated to
+     */
+    public JobHandler(final JobImpl job, final JobManagerImpl jobManager) {
+        this.jobManager = jobManager;
+        this.job = job;
+    }
+
+    /**
+     * @return The wrapped job.
+     */
+    public JobImpl getJob() {
+        return this.job;
+    }
+
+    /**
+     * Delegates to {@link JobManagerImpl#start(JobHandler)}.
+     * @return The result reported by the job manager.
+     */
+    public boolean start() {
+        final boolean result = this.jobManager.start(this);
+        return result;
+    }
+
+    /**
+     * Delegates to {@link JobManagerImpl#finished(JobHandler)}.
+     */
+    public void finished() {
+        this.jobManager.finished(this);
+    }
+
+    /**
+     * Delegates to {@link JobManagerImpl#reschedule(JobHandler)}.
+     * @return The result reported by the job manager.
+     */
+    public boolean reschedule() {
+        final boolean result = this.jobManager.reschedule(this);
+        return result;
+    }
+
+    /**
+     * Delegates to {@link JobManagerImpl#remove(JobHandler)}.
+     * @return The result reported by the job manager.
+     */
+    public boolean remove() {
+        final boolean result = this.jobManager.remove(this);
+        return result;
+    }
+
+    /**
+     * Delegates to {@link JobManagerImpl#reassign(JobHandler)}.
+     */
+    public void reassign() {
+        this.jobManager.reassign(this);
+    }
+
+    /** The hash code is derived from the job id. */
+    @Override
+    public int hashCode() {
+        final String id = this.job.getId();
+        return id.hashCode();
+    }
+
+    /** Handlers are equal if both wrap a job with the same id. */
+    @Override
+    public boolean equals(Object obj) {
+        if ( obj instanceof JobHandler ) {
+            final JobHandler other = (JobHandler)obj;
+            return this.job.getId().equals(other.job.getId());
+        }
+        return false;
+    }
+
+    @Override
+    public String toString() {
+        final String id = this.job.getId();
+        return "JobEvent(" + id + ")";
+    }
+}
\ No newline at end of file
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java?rev=1470462&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java (added)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java Mon Apr 22 11:42:53 2013
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs;
+
+import java.util.Calendar;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.sling.api.resource.ValueMap;
+import org.apache.sling.api.wrappers.ValueMapDecorator;
+import org.apache.sling.event.impl.support.ResourceHelper;
+import org.apache.sling.event.jobs.Job;
+import org.apache.sling.event.jobs.JobUtil.JobPriority;
+import org.apache.sling.event.jobs.Queue;
+
+/**
+ * This object encapsulates all information about a job.
+ *
+ * The job properties are kept in a {@link ValueMap} wrapping the map passed to the
+ * constructor; the internal marker properties are removed from that map on creation.
+ */
+public class JobImpl implements Job {
+
+    /** Internal job property containing the resource path. */
+    public static final String PROPERTY_RESOURCE_PATH = "slingevent:path";
+
+    /** Internal job property if this is an bridged event (event admin). */
+    public static final String PROPERTY_BRIDGED_EVENT = "slingevent:eventadmin";
+
+    private final ValueMap properties;
+
+    private final String topic;
+
+    private final String path;
+
+    private final String name;
+
+    private final String jobId;
+
+    private final boolean isBridgedEvent;
+
+    private final boolean hasReadError;
+
+    /**
+     * Create a new job instance
+     *
+     * Note: the passed in map is modified (the internal properties are removed)
+     * and is used as the backing map of this job - it is not copied.
+     *
+     * @param topic The job topic
+     * @param name The unique job name (optional)
+     * @param jobId The unique (internal) job id
+     * @param properties Non-null map of properties, at least containing {@link #PROPERTY_RESOURCE_PATH}
+     */
+    public JobImpl(final String topic,
+            final String name,
+            final String jobId,
+            final Map<String, Object> properties) {
+        this.topic = topic;
+        this.name = name;
+        this.jobId = jobId;
+        // strip the internal properties, they are exposed through dedicated getters
+        this.path = (String)properties.remove(PROPERTY_RESOURCE_PATH);
+        this.isBridgedEvent = properties.remove(PROPERTY_BRIDGED_EVENT) != null;
+        this.hasReadError = properties.remove(ResourceHelper.PROPERTY_MARKER_READ_ERROR) != null;
+
+        this.properties = new ValueMapDecorator(properties);
+    }
+
+    /**
+     * Get the full resource path.
+     */
+    public String getResourcePath() {
+        return this.path;
+    }
+
+    /**
+     * Is this a bridged event?
+     */
+    public boolean isBridgedEvent() {
+        return this.isBridgedEvent;
+    }
+
+    /**
+     * Did we have read errors?
+     */
+    public boolean hasReadErrors() {
+        return this.hasReadError;
+    }
+
+    /**
+     * Get all properties
+     * @return The live property map of this job (not a copy).
+     */
+    public Map<String, Object> getProperties() {
+        return this.properties;
+    }
+
+    /**
+     * Update the information for a retry:
+     * the retry count is increased by one and the started time is removed.
+     * A missing retry count is treated as zero.
+     */
+    public void retry() {
+        final int retries = this.getRetryCount();
+        this.properties.put(Job.PROPERTY_JOB_RETRY_COUNT, retries + 1);
+        this.properties.remove(Job.PROPERTY_JOB_STARTED_TIME);
+    }
+
+    /**
+     * @see org.apache.sling.event.jobs.Job#getTopic()
+     */
+    @Override
+    public String getTopic() {
+        return this.topic;
+    }
+
+    /**
+     * @see org.apache.sling.event.jobs.Job#getName()
+     */
+    @Override
+    public String getName() {
+        return this.name;
+    }
+
+    /**
+     * @see org.apache.sling.event.jobs.Job#getId()
+     */
+    @Override
+    public String getId() {
+        return this.jobId;
+    }
+
+    /**
+     * @see org.apache.sling.event.jobs.Job#getProperty(java.lang.String)
+     */
+    @Override
+    public Object getProperty(final String name) {
+        return this.properties.get(name);
+    }
+
+    /**
+     * @see org.apache.sling.event.jobs.Job#getProperty(java.lang.String, java.lang.Class)
+     */
+    @Override
+    public <T> T getProperty(final String name, final Class<T> type) {
+        return this.properties.get(name, type);
+    }
+
+    /**
+     * @see org.apache.sling.event.jobs.Job#getProperty(java.lang.String, java.lang.Object)
+     */
+    @Override
+    public <T> T getProperty(final String name, final T defaultValue) {
+        return this.properties.get(name, defaultValue);
+    }
+
+    /**
+     * @see org.apache.sling.event.jobs.Job#getPropertyNames()
+     */
+    @Override
+    public Set<String> getPropertyNames() {
+        return this.properties.keySet();
+    }
+
+    /**
+     * @return The value of {@link Job#PROPERTY_JOB_PRIORITY} or <code>null</code> if not set.
+     */
+    @Override
+    public JobPriority getJobPriority() {
+        return (JobPriority)this.getProperty(Job.PROPERTY_JOB_PRIORITY);
+    }
+
+    /**
+     * @return The value of {@link Job#PROPERTY_JOB_RETRY_COUNT} or 0 if not set.
+     */
+    @Override
+    public int getRetryCount() {
+        // typed lookup with a default: avoids a NullPointerException on unboxing a missing
+        // value and leaves the type conversion to the value map instead of a hard cast
+        return this.getProperty(Job.PROPERTY_JOB_RETRY_COUNT, Integer.valueOf(0));
+    }
+
+    /**
+     * @return The value of {@link Job#PROPERTY_JOB_RETRIES} or 0 if not set.
+     */
+    @Override
+    public int getNumberOfRetries() {
+        // see getRetryCount() for the reason of the typed lookup
+        return this.getProperty(Job.PROPERTY_JOB_RETRIES, Integer.valueOf(0));
+    }
+
+    /**
+     * @return The value of {@link Job#PROPERTY_JOB_QUEUE_NAME} or <code>null</code> if not set.
+     */
+    @Override
+    public String getQueueName() {
+        return (String)this.getProperty(Job.PROPERTY_JOB_QUEUE_NAME);
+    }
+
+    /**
+     * @return The value of {@link Job#PROPERTY_JOB_TARGET_INSTANCE} or <code>null</code> if not set.
+     */
+    @Override
+    public String getTargetInstance() {
+        return (String)this.getProperty(Job.PROPERTY_JOB_TARGET_INSTANCE);
+    }
+
+    /**
+     * @return The value of {@link Job#PROPERTY_JOB_STARTED_TIME} or <code>null</code> if not set.
+     */
+    @Override
+    public Calendar getProcessingStarted() {
+        return (Calendar)this.getProperty(Job.PROPERTY_JOB_STARTED_TIME);
+    }
+
+    /**
+     * @return The value of {@link Job#PROPERTY_JOB_CREATED} or <code>null</code> if not set.
+     */
+    @Override
+    public Calendar getCreated() {
+        return (Calendar)this.getProperty(Job.PROPERTY_JOB_CREATED);
+    }
+
+    /**
+     * @return The value of {@link Job#PROPERTY_JOB_CREATED_INSTANCE} or <code>null</code> if not set.
+     */
+    @Override
+    public String getCreatedInstance() {
+        return (String)this.getProperty(Job.PROPERTY_JOB_CREATED_INSTANCE);
+    }
+
+    /**
+     * Copy the queue name, the maximum retries and the priority of the queue
+     * into the job properties.
+     * @param queue The queue this job is put into.
+     */
+    public void updateQueue(final Queue queue) {
+        this.properties.put(Job.PROPERTY_JOB_QUEUE_NAME, queue.getName());
+        this.properties.put(Job.PROPERTY_JOB_RETRIES, queue.getConfiguration().getMaxRetries());
+        this.properties.put(Job.PROPERTY_JOB_PRIORITY, queue.getConfiguration().getPriority());
+    }
+
+    @Override
+    public String toString() {
+        return "JobImpl [properties=" + properties + ", topic=" + topic
+                + ", path=" + path + ", name=" + name + ", jobId=" + jobId
+                + ", isBridgedEvent=" + isBridgedEvent + "]";
+    }
+}
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerConfiguration.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerConfiguration.java?rev=1470462&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerConfiguration.java (added)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerConfiguration.java Mon Apr 22 11:42:53 2013
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs;
+
+import java.util.Calendar;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.sling.commons.osgi.PropertiesUtil;
+import org.apache.sling.event.impl.support.Environment;
+
+/**
+ * Configuration of the job handling
+ *
+ * All repository paths used by the job handling are derived from the configured
+ * base path once, in the constructor.
+ */
+public class JobManagerConfiguration {
+
+    /** Default repository path. */
+    public static final String DEFAULT_REPOSITORY_PATH = "/var/eventing/jobs";
+
+    /** The path where all jobs are stored. */
+    public static final String CONFIG_PROPERTY_REPOSITORY_PATH = "repository.path";
+
+    /** Default background load delay. */
+    public static final long DEFAULT_BACKGROUND_LOAD_DELAY = 30;
+
+    /** The background loader waits this time of seconds after startup before loading events from the repository. (in secs) */
+    public static final String CONFIG_PROPERTY_BACKGROUND_LOAD_DELAY = "load.delay";
+
+    /** The jobs base path with a slash. */
+    private final String jobsBasePathWithSlash;
+
+    /** The base path for assigned jobs. */
+    private final String assignedJobsPath;
+
+    /** The base path for unassigned jobs. */
+    private final String unassignedJobsPath;
+
+    /** The base path for assigned jobs to the current instance. */
+    private final String localJobsPath;
+
+    /** The base path for assigned jobs to the current instance - ending with a slash. */
+    private final String localJobsPathWithSlash;
+
+    /** The base path for locks. */
+    private final String locksPath;
+
+    /** The base path for locks - ending with a slash. */
+    private final String locksPathWithSlash;
+
+    /** Path below the jobs base path named "anon". */
+    private final String previousVersionAnonPath;
+
+    /** Path below the jobs base path named "identified". */
+    private final String previousVersionIdentifiedPath;
+
+    /** The configured background load delay. */
+    private final long backgroundLoadDelay;
+
+    /** Counter for jobs without an id. */
+    private final AtomicLong jobCounter = new AtomicLong(0);
+
+    /**
+     * Create the configuration from the component properties.
+     * @param props The properties; {@link #CONFIG_PROPERTY_REPOSITORY_PATH} and
+     *              {@link #CONFIG_PROPERTY_BACKGROUND_LOAD_DELAY} are read, the
+     *              defaults are used for missing values.
+     */
+    public JobManagerConfiguration(final Map<String, Object> props) {
+        String basePath = PropertiesUtil.toString(props.get(CONFIG_PROPERTY_REPOSITORY_PATH),
+                DEFAULT_REPOSITORY_PATH);
+        // a configured path with trailing slash(es) would otherwise result in "//" inside
+        // all derived paths and break the prefix checks in isJob/isLocalJob/isLock
+        while ( basePath.endsWith("/") ) {
+            basePath = basePath.substring(0, basePath.length() - 1);
+        }
+        this.jobsBasePathWithSlash = basePath + '/';
+
+        // create initial resources
+        this.assignedJobsPath = this.jobsBasePathWithSlash + "assigned";
+        this.unassignedJobsPath = this.jobsBasePathWithSlash + "unassigned";
+        this.locksPath = this.jobsBasePathWithSlash + "locks";
+        this.locksPathWithSlash = this.locksPath.concat("/");
+
+        this.localJobsPath = this.assignedJobsPath.concat("/").concat(Environment.APPLICATION_ID);
+        this.localJobsPathWithSlash = this.localJobsPath.concat("/");
+
+        this.previousVersionAnonPath = this.jobsBasePathWithSlash + "anon";
+        this.previousVersionIdentifiedPath = this.jobsBasePathWithSlash + "identified";
+
+        this.backgroundLoadDelay = PropertiesUtil.toLong(props.get(CONFIG_PROPERTY_BACKGROUND_LOAD_DELAY), DEFAULT_BACKGROUND_LOAD_DELAY);
+    }
+
+    /**
+     * Get the resource path for all assigned jobs.
+     * The (misspelled) method name is kept for existing callers,
+     * see {@link #getAssignedJobsPath()}.
+     * @return The path - does not end with a slash.
+     */
+    public String getAssginedJobsPath() {
+        return this.assignedJobsPath;
+    }
+
+    /**
+     * Get the resource path for all assigned jobs.
+     * Correctly spelled alias of {@link #getAssginedJobsPath()}.
+     * @return The path - does not end with a slash.
+     */
+    public String getAssignedJobsPath() {
+        return this.assignedJobsPath;
+    }
+
+    /**
+     * Get the resource path for all unassigned jobs.
+     * @return The path - does not end with a slash.
+     */
+    public String getUnassignedJobsPath() {
+        return this.unassignedJobsPath;
+    }
+
+    /**
+     * Get the resource path for all jobs assigned to the current instance
+     * @return The path - does not end with a slash
+     */
+    public String getLocalJobsPath() {
+        return this.localJobsPath;
+    }
+
+    /**
+     * Get the resource path for all locks
+     * @return The path - does not end with a slash
+     */
+    public String getLocksPath() {
+        return this.locksPath;
+    }
+
+    /**
+     * Get the configured background load delay.
+     * @return The delay (documented on the configuration property as seconds)
+     */
+    public long getBackgroundLoadDelay() {
+        return backgroundLoadDelay;
+    }
+
+    /**
+     * Create a unique job path (folder and name) for the job.
+     * The path is built as <code>base/topicName/jobId</code> where base is either the
+     * unassigned jobs path or the assigned jobs path followed by the target id.
+     *
+     * @param targetId The id of the target instance or <code>null</code> for an unassigned job
+     * @param topic The job topic - slashes are replaced by dots; ignored for bridged events
+     * @param jobId The job id
+     * @param jobProperties Optional job properties, only checked for {@link JobImpl#PROPERTY_BRIDGED_EVENT}
+     * @return The job path
+     */
+    public String getUniquePath(final String targetId,
+            final String topic,
+            final String jobId,
+            final Map<String, Object> jobProperties) {
+        final boolean isBridged = (jobProperties != null ? jobProperties.containsKey(JobImpl.PROPERTY_BRIDGED_EVENT) : false);
+        final String topicName = (isBridged ? JobImpl.PROPERTY_BRIDGED_EVENT : topic.replace('/', '.'));
+        final StringBuilder sb = new StringBuilder();
+        if ( targetId != null ) {
+            sb.append(this.assignedJobsPath);
+            sb.append('/');
+            sb.append(targetId);
+        } else {
+            sb.append(this.unassignedJobsPath);
+        }
+        sb.append('/');
+        sb.append(topicName);
+        sb.append('/');
+        sb.append(jobId);
+
+        return sb.toString();
+    }
+
+    /**
+     * Get the unique job id
+     * The id has the form <code>year/month/day/hour/minute/topic_applicationId_counter</code>,
+     * using the current time and the topic with slashes replaced by dots.
+     * @param jobTopic The job topic
+     * @return A new id
+     */
+    public String getUniqueId(final String jobTopic) {
+        final String convTopic = jobTopic.replace('/', '.');
+
+        final Calendar now = Calendar.getInstance();
+        final StringBuilder sb = new StringBuilder();
+        sb.append(now.get(Calendar.YEAR));
+        sb.append('/');
+        // Calendar months are zero based
+        sb.append(now.get(Calendar.MONTH) + 1);
+        sb.append('/');
+        sb.append(now.get(Calendar.DAY_OF_MONTH));
+        sb.append('/');
+        sb.append(now.get(Calendar.HOUR_OF_DAY));
+        sb.append('/');
+        sb.append(now.get(Calendar.MINUTE));
+        sb.append('/');
+        sb.append(convTopic);
+        sb.append('_');
+        sb.append(Environment.APPLICATION_ID);
+        sb.append('_');
+        sb.append(jobCounter.getAndIncrement());
+
+        return sb.toString();
+    }
+
+    /**
+     * @return <code>true</code> if the path is below the local jobs path.
+     */
+    public boolean isLocalJob(final String jobPath) {
+        return jobPath.startsWith(this.localJobsPathWithSlash);
+    }
+
+    /**
+     * @return <code>true</code> if the path is below the jobs base path.
+     */
+    public boolean isJob(final String jobPath) {
+        return jobPath.startsWith(this.jobsBasePathWithSlash);
+    }
+
+    /**
+     * @return <code>true</code> if the path is below the locks path.
+     */
+    public boolean isLock(final String lockPath) {
+        return lockPath.startsWith(this.locksPathWithSlash);
+    }
+
+    /**
+     * @return The "anon" path below the jobs base path.
+     */
+    public String getPreviousVersionAnonPath() {
+        return this.previousVersionAnonPath;
+    }
+
+    /**
+     * @return The "identified" path below the jobs base path.
+     */
+    public String getPreviousVersionIdentifiedPath() {
+        return this.previousVersionIdentifiedPath;
+    }
+}
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerConfiguration.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerConfiguration.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerConfiguration.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java?rev=1470462&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java (added)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java Mon Apr 22 11:42:53 2013
@@ -0,0 +1,1292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs;
+
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Modified;
+import org.apache.felix.scr.annotations.Properties;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.jackrabbit.util.ISO9075;
+import org.apache.sling.api.SlingConstants;
+import org.apache.sling.api.resource.LoginException;
+import org.apache.sling.api.resource.ModifiableValueMap;
+import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.api.resource.QuerySyntaxException;
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.api.resource.ResourceUtil;
+import org.apache.sling.api.resource.ValueMap;
+import org.apache.sling.commons.scheduler.Scheduler;
+import org.apache.sling.discovery.TopologyEvent;
+import org.apache.sling.discovery.TopologyEvent.Type;
+import org.apache.sling.discovery.TopologyEventListener;
+import org.apache.sling.discovery.TopologyView;
+import org.apache.sling.event.EventUtil;
+import org.apache.sling.event.impl.EnvironmentComponent;
+import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
+import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager;
+import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueInfo;
+import org.apache.sling.event.impl.jobs.jmx.QueueStatusEvent;
+import org.apache.sling.event.impl.jobs.queues.AbstractJobQueue;
+import org.apache.sling.event.impl.jobs.queues.OrderedJobQueue;
+import org.apache.sling.event.impl.jobs.queues.ParallelJobQueue;
+import org.apache.sling.event.impl.jobs.queues.TopicRoundRobinJobQueue;
+import org.apache.sling.event.impl.jobs.stats.StatisticsImpl;
+import org.apache.sling.event.impl.jobs.stats.TopicStatisticsImpl;
+import org.apache.sling.event.impl.support.Environment;
+import org.apache.sling.event.impl.support.ResourceHelper;
+import org.apache.sling.event.jobs.Job;
+import org.apache.sling.event.jobs.JobConsumer;
+import org.apache.sling.event.jobs.JobManager;
+import org.apache.sling.event.jobs.JobUtil;
+import org.apache.sling.event.jobs.JobUtil.JobPriority;
+import org.apache.sling.event.jobs.JobsIterator;
+import org.apache.sling.event.jobs.Queue;
+import org.apache.sling.event.jobs.QueueConfiguration;
+import org.apache.sling.event.jobs.Statistics;
+import org.apache.sling.event.jobs.TopicStatistics;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
+import org.osgi.service.event.EventHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Implementation of the job manager.
+ */
+@Component(immediate=true,
+ name="org.apache.sling.event.impl.jobs.jcr.PersistenceHandler")
+@Service(value={JobManager.class, EventHandler.class, TopologyEventListener.class, Runnable.class})
+@Properties({
+ @Property(name=JobManagerConfiguration.CONFIG_PROPERTY_REPOSITORY_PATH,
+ value=JobManagerConfiguration.DEFAULT_REPOSITORY_PATH,
+ propertyPrivate=true),
+ @Property(name="scheduler.period", longValue=60, propertyPrivate=true),
+ @Property(name="scheduler.concurrent", boolValue=false, propertyPrivate=true),
+ @Property(name=EventConstants.EVENT_TOPIC, propertyPrivate=true,
+ value={SlingConstants.TOPIC_RESOURCE_ADDED,
+ "org/apache/sling/event/notification/job/*",
+ ResourceHelper.BUNDLE_EVENT_STARTED,
+ ResourceHelper.BUNDLE_EVENT_UPDATED})
+})
+public class JobManagerImpl
+ extends StatisticsImpl
+ implements JobManager, EventHandler, TopologyEventListener, Runnable {
+
+ /** Default logger. */
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+ /** The environment component. */
+ @Reference
+ private EnvironmentComponent environment;
+
+ /** Event admin - used to send queue status events and job notifications. */
+ @Reference
+ private EventAdmin eventAdmin;
+
+ /** The configuration manager. */
+ @Reference
+ private QueueConfigurationManager queueConfigManager;
+
+ /** Factory used to obtain administrative resource resolvers. */
+ @Reference
+ private ResourceResolverFactory resourceResolverFactory;
+
+ /** Scheduler - handed to the parallel and topic round robin queues. */
+ @Reference
+ private Scheduler scheduler;
+
+ /** Manager used to look up the job consumer for a topic. */
+ @Reference
+ private JobConsumerManager jobConsumerManager;
+
+ /** The job manager configuration. */
+ private JobManagerConfiguration configuration;
+
+ /** Capabilities of the current topology view - <code>null</code> while processing is stopped. */
+ private volatile TopologyCapabilities topologyCapabilities;
+
+ /** Maintenance task - run on each scheduler call and used to reassign jobs. Set to <code>null</code> on deactivation. */
+ private MaintenanceTask maintenanceTask;
+
+ /** Background loader for jobs. Set to <code>null</code> on deactivation. */
+ private BackgroundLoader backgroundLoader;
+
+ /** Lock object for the queues map - we don't want to sync directly on the concurrent map. */
+ private final Object queuesLock = new Object();
+
+ /** All active queues. */
+ private final Map<String, AbstractJobQueue> queues = new ConcurrentHashMap<String, AbstractJobQueue>();
+
+ /** We count the scheduler runs. */
+ private volatile long schedulerRuns;
+
+ /** Current statistics. */
+ private final StatisticsImpl baseStatistics = new StatisticsImpl();
+
+ /** Last update for current statistics. */
+ private long lastUpdatedStatistics;
+
+ /** Statistics per topic. */
+ private final ConcurrentMap<String, TopicStatistics> topicStatistics = new ConcurrentHashMap<String, TopicStatistics>();
+
+ /**
+  * Activate this component.
+  * Creates the configuration, the maintenance task and the background loader
+  * and makes sure the base resources exist.
+  * @param props Configuration properties
+  * @throws LoginException If the administrative resource resolver can't be obtained
+  */
+ @Activate
+ protected void activate(final Map<String, Object> props) throws LoginException {
+     final JobManagerConfiguration config = new JobManagerConfiguration(props);
+     this.configuration = config;
+     this.maintenanceTask = new MaintenanceTask(config, this.resourceResolverFactory);
+     this.backgroundLoader = new BackgroundLoader(this, config, this.resourceResolverFactory);
+
+     // make sure the base paths for local jobs, unassigned jobs and locks exist
+     final ResourceResolver adminResolver = this.resourceResolverFactory.getAdministrativeResourceResolver(null);
+     try {
+         ResourceHelper.getOrCreateBasePath(adminResolver, config.getLocalJobsPath());
+         ResourceHelper.getOrCreateBasePath(adminResolver, config.getUnassignedJobsPath());
+         ResourceHelper.getOrCreateBasePath(adminResolver, config.getLocksPath());
+     } catch ( final PersistenceException persistenceError ) {
+         this.ignoreException(persistenceError);
+     } finally {
+         adminResolver.close();
+     }
+
+     logger.info("Apache Sling Job Manager started on instance {}", Environment.APPLICATION_ID);
+ }
+
+ /**
+  * Called when the configuration of this component is modified.
+  * The new properties are not evaluated.
+  * @param props Configuration properties
+  */
+ @Modified
+ protected void update(final Map<String, Object> props) {
+     // intentionally empty
+ }
+
+ /**
+  * Deactivate this component.
+  * Stops the background loader, closes all queues and clears the queue map.
+  */
+ @Deactivate
+ protected void deactivate() {
+     logger.info("Apache Sling Job Manager stopping on instance {}", Environment.APPLICATION_ID);
+     this.backgroundLoader.deactivate();
+     this.backgroundLoader = null;
+
+     this.maintenanceTask = null;
+     this.configuration = null;
+     for(final AbstractJobQueue queue : this.queues.values() ) {
+         // update mbeans before ...
+         eventAdmin.sendEvent(new QueueStatusEvent(null, queue));
+         queue.close();
+         // ... and after closing the queue
+         eventAdmin.sendEvent(new QueueStatusEvent(null, queue));
+     }
+     this.queues.clear();
+     logger.info("Apache Sling Job Manager stopped on instance {}", Environment.APPLICATION_ID);
+ }
+
+ /**
+  * Periodic maintenance, invoked from {@link #run()}.
+  * Every run lets all queues check for unprocessed jobs and invokes the
+  * maintenance task. Every fifth run additionally handles idle queues:
+  * a queue which is already marked for removal is closed and removed,
+  * all other queues get marked for removal.
+  */
+ private void maintain() {
+     this.schedulerRuns++;
+     logger.debug("Job manager maintenance: Starting #{}", this.schedulerRuns);
+
+     // unprocessed jobs are checked on every run
+     logger.debug("Checking for unprocessed jobs...");
+     final Iterator<AbstractJobQueue> unprocessedIter = this.queues.values().iterator();
+     while ( unprocessedIter.hasNext() ) {
+         unprocessedIter.next().checkForUnprocessedJobs();
+     }
+
+     // the full clean up is only done on every fifth run
+     if ( schedulerRuns % 5 == 0 ) {
+         logger.debug("Checking for idle queues...");
+
+         // we synchronize to avoid creating a queue which is about to be removed during cleanup
+         synchronized ( queuesLock ) {
+             final Iterator<Map.Entry<String, AbstractJobQueue>> entryIter = this.queues.entrySet().iterator();
+             while ( entryIter.hasNext() ) {
+                 final AbstractJobQueue queue = entryIter.next().getValue();
+                 if ( !queue.isMarkedForRemoval() ) {
+                     // candidate for removal during the next full clean up
+                     queue.markForRemoval();
+                     continue;
+                 }
+                 logger.debug("Removing idle Job Queue {}", queue);
+                 queue.close();
+                 // keep the statistics of the removed queue
+                 this.baseStatistics.add(queue);
+                 entryIter.remove();
+                 // update mbeans
+                 eventAdmin.sendEvent(new QueueStatusEvent(null, queue));
+             }
+         }
+     }
+
+     // the task might already be unset by a deactivation
+     final MaintenanceTask currentTask = this.maintenanceTask;
+     if ( currentTask != null ) {
+         currentTask.run(this.topologyCapabilities, this.queueConfigManager, this.schedulerRuns);
+     }
+     logger.debug("Job manager maintenance: Finished #{}", this.schedulerRuns);
+ }
+
+ /**
+  * Process a new job
+  * This method first searches the corresponding queue - if such a queue
+  * does not exist yet, it is created and started.
+  * If no consumer is available for the job topic (and the job can't be
+  * handled as a bridged event), the job is not queued but handed to the
+  * maintenance task for reassignment.
+  * @param job The job to process
+  */
+ void process(final JobImpl job) {
+ final JobHandler handler = new JobHandler(job, this);
+
+ // check if we still are able to process this job
+ final JobConsumer consumer = this.jobConsumerManager.getConsumer(job.getTopic());
+ boolean reassign = false;
+ String reassignTargetId = null;
+ // no consumer: reassign unless the job is a bridged event and bridged events are supported
+ if ( consumer == null && (!job.isBridgedEvent() || !this.jobConsumerManager.supportsBridgedEvents())) {
+ reassign = true;
+ }
+
+ // get the queue configuration
+ final QueueInfo queueInfo = queueConfigManager.getQueueInfo(handler.getJob().getTopic());
+ final InternalQueueConfiguration config = queueInfo.queueConfiguration;
+
+ // Sanity check if queue configuration has changed
+ if ( config.getType() == QueueConfiguration.Type.DROP ) {
+ if ( logger.isDebugEnabled() ) {
+ logger.debug("Dropping job due to configuration of queue {} : {}", queueInfo.queueName, Utility.toString(handler.getJob()));
+ }
+ handler.remove();
+ } else if ( config.getType() == QueueConfiguration.Type.IGNORE ) {
+ if ( !reassign ) {
+ if ( logger.isDebugEnabled() ) {
+ logger.debug("Ignoring job due to configuration of queue {} : {}", queueInfo.queueName, Utility.toString(handler.getJob()));
+ }
+ }
+ } else {
+
+ if ( reassign ) {
+ // the target stays null if there are no topology capabilities at the moment
+ final TopologyCapabilities caps = this.topologyCapabilities;
+ reassignTargetId = (caps == null ? null : caps.detectTarget(job.getTopic(), job.getProperties(), queueInfo));
+
+ } else {
+ // get or create queue
+ AbstractJobQueue queue = null;
+ // we synchronize to avoid creating a queue which is about to be removed during cleanup
+ synchronized ( queuesLock ) {
+ queue = this.queues.get(queueInfo.queueName);
+ // check for reconfiguration, we really do an identity check here(!)
+ if ( queue != null && queue.getConfiguration() != config ) {
+ this.outdateQueue(queue);
+ // we use a new queue with the configuration
+ queue = null;
+ }
+ if ( queue == null ) {
+ if ( config.getType() == QueueConfiguration.Type.ORDERED ) {
+ queue = new OrderedJobQueue(queueInfo.queueName, config, this.jobConsumerManager, this.eventAdmin);
+ } else if ( config.getType() == QueueConfiguration.Type.UNORDERED ) {
+ queue = new ParallelJobQueue(queueInfo.queueName, config, this.jobConsumerManager, this.eventAdmin, this.scheduler);
+ } else if ( config.getType() == QueueConfiguration.Type.TOPIC_ROUND_ROBIN ) {
+ queue = new TopicRoundRobinJobQueue(queueInfo.queueName, config, this.jobConsumerManager, this.eventAdmin, this.scheduler);
+ }
+ if ( queue == null ) {
+ // this is just a sanity check, actually we can never get here
+ logger.warn("Ignoring event due to unknown queue type of queue {} : {}", queueInfo.queueName, Utility.toString(handler.getJob()));
+ handler.remove();
+ } else {
+ // register the new queue, notify the mbeans and start it
+ queues.put(queueInfo.queueName, queue);
+ eventAdmin.sendEvent(new QueueStatusEvent(queue, null));
+ queue.start();
+ }
+ }
+ }
+
+ // and put job
+ if ( queue != null ) {
+ handler.getJob().updateQueue(queue);
+ queue.process(handler);
+ }
+ }
+ }
+ // NOTE(review): this runs for every queue type, including DROP where the job was removed above - confirm this is intended
+ if ( reassign ) {
+ this.maintenanceTask.reassignJob(job, reassignTargetId);
+ }
+ }
+
+ /**
+  * Scheduler entry point - delegates to the maintenance method.
+  * The registered scheduler period of this component is 60.
+  * @see java.lang.Runnable#run()
+  */
+ @Override
+ public void run() {
+     maintain();
+ }
+
+ /**
+  * Log an exception which is otherwise ignored.
+  * The exception is only logged if debug logging is enabled.
+  * @param e The ignored exception
+  */
+ private void ignoreException(final Exception e) {
+     if ( !this.logger.isDebugEnabled() ) {
+         return;
+     }
+     this.logger.debug("Ignored exception " + e.getMessage(), e);
+ }
+
+ private void outdateQueue(final AbstractJobQueue queue) {
+ // remove the queue with the old name
+ this.queues.remove(queue.getName());
+ // check if we can close or have to rename
+ queue.markForRemoval();
+ if ( queue.isMarkedForRemoval() ) {
+ // close
+ queue.close();
+ // copy statistics
+ this.baseStatistics.add(queue);
+ // update mbeans
+ eventAdmin.sendEvent(new QueueStatusEvent(null, queue));
+ } else {
+ // notify queue
+ queue.rename(queue.getName() + "<outdated>(" + queue.hashCode() + ")");
+ // readd with new name
+ this.queues.put(queue.getName(), queue);
+ // update mbeans
+ eventAdmin.sendEvent(new QueueStatusEvent(queue, queue));
+ }
+ }
+
+ /**
+  * Reset the base statistics, the statistics of all queues and the topic
+  * statistics. The next statistics request triggers a recalculation.
+  * @see org.apache.sling.event.impl.jobs.stats.StatisticsImpl#reset()
+  */
+ @Override
+ public synchronized void reset() {
+     this.baseStatistics.reset();
+     final Iterator<AbstractJobQueue> queueIter = this.queues.values().iterator();
+     while ( queueIter.hasNext() ) {
+         queueIter.next().reset();
+     }
+     this.topicStatistics.clear();
+     this.lastUpdatedStatistics = 0;
+ }
+
+ /**
+  * Clear and outdate all queues, reset the statistics and restart the
+  * background loader.
+  * @see org.apache.sling.event.jobs.JobManager#restart()
+  */
+ @Override
+ public void restart() {
+     synchronized ( queuesLock ) {
+         // work on a copy as outdating modifies the queue map
+         final List<AbstractJobQueue> snapshot = new ArrayList<AbstractJobQueue>(this.queues.values());
+         final Iterator<AbstractJobQueue> snapshotIter = snapshot.iterator();
+         while ( snapshotIter.hasNext() ) {
+             final AbstractJobQueue current = snapshotIter.next();
+             current.clear();
+             this.outdateQueue(current);
+         }
+     }
+     // reset statistics
+     this.reset();
+     // and now load again
+     this.backgroundLoader.restart();
+ }
+
+ /**
+  * This implementation always reports job processing as enabled.
+  * @see org.apache.sling.event.jobs.JobManager#isJobProcessingEnabled()
+  */
+ @Override
+ public boolean isJobProcessingEnabled() {
+     return true;
+ }
+
+ /**
+  * Handle the events this component is registered for:
+  * <ul>
+  * <li>resource added: a new local job is handed to the background loader</li>
+  * <li>bundle started/updated: the background loader retries unloaded jobs</li>
+  * <li>local job notifications: the topic statistics are updated</li>
+  * </ul>
+  * @param event The event
+  * @see org.osgi.service.event.EventHandler#handleEvent(org.osgi.service.event.Event)
+  */
+ @Override
+ public void handleEvent(final Event event) {
+     final String eventTopic = event.getTopic();
+     if ( SlingConstants.TOPIC_RESOURCE_ADDED.equals(eventTopic) ) {
+         final String path = (String) event.getProperty(SlingConstants.PROPERTY_PATH);
+         final String rt = (String) event.getProperty(SlingConstants.PROPERTY_RESOURCE_TYPE);
+         // an event without a path can't denote a job - guard against a NPE in isLocalJob
+         if ( path != null
+              && (rt == null || ResourceHelper.RESOURCE_TYPE_JOB.equals(rt))
+              && this.configuration.isLocalJob(path) ) {
+             this.backgroundLoader.loadJob(path);
+         }
+     } else if ( ResourceHelper.BUNDLE_EVENT_STARTED.equals(eventTopic)
+              || ResourceHelper.BUNDLE_EVENT_UPDATED.equals(eventTopic) ) {
+         this.backgroundLoader.tryToReloadUnloadedJobs();
+     } else if ( EventUtil.isLocal(event) ) {
+         // job notifications
+         final String topic = (String)event.getProperty(JobUtil.NOTIFICATION_PROPERTY_JOB_TOPIC);
+         if ( topic != null ) { // this is just a sanity check
+             TopicStatisticsImpl ts = (TopicStatisticsImpl)this.topicStatistics.get(topic);
+             if ( ts == null ) {
+                 // use the result of putIfAbsent instead of a second lookup:
+                 // a concurrent reset() may clear the map in between and the
+                 // lookup would then return null
+                 final TopicStatisticsImpl created = new TopicStatisticsImpl(topic);
+                 final TopicStatistics existing = this.topicStatistics.putIfAbsent(topic, created);
+                 ts = (existing != null ? (TopicStatisticsImpl)existing : created);
+             }
+             if ( eventTopic.equals(JobUtil.TOPIC_JOB_CANCELLED) ) {
+                 ts.addCancelled();
+             } else if ( eventTopic.equals(JobUtil.TOPIC_JOB_FAILED) ) {
+                 ts.addFailed();
+             } else if ( eventTopic.equals(JobUtil.TOPIC_JOB_FINISHED) ) {
+                 final Long time = (Long)event.getProperty(Utility.PROPERTY_TIME);
+                 ts.addFinished(time == null ? -1 : time);
+             } else if ( eventTopic.equals(JobUtil.TOPIC_JOB_STARTED) ) {
+                 final Long time = (Long)event.getProperty(Utility.PROPERTY_TIME);
+                 ts.addActivated(time == null ? -1 : time);
+             }
+         }
+     }
+ }
+
+ /**
+  * Read a job from a resource.
+  * If the topic stored in the resource is invalid, a warning is logged and
+  * the resource is deleted.
+  * @param resource The job resource - might be <code>null</code>
+  * @return The job or <code>null</code> if the resource is <code>null</code>
+  *         or the topic is invalid
+  */
+ JobImpl readJob(final Resource resource) {
+     if ( resource == null ) {
+         return null;
+     }
+     final ValueMap vm = ResourceUtil.getValueMap(resource);
+
+     final String errorMessage = Utility.checkJobTopic(vm.get(JobUtil.PROPERTY_JOB_TOPIC));
+     if ( errorMessage != null ) {
+         logger.warn(errorMessage + " : {}", vm);
+         // remove the job as the topic is invalid anyway
+         try {
+             final ResourceResolver owner = resource.getResourceResolver();
+             owner.delete(resource);
+             resource.getResourceResolver().commit();
+         } catch ( final PersistenceException ignore) {
+             this.ignoreException(ignore);
+         }
+         return null;
+     }
+
+     final String topic = vm.get(JobUtil.PROPERTY_JOB_TOPIC, String.class);
+     final Map<String, Object> jobProperties = ResourceHelper.cloneValueMap(vm);
+
+     jobProperties.put(JobImpl.PROPERTY_RESOURCE_PATH, resource.getPath());
+     // convert to integers (JCR supports only long...)
+     jobProperties.put(Job.PROPERTY_JOB_RETRIES, vm.get(Job.PROPERTY_JOB_RETRIES, Integer.class));
+     jobProperties.put(Job.PROPERTY_JOB_RETRY_COUNT, vm.get(Job.PROPERTY_JOB_RETRY_COUNT, Integer.class));
+     // the priority is stored as a string
+     final String priorityName = vm.get(Job.PROPERTY_JOB_PRIORITY, String.class);
+     jobProperties.put(Job.PROPERTY_JOB_PRIORITY, JobPriority.valueOf(priorityName));
+
+     final String jobName = (String)jobProperties.get(JobUtil.PROPERTY_JOB_NAME);
+     final String jobId = (String)jobProperties.get(JobUtil.JOB_ID);
+     return new JobImpl(topic, jobName, jobId, jobProperties);
+ }
+
+ /**
+  * Stop processing: stop the background loader, clear and outdate all
+  * queues and deactivate the current topology capabilities.
+  * @return The change count to use for the next capabilities - the current
+  *         change count plus one, or 0 if there are no capabilities
+  */
+ private long stopProcessing() {
+     this.backgroundLoader.stop();
+
+     // let's rename/close all queues and clear them
+     synchronized ( queuesLock ) {
+         // work on a copy as outdating modifies the queue map
+         final List<AbstractJobQueue> snapshot = new ArrayList<AbstractJobQueue>(this.queues.values());
+         final Iterator<AbstractJobQueue> snapshotIter = snapshot.iterator();
+         while ( snapshotIter.hasNext() ) {
+             final AbstractJobQueue current = snapshotIter.next();
+             current.clear();
+             this.outdateQueue(current);
+         }
+     }
+
+     // deactivate old capabilities - this stops all background processes
+     long nextChangeCount = 0;
+     if ( this.topologyCapabilities != null ) {
+         nextChangeCount = this.topologyCapabilities.getChangeCount() + 1;
+         this.topologyCapabilities.deactivate();
+     }
+     this.topologyCapabilities = null;
+     return nextChangeCount;
+ }
+
+ private void startProcessing(final long changeCount, final TopologyView view) {
+ // create new capabilities and update view
+ this.topologyCapabilities = new TopologyCapabilities(view, changeCount);
+
+ this.backgroundLoader.start();
+ }
+
+ /**
+  * React on topology changes.
+  * A changing topology stops the processing; an initial or changed topology
+  * or changed properties restart the processing with the new view. Property
+  * changes which result in the same instances map are ignored.
+  * @param event The topology event
+  * @see org.apache.sling.discovery.TopologyEventListener#handleTopologyEvent(org.apache.sling.discovery.TopologyEvent)
+  */
+ @Override
+ public void handleTopologyEvent(final TopologyEvent event) {
+     this.logger.debug("Received topology event {}", event);
+
+     final Type eventType = event.getType();
+
+     // check if there is a change of properties which doesn't affect us
+     if ( eventType == Type.PROPERTIES_CHANGED ) {
+         final Map<String, String> newAllInstances = TopologyCapabilities.getAllInstancesMap(event.getNewView());
+         if ( this.topologyCapabilities != null && this.topologyCapabilities.isSame(newAllInstances) ) {
+             logger.debug("No changes in capabilities.");
+             return;
+         }
+     }
+
+     if ( eventType == Type.TOPOLOGY_CHANGING ) {
+         this.stopProcessing();
+         return;
+     }
+
+     final boolean newViewAvailable = eventType == Type.TOPOLOGY_INIT
+                                   || eventType == Type.TOPOLOGY_CHANGED
+                                   || eventType == Type.PROPERTIES_CHANGED;
+     if ( newViewAvailable ) {
+         // stopping returns the change count for the new capabilities
+         final long changeCount = this.stopProcessing();
+
+         this.startProcessing(changeCount, event.getNewView());
+     }
+ }
+
+ /**
+  * Return our internal statistics object.
+  * The statistics are recalculated from the base statistics and all active
+  * queues if the last calculation is older than 5 seconds.
+  *
+  * @return This object, acting as the statistics
+  * @see org.apache.sling.event.jobs.JobManager#getStatistics()
+  */
+ @Override
+ public synchronized Statistics getStatistics() {
+     final long now = System.currentTimeMillis();
+     if ( this.lastUpdatedStatistics + 5000 < now ) {
+         this.copyFrom(this.baseStatistics);
+         for(final AbstractJobQueue jq : this.queues.values() ) {
+             this.add(jq);
+         }
+         // remember the calculation time - without this the statistics
+         // would be recalculated on every single call
+         this.lastUpdatedStatistics = now;
+     }
+     return this;
+ }
+
+ /**
+  * Return the values of the topic statistics map.
+  * @see org.apache.sling.event.jobs.JobManager#getTopicStatistics()
+  */
+ @Override
+ public Iterable<TopicStatistics> getTopicStatistics() {
+     return this.topicStatistics.values();
+ }
+
+ /**
+  * Look up an active queue by its name.
+  * @param name The queue name
+  * @return The queue or <code>null</code> if no queue is registered under that name
+  * @see org.apache.sling.event.jobs.JobManager#getQueue(java.lang.String)
+  */
+ @Override
+ public Queue getQueue(final String name) {
+     final AbstractJobQueue activeQueue = this.queues.get(name);
+     return activeQueue;
+ }
+
+ /**
+  * Return all active queues.
+  * The returned iterable creates a fresh read-only iterator over the queue
+  * map for each call of <code>iterator()</code>, so it can be iterated
+  * more than once.
+  * @see org.apache.sling.event.jobs.JobManager#getQueues()
+  */
+ @Override
+ public Iterable<Queue> getQueues() {
+     return new Iterable<Queue>() {
+
+         @Override
+         public Iterator<Queue> iterator() {
+             // create the underlying iterator per call - a single shared
+             // iterator would already be consumed on the second iteration
+             final Iterator<AbstractJobQueue> jqI = JobManagerImpl.this.queues.values().iterator();
+             return new Iterator<Queue>() {
+
+                 @Override
+                 public boolean hasNext() {
+                     return jqI.hasNext();
+                 }
+
+                 @Override
+                 public Queue next() {
+                     return jqI.next();
+                 }
+
+                 @Override
+                 public void remove() {
+                     // queues can't be removed through this view
+                     throw new UnsupportedOperationException();
+                 }
+             };
+         }
+     };
+ }
+
+ /**
+  * Query jobs without a limit - delegates to the limited variant with a
+  * limit of -1.
+  */
+ @Override
+ public JobsIterator queryJobs(final QueryType type, final String topic, final Map<String, Object>... templates) {
+     final long noLimit = -1;
+     return this.queryJobs(type, topic, noLimit, templates);
+ }
+
+ /**
+  * Query jobs and return them as events.
+  * The jobs are searched through <code>findJobs</code> and each job is
+  * converted to an event when it is fetched from the iterator.
+  */
+ @Override
+ public JobsIterator queryJobs(final QueryType type, final String topic,
+         final long limit,
+         final Map<String, Object>... templates) {
+     final Collection<Job> foundJobs = this.findJobs(type, topic, limit, templates);
+     final Iterator<Job> jobIter = foundJobs.iterator();
+     return new JobsIterator() {
+
+         /** Number of jobs fetched so far. */
+         private int fetched;
+
+         @Override
+         public boolean hasNext() {
+             return jobIter.hasNext();
+         }
+
+         @Override
+         public Event next() {
+             fetched++;
+             return Utility.toEvent(jobIter.next());
+         }
+
+         @Override
+         public void remove() {
+             throw new UnsupportedOperationException();
+         }
+
+         @Override
+         public Iterator<Event> iterator() {
+             return this;
+         }
+
+         @Override
+         public void skip(final long skipNum) {
+             for(long remaining = skipNum; remaining > 0 && this.hasNext(); remaining--) {
+                 this.next();
+             }
+         }
+
+         @Override
+         public long getSize() {
+             return foundJobs.size();
+         }
+
+         @Override
+         public long getPosition() {
+             return fetched;
+         }
+     };
+ }
+
+ /**
+  * Find a job through <code>getJob</code> and return it as an event.
+  * @return The event or <code>null</code> if no job is found
+  */
+ @Override
+ public Event findJob(final String topic, final Map<String, Object> template) {
+     final Job found = this.getJob(topic, template);
+     if ( found == null ) {
+         return null;
+     }
+     return Utility.toEvent(found);
+ }
+
+ /**
+  * Remove a job unless it is already started.
+  * @param jobId The job id
+  * @return <code>false</code> if the job is started or the removal failed;
+  *         <code>true</code> otherwise, including the case that no job is found
+  * @see org.apache.sling.event.jobs.JobManager#removeJob(java.lang.String)
+  */
+ @Override
+ public boolean removeJob(final String jobId) {
+     logger.debug("Trying to remove job {}", jobId);
+     final Job job = this.getJobById(jobId);
+     logger.debug("Found removal job: {}", job);
+     if ( job == null ) {
+         return true;
+     }
+     // currently running?
+     if ( job.getProcessingStarted() != null ) {
+         logger.debug("Unable to remove job - job is started: {}", job);
+         return false;
+     }
+
+     boolean removed = true;
+     ResourceResolver resolver = null;
+     try {
+         resolver = this.resourceResolverFactory.getAdministrativeResourceResolver(null);
+         final String resourcePath = ((JobImpl)job).getResourcePath();
+         final Resource jobResource = resolver.getResource(resourcePath);
+         if ( jobResource == null ) {
+             logger.debug("Unable to remove job with id - resource already removed: {}", jobId);
+         } else {
+             resolver.delete(jobResource);
+             resolver.commit();
+             logger.debug("Removed job with id: {}", jobId);
+         }
+     } catch ( final LoginException le ) {
+         this.ignoreException(le);
+         removed = false;
+     } catch ( final PersistenceException pe) {
+         this.ignoreException(pe);
+         removed = false;
+     } finally {
+         if ( resolver != null ) {
+             resolver.close();
+         }
+     }
+     return removed;
+ }
+
+ /**
+  * Remove a job regardless of its state - delegates to
+  * <code>removeJobById</code> and discards the result.
+  * @see org.apache.sling.event.jobs.JobManager#forceRemoveJob(java.lang.String)
+  */
+ @Override
+ public void forceRemoveJob(final String jobId) {
+     removeJobById(jobId);
+ }
+
+ /**
+  * Add a job.
+  * If the job could not be added and a name is given, the job is looked
+  * up by that name instead.
+  * @see org.apache.sling.event.jobs.JobManager#addJob(java.lang.String, java.lang.String, java.util.Map)
+  */
+ @Override
+ public Job addJob(final String topic, final String name, final Map<String, Object> properties) {
+     final Job added = this.addJobInteral(topic, name, properties);
+     if ( added != null || name == null ) {
+         return added;
+     }
+     return this.getJobByName(name);
+ }
+
+ /**
+  * Search a job by its name using an XPath query.
+  * @param name The job name
+  * @return The first readable job below the jobs base path or
+  *         <code>null</code> if none is found or the search fails
+  * @see org.apache.sling.event.jobs.JobManager#getJobByName(java.lang.String)
+  */
+ @Override
+ public Job getJobByName(final String name) {
+     ResourceResolver resolver = null;
+     try {
+         resolver = this.resourceResolverFactory.getAdministrativeResourceResolver(null);
+
+         final StringBuilder buf = new StringBuilder(64);
+
+         buf.append("//element(*,");
+         buf.append(ResourceHelper.RESOURCE_TYPE_JOB);
+         buf.append(")[@");
+         buf.append(ISO9075.encode(JobUtil.PROPERTY_JOB_NAME));
+         buf.append(" = '");
+         // a single quote inside an XPath string literal has to be doubled,
+         // otherwise the name could break or alter the query
+         buf.append(String.valueOf(name).replace("'", "''"));
+         buf.append("']");
+         final Iterator<Resource> result = resolver.findResources(buf.toString(), "xpath");
+
+         while ( result.hasNext() ) {
+             final Resource jobResource = result.next();
+             // sanity check for the path
+             if ( this.configuration.isJob(jobResource.getPath()) ) {
+                 final JobImpl job = this.readJob(jobResource);
+                 if ( job != null ) {
+                     return job;
+                 }
+             }
+         }
+     } catch (final QuerySyntaxException qse) {
+         this.ignoreException(qse);
+     } catch (final LoginException le) {
+         this.ignoreException(le);
+     } finally {
+         if ( resolver != null ) {
+             resolver.close();
+         }
+     }
+     return null;
+ }
+
+ /**
+  * Search a job by its id using an XPath query.
+  * @param id The job id
+  * @return The first readable job below the jobs base path or
+  *         <code>null</code> if none is found or the search fails
+  * @see org.apache.sling.event.jobs.JobManager#getJobById(java.lang.String)
+  */
+ @Override
+ public Job getJobById(final String id) {
+     logger.debug("Getting job by id: {}", id);
+     ResourceResolver resolver = null;
+     try {
+         resolver = this.resourceResolverFactory.getAdministrativeResourceResolver(null);
+
+         final StringBuilder buf = new StringBuilder(64);
+
+         buf.append("//element(*,");
+         buf.append(ResourceHelper.RESOURCE_TYPE_JOB);
+         buf.append(")[@");
+         buf.append(JobUtil.JOB_ID);
+         buf.append(" = '");
+         // a single quote inside an XPath string literal has to be doubled,
+         // otherwise the id could break or alter the query
+         buf.append(String.valueOf(id).replace("'", "''"));
+         buf.append("']");
+         if ( logger.isDebugEnabled() ) {
+             logger.debug("Exceuting query: {}", buf.toString());
+         }
+         final Iterator<Resource> result = resolver.findResources(buf.toString(), "xpath");
+
+         while ( result.hasNext() ) {
+             final Resource jobResource = result.next();
+             // sanity check for the path
+             if ( this.configuration.isJob(jobResource.getPath()) ) {
+                 final JobImpl job = this.readJob(jobResource);
+                 if ( job != null ) {
+                     if ( logger.isDebugEnabled() ) {
+                         logger.debug("Found job with id {} = {}", id, job);
+                     }
+                     return job;
+                 }
+             }
+         }
+     } catch (final QuerySyntaxException qse) {
+         this.ignoreException(qse);
+     } catch (final LoginException le) {
+         this.ignoreException(le);
+     } finally {
+         if ( resolver != null ) {
+             resolver.close();
+         }
+     }
+     logger.debug("Job not found with id: {}", id);
+     return null;
+ }
+
+ /**
+  * Return the first job for the topic matching the template.
+  * @param topic The job topic
+  * @param template Optional template of properties the job has to match -
+  *                 <code>null</code> means no template
+  * @return The job or <code>null</code> if no job is found
+  * @see org.apache.sling.event.jobs.JobManager#getJob(java.lang.String, java.util.Map)
+  */
+ @Override
+ public Job getJob(final String topic, final Map<String, Object> template) {
+     final Iterable<Job> iter;
+     if ( template == null ) {
+         // pass a null array: a null template would otherwise be wrapped
+         // into a one element varargs array containing null
+         iter = this.findJobs(QueryType.ALL, topic, 1, (Map<String, Object>[])null);
+     } else {
+         iter = this.findJobs(QueryType.ALL, topic, 1, template);
+     }
+     final Iterator<Job> i = iter.iterator();
+     if ( i.hasNext() ) {
+         return i.next();
+     }
+     return null;
+ }
+
+ /**
+  * Remove a job by its id, regardless of whether it is started.
+  * @param jobId The job id
+  * @return <code>false</code> if the removal failed; <code>true</code>
+  *         otherwise, including the case that no job is found
+  * @see org.apache.sling.event.jobs.JobManager#removeJobById(java.lang.String)
+  */
+ @Override
+ public boolean removeJobById(String jobId) {
+     final Job job = this.getJobById(jobId);
+     if ( job == null ) {
+         return true;
+     }
+
+     boolean removed = true;
+     ResourceResolver resolver = null;
+     try {
+         resolver = this.resourceResolverFactory.getAdministrativeResourceResolver(null);
+         final String resourcePath = ((JobImpl)job).getResourcePath();
+         final Resource jobResource = resolver.getResource(resourcePath);
+         // nothing to do if the resource is already gone
+         if ( jobResource != null ) {
+             resolver.delete(jobResource);
+             resolver.commit();
+         }
+     } catch ( final LoginException le ) {
+         this.ignoreException(le);
+         removed = false;
+     } catch ( final PersistenceException pe) {
+         this.ignoreException(pe);
+         removed = false;
+     } finally {
+         if ( resolver != null ) {
+             resolver.close();
+         }
+     }
+     return removed;
+ }
+
+ /**
+  * Find jobs for a topic using an XPath query ordered by creation date.
+  * @param type The query type - active jobs have a started time, queued
+  *             jobs don't; any other type does not filter on that property
+  * @param topic The job topic
+  * @param limit Maximum number of jobs to return - a value less than 1 means no limit
+  * @param templates Optional templates: a job matches if all properties of
+  *                  at least one template match. <code>null</code> or empty
+  *                  templates are skipped.
+  * @return The found jobs - empty if nothing is found or the search fails
+  * @see org.apache.sling.event.jobs.JobManager#findJobs(org.apache.sling.event.jobs.JobManager.QueryType, java.lang.String, long, java.util.Map<java.lang.String,java.lang.Object>[])
+  */
+ @Override
+ public Collection<Job> findJobs(final QueryType type,
+         final String topic,
+         final long limit,
+         final Map<String, Object>... templates) {
+     final List<Job> result = new ArrayList<Job>();
+     ResourceResolver resolver = null;
+     try {
+         resolver = this.resourceResolverFactory.getAdministrativeResourceResolver(null);
+
+         final StringBuilder buf = new StringBuilder(64);
+
+         buf.append("//element(*,");
+         buf.append(ResourceHelper.RESOURCE_TYPE_JOB);
+         buf.append(")[@");
+         buf.append(ISO9075.encode(JobUtil.PROPERTY_JOB_TOPIC));
+         buf.append(" = '");
+         // a single quote inside an XPath string literal has to be doubled
+         buf.append(String.valueOf(topic).replace("'", "''"));
+         buf.append("'");
+         if ( type == QueryType.ACTIVE ) {
+             buf.append(" and @");
+             buf.append(ISO9075.encode(Job.PROPERTY_JOB_STARTED_TIME));
+         } else if ( type == QueryType.QUEUED ) {
+             buf.append(" and not(@");
+             buf.append(ISO9075.encode(Job.PROPERTY_JOB_STARTED_TIME));
+             buf.append(")");
+         }
+
+         // build the template clause separately: it is only added if at
+         // least one usable template exists, avoiding an invalid "()"
+         final StringBuilder templateClause = new StringBuilder();
+         if ( templates != null ) {
+             for (final Map<String,Object> template : templates) {
+                 if ( template == null || template.isEmpty() ) {
+                     continue;
+                 }
+                 if ( templateClause.length() > 0 ) {
+                     templateClause.append(" or ");
+                 }
+                 templateClause.append('(');
+                 boolean first = true;
+                 for (final Map.Entry<String, Object> current : template.entrySet()) {
+                     templateClause.append(first ? "@" : " and @");
+                     first = false;
+                     templateClause.append(ISO9075.encode(current.getKey()));
+                     templateClause.append(" = '");
+                     templateClause.append(String.valueOf(current.getValue()).replace("'", "''"));
+                     templateClause.append("'");
+                 }
+                 templateClause.append(')');
+             }
+         }
+         if ( templateClause.length() > 0 ) {
+             buf.append(" and (");
+             buf.append(templateClause.toString());
+             buf.append(')');
+         }
+
+         buf.append("] order by @");
+         buf.append(Job.PROPERTY_JOB_CREATED);
+         buf.append(" ascending");
+         final Iterator<Resource> iter = resolver.findResources(buf.toString(), "xpath");
+         long count = 0;
+
+         while ( iter.hasNext() && (limit < 1 || count < limit) ) {
+             final Resource jobResource = iter.next();
+             // sanity check for the path
+             if ( this.configuration.isJob(jobResource.getPath()) ) {
+                 final JobImpl job = readJob(jobResource);
+                 if ( job != null ) {
+                     count++;
+                     result.add(job);
+                 }
+             }
+         }
+     } catch (final QuerySyntaxException qse) {
+         this.ignoreException(qse);
+     } catch (final LoginException le) {
+         this.ignoreException(le);
+     } finally {
+         if ( resolver != null ) {
+             resolver.close();
+         }
+     }
+     return result;
+ }
+
+ /**
+  * A job is finished: delete its resource.
+  * Failures are not propagated, they are only logged in debug mode.
+  * @param info The job handler
+  */
+ public void finished(final JobHandler info) {
+     ResourceResolver resolver = null;
+     try {
+         resolver = this.resourceResolverFactory.getAdministrativeResourceResolver(null);
+         final Resource jobResource = resolver.getResource(info.getJob().getResourcePath());
+         if ( jobResource != null ) {
+             resolver.delete(jobResource);
+             resolver.commit();
+         }
+     } catch ( final PersistenceException pe ) {
+         // best effort - but leave a trace like the other methods of this class
+         this.ignoreException(pe);
+     } catch ( final LoginException le ) {
+         this.ignoreException(le);
+     } finally {
+         if ( resolver != null ) {
+             resolver.close();
+         }
+     }
+ }
+
+ /**
+  * Reschedule a job.
+  *
+  * Update the retry count and remove the started time on the persisted
+  * job resource.
+  * @param info The job info
+  * @return <code>true</code> if the job could be updated, <code>false</code>
+  *         if the job resource does not exist, is not modifiable, or if
+  *         the login or the commit failed.
+  */
+ public boolean reschedule(final JobHandler info) {
+     ResourceResolver resolver = null;
+     try {
+         resolver = this.resourceResolverFactory.getAdministrativeResourceResolver(null);
+         final Resource jobResource = resolver.getResource(info.getJob().getResourcePath());
+         if ( jobResource != null ) {
+             final ModifiableValueMap mvm = jobResource.adaptTo(ModifiableValueMap.class);
+             // adaptTo() may return null - treat this as "could not be updated"
+             // instead of failing with a NullPointerException
+             if ( mvm != null ) {
+                 mvm.put(Job.PROPERTY_JOB_RETRY_COUNT, info.getJob().getProperty(Job.PROPERTY_JOB_RETRY_COUNT));
+                 mvm.remove(Job.PROPERTY_JOB_STARTED_TIME);
+                 resolver.commit();
+                 return true;
+             }
+         }
+     } catch ( final PersistenceException pe ) {
+         // the caller is informed through the return value
+         this.ignoreException(pe);
+     } catch ( final LoginException le ) {
+         this.ignoreException(le);
+     } finally {
+         if ( resolver != null ) {
+             resolver.close();
+         }
+     }
+
+     return false;
+ }
+
+ /**
+  * Remove the job.
+  *
+  * If the job resource still exists, a {@link JobUtil#TOPIC_JOB_CANCELLED}
+  * notification is sent and the resource is deleted afterwards. Login and
+  * persistence failures are handed to <code>ignoreException</code>.
+  * @param info The job handler of the job to remove
+  * @return Always <code>true</code> - also if the job resource does not
+  *         exist or the deletion failed.
+  */
+ public boolean remove(final JobHandler info) {
+     ResourceResolver resolver = null;
+     try {
+         resolver = this.resourceResolverFactory.getAdministrativeResourceResolver(null);
+         final Resource jobResource = resolver.getResource(info.getJob().getResourcePath());
+         if ( jobResource != null ) {
+             // the notification is sent before the resource is deleted
+             Utility.sendNotification(this.eventAdmin, JobUtil.TOPIC_JOB_CANCELLED, info.getJob(), null);
+             resolver.delete(jobResource);
+             resolver.commit();
+         }
+     } catch ( final PersistenceException pe ) {
+         // removal is best effort, but leave a trace of the failure
+         this.ignoreException(pe);
+     } catch ( final LoginException le ) {
+         this.ignoreException(le);
+     } finally {
+         if ( resolver != null ) {
+             resolver.close();
+         }
+     }
+
+     return true;
+ }
+
+ /**
+  * Try to start the job.
+  *
+  * Marks the persisted job as started by writing the current time as
+  * started time together with the queue name, the number of retries and
+  * the priority of the job.
+  * @param info The job handler of the job to start
+  * @return <code>true</code> if the job resource exists and the changes
+  *         could be committed, <code>false</code> otherwise.
+  */
+ public boolean start(final JobHandler info) {
+     ResourceResolver adminResolver = null;
+     try {
+         adminResolver = this.resourceResolverFactory.getAdministrativeResourceResolver(null);
+         final JobImpl job = info.getJob();
+         final Resource resource = adminResolver.getResource(job.getResourcePath());
+         if ( resource == null ) {
+             // job is gone, nothing to start
+             return false;
+         }
+         final ModifiableValueMap jobProps = resource.adaptTo(ModifiableValueMap.class);
+         jobProps.put(Job.PROPERTY_JOB_STARTED_TIME, Calendar.getInstance());
+         jobProps.put(Job.PROPERTY_JOB_QUEUE_NAME, job.getQueueName());
+         jobProps.put(Job.PROPERTY_JOB_RETRIES, job.getNumberOfRetries());
+         jobProps.put(Job.PROPERTY_JOB_PRIORITY, job.getJobPriority().name());
+         adminResolver.commit();
+
+         return true;
+     } catch ( final PersistenceException pe ) {
+         this.ignoreException(pe);
+     } catch ( final LoginException le ) {
+         this.ignoreException(le);
+     } finally {
+         if ( adminResolver != null ) {
+             adminResolver.close();
+         }
+     }
+
+     return false;
+ }
+
+ /**
+  * Try to get a "lock" for a resource.
+  *
+  * The lock is a resource below the configured locks path, named after the
+  * filtered id. If this resource already exists, the lock is not obtained.
+  * Otherwise the resource is created with the creation time and the id of
+  * this application; the lock is obtained if the resulting resource carries
+  * the id of this application.
+  * @param id The id to lock
+  * @return <code>true</code> if the lock could be obtained.
+  */
+ private boolean lock(final String id) {
+     if ( logger.isDebugEnabled() ) {
+         logger.debug("Trying to get lock for {}", id);
+     }
+     boolean hasLock = false;
+     ResourceResolver resolver = null;
+     try {
+         resolver = this.resourceResolverFactory.getAdministrativeResourceResolver(null);
+         final StringBuilder sb = new StringBuilder(this.configuration.getLocksPath());
+         sb.append('/');
+         sb.append(ResourceHelper.filterName(id));
+         final String path = sb.toString();
+
+         Resource lockResource = resolver.getResource(path);
+         if ( lockResource == null ) {
+             resolver.refresh();
+             try {
+                 final Map<String, Object> props = new HashMap<String, Object>();
+                 props.put(Utility.PROPERTY_LOCK_CREATED, Calendar.getInstance());
+                 props.put(Utility.PROPERTY_LOCK_CREATED_APP, Environment.APPLICATION_ID);
+                 lockResource = ResourceHelper.getOrCreateResource(resolver,
+                         path,
+                         props);
+
+                 // a missing value map or a missing owner property means "no lock"
+                 // and must not end in a NullPointerException
+                 final ValueMap vm = lockResource.adaptTo(ValueMap.class);
+                 final Object owner = (vm == null ? null : vm.get(Utility.PROPERTY_LOCK_CREATED_APP));
+                 if ( logger.isDebugEnabled() ) {
+                     logger.debug("Got lock resource on instance {} with {}", Environment.APPLICATION_ID, owner);
+                 }
+                 if ( owner != null && owner.equals(Environment.APPLICATION_ID) ) {
+                     hasLock = true;
+                 }
+             } catch (final PersistenceException ignore) {
+                 // creating the lock resource failed: no lock
+                 this.ignoreException(ignore);
+             }
+         }
+     } catch (final LoginException ignore) {
+         this.ignoreException(ignore);
+     } finally {
+         if ( resolver != null ) {
+             resolver.close();
+         }
+     }
+     if ( logger.isDebugEnabled() ) {
+         logger.debug("Lock for {} = {}", id, hasLock);
+     }
+     return hasLock;
+ }
+
+
+
+ /**
+  * Add a new job.
+  *
+  * Depending on the queue responsible for the topic the job is either
+  * dropped (a {@link JobUtil#TOPIC_JOB_CANCELLED} notification is sent),
+  * discarded as a duplicate (a job name is given and the lock for this
+  * name can't be obtained) or persisted.
+  * @param jobTopic The job topic
+  * @param jobName The optional job name
+  * @param jobProperties The optional job properties
+  * @return The persisted job or <code>null</code> if the job has been
+  *         dropped, discarded or could not be persisted.
+  */
+ private Job addJobInteral(final String jobTopic, final String jobName, final Map<String, Object> jobProperties) {
+     final QueueInfo info = this.queueConfigManager.getQueueInfo(jobTopic);
+
+     // queue is configured to drop all jobs
+     if ( info.queueConfiguration.getType() == QueueConfiguration.Type.DROP ) {
+         if ( logger.isDebugEnabled() ) {
+             logger.debug("Dropping job due to configuration of queue {} : {}", info.queueName, Utility.toString(jobTopic, jobName, jobProperties));
+         }
+         Utility.sendNotification(this.eventAdmin, JobUtil.TOPIC_JOB_CANCELLED, jobTopic, jobName, jobProperties, null);
+         return null;
+     }
+
+     // check for unique jobs
+     if ( jobName != null && !this.lock(jobName) ) {
+         logger.debug("Discarding duplicate job {}", Utility.toString(jobTopic, jobName, jobProperties));
+         return null;
+     }
+
+     // detect the target instance - unless the queue ignores its jobs
+     if ( info.queueConfiguration.getType() != QueueConfiguration.Type.IGNORE ) {
+         final TopologyCapabilities capabilities = this.topologyCapabilities;
+         if ( capabilities == null ) {
+             info.targetId = null;
+         } else {
+             info.targetId = capabilities.detectTarget(jobTopic, jobProperties, info);
+         }
+     }
+     if ( logger.isDebugEnabled() ) {
+         if ( info.targetId == null ) {
+             logger.debug("Persisting job {} into queue {}", Utility.toString(jobTopic, jobName, jobProperties), info.queueName);
+         } else {
+             logger.debug("Persisting job {} into queue {}, target={}", new Object[] {Utility.toString(jobTopic, jobName, jobProperties), info.queueName, info.targetId});
+         }
+     }
+
+     ResourceResolver adminResolver = null;
+     try {
+         adminResolver = this.resourceResolverFactory.getAdministrativeResourceResolver(null);
+
+         return this.writeJob(adminResolver, jobTopic, jobName, jobProperties, info);
+     } catch (final PersistenceException pe ) {
+         // something went wrong, so let's log it
+         this.logger.error("Exception during persisting new job '" + Utility.toString(jobTopic, jobName, jobProperties) + "'", pe);
+     } catch (final LoginException le) {
+         // there is nothing we can do except log!
+         this.logger.error("Exception during persisting new job '" + Utility.toString(jobTopic, jobName, jobProperties) + "'", le);
+     } finally {
+         if ( adminResolver != null ) {
+             adminResolver.close();
+         }
+     }
+     return null;
+ }
+
+ /**
+  * Write a job to the resource tree.
+  *
+  * The persisted properties consist of the passed job properties (without
+  * the ones ignored by {@link ResourceHelper#ignoreProperty(String)}) and the
+  * job management properties like id, topic, name, queue and creation info.
+  * @param resolver The resource resolver used to create the job resource
+  * @param jobTopic The job topic
+  * @param jobName The optional job name
+  * @param jobProperties The optional job properties
+  * @param info The queue information (queue name etc.)
+  * @return The job object for the written job
+  * @throws PersistenceException
+  */
+ private Job writeJob(final ResourceResolver resolver,
+         final String jobTopic,
+         final String jobName,
+         final Map<String, Object> jobProperties,
+         final QueueInfo info)
+ throws PersistenceException {
+     final String jobId = this.configuration.getUniqueId(jobTopic);
+     final String jobPath = this.configuration.getUniquePath(info.targetId, jobTopic, jobId, jobProperties);
+     final InternalQueueConfiguration queueConfig = info.queueConfiguration;
+
+     // start with the user supplied properties
+     final Map<String, Object> jobData = new HashMap<String, Object>();
+     if ( jobProperties != null ) {
+         for(final Map.Entry<String, Object> prop : jobProperties.entrySet() ) {
+             if ( ResourceHelper.ignoreProperty(prop.getKey()) ) {
+                 continue;
+             }
+             jobData.put(prop.getKey(), prop.getValue());
+         }
+     }
+
+     // identification of the job
+     jobData.put(JobUtil.JOB_ID, jobId);
+     jobData.put(JobUtil.PROPERTY_JOB_TOPIC, jobTopic);
+     if ( jobName != null ) {
+         jobData.put(JobUtil.PROPERTY_JOB_NAME, jobName);
+     }
+
+     // queue related information - the priority is persisted by its name
+     jobData.put(Job.PROPERTY_JOB_QUEUE_NAME, queueConfig.getName());
+     jobData.put(Job.PROPERTY_JOB_RETRY_COUNT, 0);
+     jobData.put(Job.PROPERTY_JOB_RETRIES, queueConfig.getMaxRetries());
+     jobData.put(Job.PROPERTY_JOB_PRIORITY, queueConfig.getPriority().name());
+
+     // creation information and target
+     jobData.put(Job.PROPERTY_JOB_CREATED, Calendar.getInstance());
+     jobData.put(Job.PROPERTY_JOB_CREATED_INSTANCE, Environment.APPLICATION_ID);
+     if ( info.targetId == null ) {
+         jobData.remove(Job.PROPERTY_JOB_TARGET_INSTANCE);
+     } else {
+         jobData.put(Job.PROPERTY_JOB_TARGET_INSTANCE, info.targetId);
+     }
+
+     // create path and resource
+     jobData.put(ResourceResolver.PROPERTY_RESOURCE_TYPE, ResourceHelper.RESOURCE_TYPE_JOB);
+     ResourceHelper.getOrCreateResource(resolver, jobPath, jobData);
+
+     // the job object gets the priority as an object, not as a name
+     jobData.put(Job.PROPERTY_JOB_PRIORITY, queueConfig.getPriority());
+     return new JobImpl(jobTopic, jobName, jobId, jobData);
+ }
+
+ /**
+  * Reassign a job.
+  *
+  * If the queue for the topic of the job is configured to drop jobs, the
+  * job is removed through its handler. Otherwise a target is detected
+  * (unless the queue ignores its jobs or no topology capabilities are
+  * available) and the job is handed to the maintenance task for
+  * reassignment.
+  * @param handler The job handler of the job to reassign
+  */
+ public void reassign(final JobHandler handler) {
+     final JobImpl job = handler.getJob();
+     final QueueInfo queueInfo = queueConfigManager.getQueueInfo(job.getTopic());
+     final InternalQueueConfiguration config = queueInfo.queueConfiguration;
+
+     // Sanity check if queue configuration has changed
+     if ( config.getType() == QueueConfiguration.Type.DROP ) {
+         if ( logger.isDebugEnabled() ) {
+             logger.debug("Dropping job due to configuration of queue {} : {}", queueInfo.queueName, Utility.toString(job));
+         }
+         handler.remove();
+         return;
+     }
+
+     String targetId = null;
+     if ( config.getType() != QueueConfiguration.Type.IGNORE ) {
+         final TopologyCapabilities capabilities = this.topologyCapabilities;
+         if ( capabilities != null ) {
+             targetId = capabilities.detectTarget(job.getTopic(), job.getProperties(), queueInfo);
+         }
+     }
+     this.maintenanceTask.reassignJob(job, targetId);
+ }
+}
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
------------------------------------------------------------------------------
svn:mime-type = text/plain