You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2015/10/26 11:16:43 UTC
svn commit: r1710549 [1/2] - in /sling/trunk/bundles/extensions/event: ./
src/main/java/org/apache/sling/event/
src/main/java/org/apache/sling/event/impl/jobs/
src/main/java/org/apache/sling/event/impl/jobs/config/
src/main/java/org/apache/sling/event/...
Author: cziegeler
Date: Mon Oct 26 10:16:42 2015
New Revision: 1710549
URL: http://svn.apache.org/viewvc?rev=1710549&view=rev
Log:
SLING-5194 : Remove all deprecated features
Removed:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventPropertiesMap.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/JobProcessor.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/JobStatusProvider.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/JobsIterator.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/TimedEventStatusProvider.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/timed/
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobProcessor.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobUtil.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobsIterator.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/package-info.java
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/EventUtilTest.java
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/DeprecatedJobsTest.java
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/DeprecatedTimedJobsTest.java
Modified:
sling/trunk/bundles/extensions/event/pom.xml
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobBuilderImpl.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/Utility.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/TopologyCapabilities.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/console/InventoryPlugin.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/console/WebConsolePlugin.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/notifications/NotificationUtility.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/CheckTopologyTask.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/CleanUpTask.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/UpgradeTask.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ResourceHelper.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/Job.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobManager.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/Queue.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/QueueConfiguration.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/package-info.java
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/JobsImplTest.java
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/config/TopologyCapabilitiesTest.java
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ClassloadingTest.java
Modified: sling/trunk/bundles/extensions/event/pom.xml
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/pom.xml?rev=1710549&r1=1710548&r2=1710549&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/pom.xml (original)
+++ sling/trunk/bundles/extensions/event/pom.xml Mon Oct 26 10:16:42 2015
@@ -29,7 +29,7 @@
<artifactId>org.apache.sling.event</artifactId>
<packaging>bundle</packaging>
- <version>3.7.7-SNAPSHOT</version>
+ <version>4.0.0-SNAPSHOT</version>
<name>Apache Sling Event Support</name>
<description>
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobBuilderImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobBuilderImpl.java?rev=1710549&r1=1710548&r2=1710549&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobBuilderImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobBuilderImpl.java Mon Oct 26 10:16:42 2015
@@ -56,7 +56,7 @@ public class JobBuilderImpl implements J
@Override
public Job add(final List<String> errors) {
- return this.jobManager.addJob(this.topic, null, this.properties, errors);
+ return this.jobManager.addJob(this.topic, this.properties, errors);
}
@Override
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java?rev=1710549&r1=1710548&r2=1710549&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java Mon Oct 26 10:16:42 2015
@@ -100,9 +100,6 @@ public class JobConsumerManager {
/** The map with the consumers, keyed by topic, sorted by service ranking. */
private final Map<String, List<ConsumerInfo>> topicToConsumerMap = new HashMap<String, List<ConsumerInfo>>();
- /** Marker if this instance supports bridged events. */
- private boolean supportsBridgedEvents;
-
/** ServiceRegistration for propagation. */
private ServiceRegistration propagationService;
@@ -240,13 +237,6 @@ public class JobConsumerManager {
}
/**
- * Does this instance supports bridged events?
- */
- public boolean supportsBridgedEvents() {
- return supportsBridgedEvents;
- }
-
- /**
* Bind a new consumer
* @param serviceReference The service reference to the consumer.
*/
@@ -304,7 +294,6 @@ public class JobConsumerManager {
}
}
}
- this.supportsBridgedEvents = this.topicToConsumerMap.containsKey("/");
if ( changed ) {
this.calculateTopics(this.propagationService != null);
}
@@ -354,7 +343,6 @@ public class JobConsumerManager {
}
}
}
- this.supportsBridgedEvents = this.topicToConsumerMap.containsKey("/");
if ( changed ) {
this.calculateTopics(this.propagationService != null);
}
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java?rev=1710549&r1=1710548&r2=1710549&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java Mon Oct 26 10:16:42 2015
@@ -29,7 +29,6 @@ import org.apache.sling.api.resource.Val
import org.apache.sling.api.wrappers.ValueMapDecorator;
import org.apache.sling.event.impl.support.ResourceHelper;
import org.apache.sling.event.jobs.Job;
-import org.apache.sling.event.jobs.JobUtil.JobPriority;
import org.apache.sling.event.jobs.NotificationConstants;
import org.apache.sling.event.jobs.Queue;
@@ -41,9 +40,6 @@ public class JobImpl implements Job, Com
/** Internal job property containing the resource path. */
public static final String PROPERTY_RESOURCE_PATH = "slingevent:path";
- /** Internal job property if this is an bridged event (event admin). */
- public static final String PROPERTY_BRIDGED_EVENT = "slingevent:eventadmin";
-
/** Internal job property containing optional delay override. */
public static final String PROPERTY_DELAY_OVERRIDE = ":slingevent:delayOverride";
@@ -65,12 +61,8 @@ public class JobImpl implements Job, Com
private final String path;
- private final String name;
-
private final String jobId;
- private final boolean isBridgedEvent;
-
private final List<Exception> readErrorList;
private final long counter;
@@ -85,14 +77,11 @@ public class JobImpl implements Job, Com
*/
@SuppressWarnings("unchecked")
public JobImpl(final String topic,
- final String name,
final String jobId,
final Map<String, Object> properties) {
this.topic = topic;
- this.name = name;
this.jobId = jobId;
this.path = (String)properties.remove(PROPERTY_RESOURCE_PATH);
- this.isBridgedEvent = properties.get(PROPERTY_BRIDGED_EVENT) != null;
this.readErrorList = (List<Exception>) properties.remove(ResourceHelper.PROPERTY_MARKER_READ_ERROR_LIST);
this.properties = new ValueMapDecorator(properties);
@@ -109,13 +98,6 @@ public class JobImpl implements Job, Com
}
/**
- * Is this a bridged event?
- */
- public boolean isBridgedEvent() {
- return this.isBridgedEvent;
- }
-
- /**
* Did we have read errors?
*/
public boolean hasReadErrors() {
@@ -147,14 +129,6 @@ public class JobImpl implements Job, Com
}
/**
- * @see org.apache.sling.event.jobs.Job#getName()
- */
- @Override
- public String getName() {
- return this.name;
- }
-
- /**
* @see org.apache.sling.event.jobs.Job#getId()
*/
@Override
@@ -195,11 +169,6 @@ public class JobImpl implements Job, Com
}
@Override
- public JobPriority getJobPriority() {
- return JobPriority.NORM;
- }
-
- @Override
public int getRetryCount() {
return this.getProperty(Job.PROPERTY_JOB_RETRY_COUNT, Integer.class);
}
@@ -421,7 +390,6 @@ public class JobImpl implements Job, Com
@Override
public String toString() {
return "JobImpl [properties=" + properties + ", topic=" + topic
- + ", path=" + path + ", name=" + name + ", jobId=" + jobId
- + ", isBridgedEvent=" + isBridgedEvent + "]";
+ + ", path=" + path + ", jobId=" + jobId + "]";
}
}
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java?rev=1710549&r1=1710548&r2=1710549&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java Mon Oct 26 10:16:42 2015
@@ -40,7 +40,6 @@ import org.apache.sling.api.resource.Per
import org.apache.sling.api.resource.QuerySyntaxException;
import org.apache.sling.api.resource.Resource;
import org.apache.sling.api.resource.ResourceResolver;
-import org.apache.sling.api.resource.ValueMap;
import org.apache.sling.commons.scheduler.Scheduler;
import org.apache.sling.commons.threads.ThreadPoolManager;
import org.apache.sling.event.impl.jobs.config.JobManagerConfiguration;
@@ -58,8 +57,6 @@ import org.apache.sling.event.jobs.Job;
import org.apache.sling.event.jobs.Job.JobState;
import org.apache.sling.event.jobs.JobBuilder;
import org.apache.sling.event.jobs.JobManager;
-import org.apache.sling.event.jobs.JobUtil;
-import org.apache.sling.event.jobs.JobsIterator;
import org.apache.sling.event.jobs.NotificationConstants;
import org.apache.sling.event.jobs.Queue;
import org.apache.sling.event.jobs.ScheduledJobInfo;
@@ -165,24 +162,6 @@ public class JobManagerImpl
}
/**
- * @see org.apache.sling.event.jobs.JobManager#restart()
- */
- @Override
- public void restart() {
- // nothing to do as this is deprecated, let's log a warning
- Utility.logDeprecated(logger, "Deprecated JobManager.restart() is called.");
- }
-
- /**
- * @see org.apache.sling.event.jobs.JobManager#isJobProcessingEnabled()
- */
- @Override
- public boolean isJobProcessingEnabled() {
- Utility.logDeprecated(logger, "Deprecated JobManager.isJobProcessingEnabled() is called.");
- return true;
- }
-
- /**
* @see org.osgi.service.event.EventHandler#handleEvent(org.osgi.service.event.Event)
*/
@Override
@@ -224,84 +203,6 @@ public class JobManagerImpl
return qManager.getQueues();
}
- @Override
- public JobsIterator queryJobs(final QueryType type, final String topic, final Map<String, Object>... templates) {
- return this.queryJobs(type, topic, -1, templates);
- }
-
- @Override
- public JobsIterator queryJobs(final QueryType type, final String topic,
- final long limit,
- final Map<String, Object>... templates) {
- Utility.logDeprecated(logger, "Deprecated JobManager.queryJobs(...) is called.");
- final Collection<Job> list = this.findJobs(type, topic, limit, templates);
- final Iterator<Job> iter = list.iterator();
- return new JobsIterator() {
-
- private int index;
-
- @Override
- public Iterator<Event> iterator() {
- return this;
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Event next() {
- index++;
- final Job job = iter.next();
- return Utility.toEvent(job);
- }
-
- @Override
- public boolean hasNext() {
- return iter.hasNext();
- }
-
- @Override
- public void skip(final long skipNum) {
- long m = skipNum;
- while ( m > 0 && this.hasNext() ) {
- this.next();
- m--;
- }
- }
-
- @Override
- public long getSize() {
- return list.size();
- }
-
- @Override
- public long getPosition() {
- return index;
- }
- };
- }
-
- @Override
- public Event findJob(final String topic, final Map<String, Object> template) {
- Utility.logDeprecated(logger, "Deprecated JobManager.findJob(...) is called.");
- final Job job = this.getJob(topic, template);
- if ( job != null ) {
- return Utility.toEvent(job);
- }
- return null;
- }
-
- /**
- * @see org.apache.sling.event.jobs.JobManager#removeJob(java.lang.String)
- */
- @Override
- public boolean removeJob(final String jobId) {
- Utility.logDeprecated(logger, "Deprecated JobManager.removeJob(...) is called.");
- return this.internalRemoveJobById(jobId, false);
- }
-
/**
* Remove a job.
* If the job is already in the storage area, it's removed forever.
@@ -359,67 +260,11 @@ public class JobManagerImpl
}
/**
- * @see org.apache.sling.event.jobs.JobManager#forceRemoveJob(java.lang.String)
- */
- @Override
- public void forceRemoveJob(final String jobId) {
- Utility.logDeprecated(logger, "Deprecated JobManager.forceRemoveJob(...) is called.");
- this.internalRemoveJobById(jobId, true);
- }
-
- /**
* @see org.apache.sling.event.jobs.JobManager#addJob(java.lang.String, java.util.Map)
*/
@Override
public Job addJob(String topic, Map<String, Object> properties) {
- return this.addJob(topic, null, properties, null);
- }
-
- /**
- * @see org.apache.sling.event.jobs.JobManager#addJob(java.lang.String, java.lang.String, java.util.Map)
- */
- @Override
- public Job addJob(final String topic, final String name, final Map<String, Object> properties) {
- Utility.logDeprecated(logger, "Deprecated JobManager.add(String, String, Map) is called.");
- return this.addJob(topic, name, properties, null);
- }
-
- /**
- * @see org.apache.sling.event.jobs.JobManager#getJobByName(java.lang.String)
- */
- @Override
- public Job getJobByName(final String name) {
- Utility.logDeprecated(logger, "Deprecated JobManager.getJobByName(String) is called.");
- final StringBuilder buf = new StringBuilder(64);
-
- final ResourceResolver resolver = this.configuration.createResourceResolver();
- try {
-
- buf.append("//element(*,");
- buf.append(ResourceHelper.RESOURCE_TYPE_JOB);
- buf.append(")[@");
- buf.append(ISO9075.encode(JobUtil.PROPERTY_JOB_NAME));
- buf.append(" = '");
- buf.append(name);
- buf.append("']");
- final Iterator<Resource> result = resolver.findResources(buf.toString(), "xpath");
-
- while ( result.hasNext() ) {
- final Resource jobResource = result.next();
- // sanity check for the path
- if ( this.configuration.isJob(jobResource.getPath()) ) {
- final JobImpl job = Utility.readJob(logger, jobResource);
- if ( job != null ) {
- return job;
- }
- }
- }
- } catch (final QuerySyntaxException qse) {
- logger.warn("Query syntax wrong " + buf.toString(), qse);
- } finally {
- resolver.close();
- }
- return null;
+ return this.addJob(topic, properties, null);
}
/**
@@ -675,68 +520,6 @@ public class JobManagerImpl
return result;
}
-
-
- /**
- * Try to get a "lock" for a resource
- */
- private boolean lock(final String jobTopic, final String id) {
- if ( logger.isDebugEnabled() ) {
- logger.debug("Trying to get lock for {}", id);
- }
- boolean hasLock = false;
- final ResourceResolver resolver = this.configuration.createResourceResolver();
- try {
- final String lockName = ResourceHelper.filterName(id);
- final StringBuilder sb = new StringBuilder(this.configuration.getLocksPath());
- sb.append('/');
- sb.append(jobTopic.replace('/', '.'));
- sb.append('/');
- sb.append(lockName);
- final String path = sb.toString();
-
- Resource lockResource = resolver.getResource(path);
- if ( lockResource == null ) {
- resolver.refresh();
- try {
- final Map<String, Object> props = new HashMap<String, Object>();
- props.put(Utility.PROPERTY_LOCK_CREATED, Calendar.getInstance());
- props.put(Utility.PROPERTY_LOCK_CREATED_APP, Environment.APPLICATION_ID);
- props.put(ResourceResolver.PROPERTY_RESOURCE_TYPE, Utility.RESOURCE_TYPE_LOCK);
-
- lockResource = ResourceHelper.getOrCreateResource(resolver,
- path,
- props);
-
- // check if lock resource has correct name (SNS)
- if ( !lockResource.getName().equals(lockName) ) {
- if ( logger.isDebugEnabled() ) {
- logger.debug("Created SNS lock resource on instance {} - discarding", Environment.APPLICATION_ID);
- }
- resolver.delete(lockResource);
- resolver.commit();
- } else {
- final ValueMap vm = lockResource.adaptTo(ValueMap.class);
- if ( logger.isDebugEnabled() ) {
- logger.debug("Got lock resource on instance {} with {}", Environment.APPLICATION_ID, vm.get(Utility.PROPERTY_LOCK_CREATED_APP));
- }
- if ( vm.get(Utility.PROPERTY_LOCK_CREATED_APP).equals(Environment.APPLICATION_ID) ) {
- hasLock = true;
- }
- }
- } catch (final PersistenceException ignore) {
- // ignore
- }
- }
- } finally {
- resolver.close();
- }
- if ( logger.isDebugEnabled() ) {
- logger.debug("Lock for {} = {}", id, hasLock);
- }
- return hasLock;
- }
-
/**
* Persist the job in the resource tree
* @param jobTopic The required job topic
@@ -745,50 +528,44 @@ public class JobManagerImpl
* @return The persisted job or <code>null</code>.
*/
private Job addJobInteral(final String jobTopic,
- final String jobName,
final Map<String, Object> jobProperties,
final List<String> errors) {
final QueueInfo info = this.configuration.getQueueConfigurationManager().getQueueInfo(jobTopic);
- // check for unique jobs
- if ( jobName != null && !this.lock(jobTopic, jobName) ) {
- logger.debug("Discarding duplicate job {}", Utility.toString(jobTopic, jobName, jobProperties));
- return null;
- } else {
- final TopologyCapabilities caps = this.configuration.getTopologyCapabilities();
- info.targetId = (caps == null ? null : caps.detectTarget(jobTopic, jobProperties, info));
- if ( logger.isDebugEnabled() ) {
- if ( info.targetId != null ) {
- logger.debug("Persisting job {} into queue {}, target={}", new Object[] {Utility.toString(jobTopic, jobName, jobProperties), info.queueName, info.targetId});
- } else {
- logger.debug("Persisting job {} into queue {}", Utility.toString(jobTopic, jobName, jobProperties), info.queueName);
- }
- }
- final ResourceResolver resolver = this.configuration.createResourceResolver();
- try {
- final JobImpl job = this.writeJob(resolver,
- jobTopic,
- jobName,
- jobProperties,
- info);
- if ( info.targetId != null ) {
- this.configuration.getAuditLogger().debug("ASSIGN OK {} : {}",
- info.targetId, job.getId());
- } else {
- this.configuration.getAuditLogger().debug("UNASSIGN OK : {}",
- job.getId());
- }
- return job;
- } catch (final PersistenceException re ) {
- // something went wrong, so let's log it
- this.logger.error("Exception during persisting new job '" + Utility.toString(jobTopic, jobName, jobProperties) + "'", re);
- } finally {
- resolver.close();
+ final TopologyCapabilities caps = this.configuration.getTopologyCapabilities();
+ info.targetId = (caps == null ? null : caps.detectTarget(jobTopic, jobProperties, info));
+
+ if ( logger.isDebugEnabled() ) {
+ if ( info.targetId != null ) {
+ logger.debug("Persisting job {} into queue {}, target={}", new Object[] {Utility.toString(jobTopic, jobProperties), info.queueName, info.targetId});
+ } else {
+ logger.debug("Persisting job {} into queue {}", Utility.toString(jobTopic, jobProperties), info.queueName);
}
- if ( errors != null ) {
- errors.add("Unable to persist new job.");
+ }
+ final ResourceResolver resolver = this.configuration.createResourceResolver();
+ try {
+ final JobImpl job = this.writeJob(resolver,
+ jobTopic,
+ jobProperties,
+ info);
+ if ( info.targetId != null ) {
+ this.configuration.getAuditLogger().debug("ASSIGN OK {} : {}",
+ info.targetId, job.getId());
+ } else {
+ this.configuration.getAuditLogger().debug("UNASSIGN OK : {}",
+ job.getId());
}
+ return job;
+ } catch (final PersistenceException re ) {
+ // something went wrong, so let's log it
+ this.logger.error("Exception during persisting new job '" + Utility.toString(jobTopic, jobProperties) + "'", re);
+ } finally {
+ resolver.close();
+ }
+ if ( errors != null ) {
+ errors.add("Unable to persist new job.");
}
+
return null;
}
@@ -801,7 +578,6 @@ public class JobManagerImpl
*/
private JobImpl writeJob(final ResourceResolver resolver,
final String jobTopic,
- final String jobName,
final Map<String, Object> jobProperties,
final QueueInfo info)
throws PersistenceException {
@@ -822,9 +598,6 @@ public class JobManagerImpl
properties.put(ResourceHelper.PROPERTY_JOB_ID, jobId);
properties.put(ResourceHelper.PROPERTY_JOB_TOPIC, jobTopic);
- if ( jobName != null ) {
- properties.put(JobUtil.PROPERTY_JOB_NAME, jobName);
- }
properties.put(Job.PROPERTY_JOB_QUEUE_NAME, info.queueConfiguration.getName());
properties.put(Job.PROPERTY_JOB_RETRY_COUNT, 0);
properties.put(Job.PROPERTY_JOB_RETRIES, info.queueConfiguration.getMaxRetries());
@@ -841,7 +614,7 @@ public class JobManagerImpl
// create path and resource
properties.put(ResourceResolver.PROPERTY_RESOURCE_TYPE, ResourceHelper.RESOURCE_TYPE_JOB);
if ( logger.isDebugEnabled() ) {
- logger.debug("Storing new job {} at {}", Utility.toString(jobTopic, jobName, properties), path);
+ logger.debug("Storing new job {} at {}", Utility.toString(jobTopic, properties), path);
}
ResourceHelper.getOrCreateResource(resolver,
path,
@@ -849,7 +622,7 @@ public class JobManagerImpl
// update property types - priority, add path and create job
properties.put(JobImpl.PROPERTY_RESOURCE_PATH, path);
- return new JobImpl(jobTopic, jobName, jobId, properties);
+ return new JobImpl(jobTopic, jobId, properties);
}
/**
@@ -908,7 +681,7 @@ public class JobManagerImpl
/**
* Internal method to add a job
*/
- public Job addJob(final String topic, final String name,
+ public Job addJob(final String topic,
final Map<String, Object> properties,
final List<String> errors) {
final String errorMessage = Utility.checkJob(topic, properties);
@@ -917,46 +690,25 @@ public class JobManagerImpl
if ( errors != null ) {
errors.add(errorMessage);
}
- this.configuration.getAuditLogger().debug("ADD FAILED topic={}{}{}, properties={} : {}",
+ this.configuration.getAuditLogger().debug("ADD FAILED topic={}, properties={} : {}",
new Object[] {topic,
- name == null ? "" : ",name=",
- name == null ? "" : name,
properties,
errorMessage});
return null;
}
- if ( name != null ) {
- Utility.logDeprecated(logger, "Job is using deprecated name feature: " + Utility.toString(topic, name, properties));
- }
final List<String> errorList = new ArrayList<String>();
- Job result = this.addJobInteral(topic, name, properties, errorList);
+ Job result = this.addJobInteral(topic, properties, errorList);
if ( errors != null ) {
errors.addAll(errorList);
}
if ( result == null ) {
- if ( name != null ) {
- result = this.getJobByName(name);
- }
- if ( result == null ) {
- this.configuration.getAuditLogger().debug("ADD FAILED topic={}{}{}, properties={} : {}",
- new Object[] {topic,
- name == null ? "" : ",name=",
- name == null ? "" : name,
- properties,
- errorList});
- } else {
- this.configuration.getAuditLogger().debug("ADD DUP topic={}{}{}, properties={} : {}",
- new Object[] {topic,
- name == null ? "" : ",name=",
- name == null ? "" : name,
- properties,
- result.getId()});
- }
+ this.configuration.getAuditLogger().debug("ADD FAILED topic={}, properties={} : {}",
+ new Object[] {topic,
+ properties,
+ errorList});
} else {
- this.configuration.getAuditLogger().debug("ADD OK topic={}{}{}, properties={} : {}",
+ this.configuration.getAuditLogger().debug("ADD OK topic={}, properties={} : {}",
new Object[] {topic,
- name == null ? "" : ",name=",
- name == null ? "" : name,
properties,
result.getId()});
}
@@ -972,7 +724,7 @@ public class JobManagerImpl
final JobImpl job = (JobImpl)this.getJobById(jobId);
if ( job != null && this.configuration.isStoragePath(job.getResourcePath()) ) {
this.internalRemoveJobById(jobId, true);
- return this.addJob(job.getTopic(), job.getName(), job.getProperties());
+ return this.addJob(job.getTopic(), job.getProperties());
}
return null;
}
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/Utility.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/Utility.java?rev=1710549&r1=1710548&r2=1710549&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/Utility.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/Utility.java Mon Oct 26 10:16:42 2015
@@ -34,7 +34,6 @@ import org.apache.sling.api.resource.Res
import org.apache.sling.api.resource.ValueMap;
import org.apache.sling.event.impl.support.ResourceHelper;
import org.apache.sling.event.jobs.Job;
-import org.apache.sling.event.jobs.JobUtil;
import org.apache.sling.event.jobs.consumer.JobConsumer;
import org.osgi.service.event.Event;
import org.slf4j.Logger;
@@ -103,9 +102,6 @@ public abstract class Utility {
public static Event toEvent(final Job job) {
final Map<String, Object> eventProps = new HashMap<String, Object>();
eventProps.putAll(((JobImpl)job).getProperties());
- if ( job.getName() != null ) {
- eventProps.put(JobUtil.PROPERTY_JOB_NAME, job.getName());
- }
eventProps.put(ResourceHelper.PROPERTY_JOB_ID, job.getId());
eventProps.remove(JobConsumer.PROPERTY_JOB_ASYNC_HANDLER);
return new Event(job.getTopic(), eventProps);
@@ -121,7 +117,6 @@ public abstract class Utility {
boolean first = true;
for(final String propName : properties.keySet()) {
if ( propName.equals(ResourceHelper.PROPERTY_JOB_ID)
- || propName.equals(JobUtil.PROPERTY_JOB_NAME)
|| propName.equals(ResourceHelper.PROPERTY_JOB_TOPIC) ) {
continue;
}
@@ -153,15 +148,10 @@ public abstract class Utility {
* This method prints out the job topic and all of the properties.
*/
public static String toString(final String jobTopic,
- final String name,
final Map<String, Object> properties) {
final StringBuilder sb = new StringBuilder("Sling Job ");
sb.append("[topic=");
sb.append(jobTopic);
- if ( name != null ) {
- sb.append(", name=");
- sb.append(name);
- }
appendProperties(sb, properties);
sb.append("]");
@@ -179,10 +169,6 @@ public abstract class Utility {
sb.append(job.getTopic());
sb.append(", id=");
sb.append(job.getId());
- if ( job.getName() != null ) {
- sb.append(", name=");
- sb.append(job.getName());
- }
appendProperties(sb, ((JobImpl)job).getProperties());
sb.append("]");
return sb.toString();
@@ -224,7 +210,6 @@ public abstract class Utility {
}
}
job = new JobImpl(topic,
- (String)jobProperties.get(JobUtil.PROPERTY_JOB_NAME),
jobId,
jobProperties);
} else {
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java?rev=1710549&r1=1710548&r2=1710549&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java Mon Oct 26 10:16:42 2015
@@ -30,10 +30,8 @@ import org.apache.felix.scr.annotations.
import org.apache.felix.scr.annotations.PropertyUnbounded;
import org.apache.felix.scr.annotations.Service;
import org.apache.sling.commons.osgi.PropertiesUtil;
-import org.apache.sling.event.impl.jobs.Utility;
import org.apache.sling.event.impl.support.TopicMatcher;
import org.apache.sling.event.impl.support.TopicMatcherHelper;
-import org.apache.sling.event.jobs.JobUtil;
import org.apache.sling.event.jobs.QueueConfiguration;
import org.osgi.framework.Constants;
import org.slf4j.Logger;
@@ -221,8 +219,8 @@ public class InternalQueueConfiguration
* If it is invalid, it is ignored.
*/
private boolean checkIsValid() {
- if ( type == Type.IGNORE || type == Type.DROP ) {
- Utility.logDeprecated(logger, "Queue is using deprecated queue type. Ignoring queue " + name + " with type " + type);
+ if ( type == Type._UNSUPPORTED_1 || type == Type._UNSUPPORTED_2 ) {
+ logger.error("Queue is using unsupported queue type. Ignoring queue " + name + " with type " + type);
return false;
}
if ( type == null ) {
@@ -307,14 +305,6 @@ public class InternalQueueConfiguration
}
/**
- * @see org.apache.sling.event.jobs.QueueConfiguration#getPriority()
- */
- @Override
- public JobUtil.JobPriority getPriority() {
- return JobUtil.JobPriority.valueOf(this.priority.name());
- }
-
- /**
* @see org.apache.sling.event.jobs.QueueConfiguration#getMaxParallel()
*/
@Override
@@ -322,12 +312,6 @@ public class InternalQueueConfiguration
return this.maxParallelProcesses;
}
- @Override
- @Deprecated
- public boolean isLocalQueue() {
- return false;
- }
-
/**
* @see org.apache.sling.event.jobs.QueueConfiguration#getTopics()
*/
@@ -349,12 +333,6 @@ public class InternalQueueConfiguration
}
@Override
- @Deprecated
- public String[] getApplicationIds() {
- return null;
- }
-
- @Override
public boolean isKeepJobs() {
return this.keepJobs;
}
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/TopologyCapabilities.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/TopologyCapabilities.java?rev=1710549&r1=1710548&r2=1710549&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/TopologyCapabilities.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/TopologyCapabilities.java Mon Oct 26 10:16:42 2015
@@ -28,7 +28,6 @@ import java.util.TreeMap;
import org.apache.sling.discovery.InstanceDescription;
import org.apache.sling.discovery.TopologyView;
-import org.apache.sling.event.impl.jobs.JobImpl;
import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueInfo;
import org.apache.sling.event.impl.support.Environment;
import org.apache.sling.event.jobs.QueueConfiguration;
@@ -215,7 +214,7 @@ public class TopologyCapabilities {
* Return the potential targets (Sling IDs) sorted by ID
* @return A list of instance descriptions. The list might be empty.
*/
- public List<InstanceDescription> getPotentialTargets(final String jobTopic, final Map<String, Object> jobProperties) {
+ public List<InstanceDescription> getPotentialTargets(final String jobTopic) {
// calculate potential targets
final List<InstanceDescription> potentialTargets = new ArrayList<InstanceDescription>();
@@ -236,11 +235,6 @@ public class TopologyCapabilities {
pos = jobTopic.lastIndexOf('/', pos - 1);
} while ( pos > 0 );
}
- // third: bridged consumers
- final List<InstanceDescription> bridgedTargets = (jobProperties != null && jobProperties.containsKey(JobImpl.PROPERTY_BRIDGED_EVENT) ? this.instanceCapabilities.get("/") : null);
- if ( bridgedTargets != null ) {
- potentialTargets.addAll(bridgedTargets);
- }
Collections.sort(potentialTargets, this.instanceComparator);
return potentialTargets;
@@ -251,7 +245,7 @@ public class TopologyCapabilities {
*/
public String detectTarget(final String jobTopic, final Map<String, Object> jobProperties,
final QueueInfo queueInfo) {
- final List<InstanceDescription> potentialTargets = this.getPotentialTargets(jobTopic, jobProperties);
+ final List<InstanceDescription> potentialTargets = this.getPotentialTargets(jobTopic);
logger.debug("Potential targets for {} : {}", jobTopic, potentialTargets);
String createdOn = null;
if ( jobProperties != null ) {
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/console/InventoryPlugin.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/console/InventoryPlugin.java?rev=1710549&r1=1710548&r2=1710549&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/console/InventoryPlugin.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/console/InventoryPlugin.java Mon Oct 26 10:16:42 2015
@@ -88,8 +88,8 @@ public class InventoryPlugin implements
case ORDERED : return "Ordered";
case TOPIC_ROUND_ROBIN : return "Topic Round Robin";
case UNORDERED : return "Parallel";
- case IGNORE : return "Ignore";
- case DROP : return "Drop";
+ case _UNSUPPORTED_1 : return "????";
+ case _UNSUPPORTED_2 : return "????";
}
return type.toString();
}
@@ -290,7 +290,7 @@ public class InventoryPlugin implements
pw.printf("Max Parallel : %s%n", c.getMaxParallel());
pw.printf("Max Retries : %s%n", c.getMaxRetries());
pw.printf("Retry Delay : %s ms%n", c.getRetryDelayInMs());
- pw.printf("Priority : %s%n", c.getPriority());
+ pw.printf("Priority : %s%n", c.getThreadPriority());
pw.printf("Ranking : %s%n", c.getRanking());
pw.println();
@@ -444,7 +444,7 @@ public class InventoryPlugin implements
pw.printf(" \"maxParallel\" : %s,%n", c.getMaxParallel());
pw.printf(" \"maxRetries\" : %s,%n", c.getMaxRetries());
pw.printf(" \"retryDelayInMs\" : %s,%n", c.getRetryDelayInMs());
- pw.printf(" \"priority\" : \"%s\",%n", c.getPriority());
+ pw.printf(" \"priority\" : \"%s\",%n", c.getThreadPriority());
pw.printf(" \"ranking\" : %s%n", c.getRanking());
pw.print(" }");
}
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/console/WebConsolePlugin.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/console/WebConsolePlugin.java?rev=1710549&r1=1710548&r2=1710549&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/console/WebConsolePlugin.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/console/WebConsolePlugin.java Mon Oct 26 10:16:42 2015
@@ -372,7 +372,7 @@ public class WebConsolePlugin extends Ht
pw.printf("<tr><td>Max Parallel</td><td>%s</td></tr>", c.getMaxParallel());
pw.printf("<tr><td>Max Retries</td><td>%s</td></tr>", c.getMaxRetries());
pw.printf("<tr><td>Retry Delay</td><td>%s ms</td></tr>", c.getRetryDelayInMs());
- pw.printf("<tr><td>Priority</td><td>%s</td></tr>", c.getPriority());
+ pw.printf("<tr><td>Priority</td><td>%s</td></tr>", c.getThreadPriority());
pw.printf("<tr><td>Ranking</td><td>%s</td></tr>", c.getRanking());
pw.println("</tbody></table>");
@@ -403,8 +403,8 @@ public class WebConsolePlugin extends Ht
case ORDERED : return "Ordered";
case TOPIC_ROUND_ROBIN : return "Topic Round Robin";
case UNORDERED : return "Parallel";
- case IGNORE : return "Ignore";
- case DROP : return "Drop";
+ case _UNSUPPORTED_1 : return "????";
+ case _UNSUPPORTED_2 : return "????";
}
return type.toString();
}
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/notifications/NotificationUtility.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/notifications/NotificationUtility.java?rev=1710549&r1=1710548&r2=1710549&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/notifications/NotificationUtility.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/notifications/NotificationUtility.java Mon Oct 26 10:16:42 2015
@@ -19,13 +19,10 @@
package org.apache.sling.event.impl.jobs.notifications;
import java.util.Dictionary;
-import java.util.HashMap;
import java.util.Hashtable;
-import java.util.Map;
import org.apache.sling.event.impl.jobs.JobImpl;
import org.apache.sling.event.jobs.Job;
-import org.apache.sling.event.jobs.JobUtil;
import org.apache.sling.event.jobs.NotificationConstants;
import org.apache.sling.event.jobs.consumer.JobConsumer;
import org.osgi.service.event.Event;
@@ -42,33 +39,11 @@ public abstract class NotificationUtilit
*/
public static void sendNotification(final EventAdmin eventAdmin,
final String eventTopic,
- final String jobTopic,
- final String jobName,
- final Map<String, Object> jobProperties,
- final Long time) {
- if ( eventAdmin != null ) {
- // create job object
- final Map<String, Object> jobProps;
- if ( jobProperties == null ) {
- jobProps = new HashMap<String, Object>();
- } else {
- jobProps = jobProperties;
- }
- final Job job = new JobImpl(jobTopic, jobName, "<unknown>", jobProps);
- sendNotificationInternal(eventAdmin, eventTopic, job, time);
- }
- }
-
- /**
- * Helper method for sending the notification events.
- */
- public static void sendNotification(final EventAdmin eventAdmin,
- final String eventTopic,
final Job job,
final Long time) {
if ( eventAdmin != null ) {
// create new copy of job object
- final Job jobCopy = new JobImpl(job.getTopic(), job.getName(), job.getId(), ((JobImpl)job).getProperties());
+ final Job jobCopy = new JobImpl(job.getTopic(), job.getId(), ((JobImpl)job).getProperties());
sendNotificationInternal(eventAdmin, eventTopic, jobCopy, time);
}
}
@@ -84,9 +59,6 @@ public abstract class NotificationUtilit
// add basic job properties
eventProps.put(NotificationConstants.NOTIFICATION_PROPERTY_JOB_ID, job.getId());
eventProps.put(NotificationConstants.NOTIFICATION_PROPERTY_JOB_TOPIC, job.getTopic());
- if ( job.getName() != null ) {
- eventProps.put(JobUtil.NOTIFICATION_PROPERTY_JOB_NAME, job.getName());
- }
// copy payload
for(final String name : job.getPropertyNames()) {
eventProps.put(name, job.getProperty(name));
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java?rev=1710549&r1=1710548&r2=1710549&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java Mon Oct 26 10:16:42 2015
@@ -20,9 +20,7 @@ package org.apache.sling.event.impl.jobs
import java.util.Calendar;
import java.util.Date;
-import java.util.Dictionary;
import java.util.HashMap;
-import java.util.Hashtable;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Semaphore;
@@ -35,7 +33,6 @@ import org.apache.sling.api.resource.Per
import org.apache.sling.api.resource.Resource;
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.commons.threads.ThreadPool;
-import org.apache.sling.event.EventUtil;
import org.apache.sling.event.impl.EventingThreadPool;
import org.apache.sling.event.impl.jobs.InternalJobState;
import org.apache.sling.event.impl.jobs.JobHandler;
@@ -43,19 +40,14 @@ import org.apache.sling.event.impl.jobs.
import org.apache.sling.event.impl.jobs.JobTopicTraverser;
import org.apache.sling.event.impl.jobs.Utility;
import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
-import org.apache.sling.event.impl.jobs.deprecated.JobStatusNotifier;
-import org.apache.sling.event.impl.jobs.deprecated.JobStatusNotifierImpl;
import org.apache.sling.event.impl.jobs.notifications.NotificationUtility;
import org.apache.sling.event.impl.support.BatchResourceRemover;
import org.apache.sling.event.jobs.Job;
import org.apache.sling.event.jobs.Job.JobState;
-import org.apache.sling.event.jobs.JobProcessor;
-import org.apache.sling.event.jobs.JobUtil;
import org.apache.sling.event.jobs.NotificationConstants;
import org.apache.sling.event.jobs.Queue;
import org.apache.sling.event.jobs.QueueConfiguration.Type;
import org.apache.sling.event.jobs.Statistics;
-import org.osgi.service.event.Event;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -69,9 +61,6 @@ public class JobQueueImpl
/** Default timeout for suspend. */
private static final long MAX_SUSPEND_TIME = 1000 * 60 * 60; // 60 mins
- /** Default number of milliseconds to wait for an ack. */
- private static final long DEFAULT_WAIT_FOR_ACK_IN_MS = 60 * 1000; // by default we wait 60 secs
-
/** The logger. */
private final Logger logger;
@@ -267,159 +256,71 @@ public class JobQueueImpl
final JobImpl job = handler.getJob();
handler.started = System.currentTimeMillis();
- if ( handler.getConsumer() != null ) {
- this.services.configuration.getAuditLogger().debug("START OK : {}", job.getId());
- // sanity check for the queued property
- Calendar queued = job.getProperty(JobImpl.PROPERTY_JOB_QUEUED, Calendar.class);
- if ( queued == null ) {
- // we simply use a date of ten seconds ago
- queued = Calendar.getInstance();
- queued.setTimeInMillis(System.currentTimeMillis() - 10000);
- }
- final long queueTime = handler.started - queued.getTimeInMillis();
- // update statistics
- this.services.statisticsManager.jobStarted(this.queueName, job.getTopic(), queueTime);
- // send notification
- NotificationUtility.sendNotification(this.services.eventAdmin, NotificationConstants.TOPIC_JOB_STARTED, job, queueTime);
-
- synchronized ( this.processingJobsLists ) {
- this.processingJobsLists.put(job.getId(), handler);
- }
+ this.services.configuration.getAuditLogger().debug("START OK : {}", job.getId());
+ // sanity check for the queued property
+ Calendar queued = job.getProperty(JobImpl.PROPERTY_JOB_QUEUED, Calendar.class);
+ if ( queued == null ) {
+ // we simply use a date of ten seconds ago
+ queued = Calendar.getInstance();
+ queued.setTimeInMillis(System.currentTimeMillis() - 10000);
+ }
+ final long queueTime = handler.started - queued.getTimeInMillis();
+ // update statistics
+ this.services.statisticsManager.jobStarted(this.queueName, job.getTopic(), queueTime);
+ // send notification
+ NotificationUtility.sendNotification(this.services.eventAdmin, NotificationConstants.TOPIC_JOB_STARTED, job, queueTime);
- JobExecutionResultImpl result = JobExecutionResultImpl.CANCELLED;
- Job.JobState resultState = Job.JobState.ERROR;
- final JobExecutionContextImpl ctx = new JobExecutionContextImpl(handler, new JobExecutionContextImpl.ASyncHandler() {
-
- @Override
- public void finished(final JobState state) {
- services.jobConsumerManager.unregisterListener(job.getId());
- finishedJob(job.getId(), state, true);
- asyncCounter.decrementAndGet();
- }
- });
+ synchronized ( this.processingJobsLists ) {
+ this.processingJobsLists.put(job.getId(), handler);
+ }
- try {
- synchronized ( ctx ) {
- result = (JobExecutionResultImpl)handler.getConsumer().process(job, ctx);
- if ( result == null ) { // ASYNC processing
- services.jobConsumerManager.registerListener(job.getId(), handler.getConsumer(), ctx);
- asyncCounter.incrementAndGet();
- ctx.markAsync();
- } else {
- if ( result.succeeded() ) {
- resultState = Job.JobState.SUCCEEDED;
- } else if ( result.failed() ) {
- resultState = Job.JobState.QUEUED;
- } else if ( result.cancelled() ) {
- if ( handler.isStopped() ) {
- resultState = Job.JobState.STOPPED;
- } else {
- resultState = Job.JobState.ERROR;
- }
- }
- }
- }
- } catch (final Throwable t) { //NOSONAR
- logger.error("Unhandled error occured in job processor " + t.getMessage() + " while processing job " + Utility.toString(job), t);
- // we don't reschedule if an exception occurs
- result = JobExecutionResultImpl.CANCELLED;
- resultState = Job.JobState.ERROR;
- } finally {
- if ( result != null ) {
- if ( result.getRetryDelayInMs() != null ) {
- job.setProperty(JobImpl.PROPERTY_DELAY_OVERRIDE, result.getRetryDelayInMs());
- }
- if ( result.getMessage() != null ) {
- job.setProperty(Job.PROPERTY_RESULT_MESSAGE, result.getMessage());
- }
- this.finishedJob(job.getId(), resultState, false);
- }
+ JobExecutionResultImpl result = JobExecutionResultImpl.CANCELLED;
+ Job.JobState resultState = Job.JobState.ERROR;
+ final JobExecutionContextImpl ctx = new JobExecutionContextImpl(handler, new JobExecutionContextImpl.ASyncHandler() {
+
+ @Override
+ public void finished(final JobState state) {
+ services.jobConsumerManager.unregisterListener(job.getId());
+ finishedJob(job.getId(), state, true);
+ asyncCounter.decrementAndGet();
}
+ });
- } else {
- final Event jobEvent = this.getJobEvent(handler);
- final JobStatusNotifierImpl notifier = (JobStatusNotifierImpl) jobEvent.getProperty(JobStatusNotifier.CONTEXT_PROPERTY_NAME);
- // we need async delivery, otherwise we might create a deadlock
- // as this method runs inside a synchronized block and the finishedJob
- // method as well!
- final long endOfAck = System.currentTimeMillis() + DEFAULT_WAIT_FOR_ACK_IN_MS;
- this.services.eventAdmin.postEvent(jobEvent);
-
- // wait for the ack
- synchronized ( notifier ) {
- while ( System.currentTimeMillis() < endOfAck && !notifier.isCalled() ) {
- try {
- notifier.wait(endOfAck - System.currentTimeMillis());
- } catch ( final InterruptedException ie) {
- Thread.currentThread().interrupt();
- ignoreException(ie);
+ try {
+ synchronized ( ctx ) {
+ result = (JobExecutionResultImpl)handler.getConsumer().process(job, ctx);
+ if ( result == null ) { // ASYNC processing
+ services.jobConsumerManager.registerListener(job.getId(), handler.getConsumer(), ctx);
+ asyncCounter.incrementAndGet();
+ ctx.markAsync();
+ } else {
+ if ( result.succeeded() ) {
+ resultState = Job.JobState.SUCCEEDED;
+ } else if ( result.failed() ) {
+ resultState = Job.JobState.QUEUED;
+ } else if ( result.cancelled() ) {
+ if ( handler.isStopped() ) {
+ resultState = Job.JobState.STOPPED;
+ } else {
+ resultState = Job.JobState.ERROR;
+ }
}
}
- if ( !notifier.isCalled() ) {
- notifier.markDone();
- }
}
- if ( !notifier.isCalled() ) {
- if ( handler.reschedule() ) {
- this.logger.info("No acknowledge received for job {} stored at {}. Requeueing job.", Utility.toString(handler.getJob()), handler.getJob().getId());
- handler.getJob().retry();
- this.requeue(handler);
- }
- } else {
- if ( logger.isDebugEnabled() ) {
- logger.debug("Received ack for job {}", Utility.toString(job));
- }
- this.services.configuration.getAuditLogger().debug("START OK : {}", job.getId());
- // sanity check for the queued property
- Calendar queued = job.getProperty(JobImpl.PROPERTY_JOB_QUEUED, Calendar.class);
- if ( queued == null ) {
- // we simply use a date of ten seconds ago
- queued = Calendar.getInstance();
- queued.setTimeInMillis(System.currentTimeMillis() - 10000);
- }
- final long queueTime = handler.started - queued.getTimeInMillis();
- // update statistics
- this.services.statisticsManager.jobStarted(this.queueName, job.getTopic(), queueTime);
- // send notification
- NotificationUtility.sendNotification(this.services.eventAdmin, NotificationConstants.TOPIC_JOB_STARTED, job, queueTime);
-
- synchronized ( this.processingJobsLists ) {
- this.processingJobsLists.put(job.getId(), handler);
+ } catch (final Throwable t) { //NOSONAR
+ logger.error("Unhandled error occured in job processor " + t.getMessage() + " while processing job " + Utility.toString(job), t);
+ // we don't reschedule if an exception occurs
+ result = JobExecutionResultImpl.CANCELLED;
+ resultState = Job.JobState.ERROR;
+ } finally {
+ if ( result != null ) {
+ if ( result.getRetryDelayInMs() != null ) {
+ job.setProperty(JobImpl.PROPERTY_DELAY_OVERRIDE, result.getRetryDelayInMs());
}
-
- // check for processor
- final JobProcessor processor = notifier.getProcessor();
- if ( processor != null ) {
- boolean result = false;
- try {
- result = processor.process(jobEvent);
- } catch (Throwable t) { //NOSONAR
- LoggerFactory.getLogger(JobUtil.class).error("Unhandled error occured in job processor " + t.getMessage() + " while processing job " + job, t);
- // we don't reschedule if an exception occurs
- result = true;
- }
- if ( result ) {
- this.finishedJob(job.getId(), Job.JobState.SUCCEEDED, false);
- } else {
- this.finishedJob(job.getId(), Job.JobState.QUEUED, false);
- }
- } else {
- // async processing
- final JobExecutionContextImpl ctx = new JobExecutionContextImpl(handler, new JobExecutionContextImpl.ASyncHandler() {
-
- @Override
- public void finished(final JobState state) {
- services.jobConsumerManager.unregisterListener(job.getId());
- finishedJob(job.getId(), state, true);
- asyncCounter.decrementAndGet();
- }
- });
- services.jobConsumerManager.registerListener(job.getId(), handler.getConsumer(), ctx);
- asyncCounter.incrementAndGet();
- ctx.markAsync();
-
- notifier.setJobExecutionContext(ctx);
+ if ( result.getMessage() != null ) {
+ job.setProperty(Job.PROPERTY_RESULT_MESSAGE, result.getMessage());
}
+ this.finishedJob(job.getId(), resultState, false);
}
}
} catch (final Exception re) {
@@ -628,30 +529,6 @@ public class JobQueueImpl
}
/**
- * Create the real job event.
- * This generates a new event object with the same properties, but with the
- * {@link EventUtil#PROPERTY_JOB_TOPIC} topic.
- * @param info The job event.
- * @return The real job event.
- */
- private Event getJobEvent(final JobHandler info) {
- final String eventTopic = info.getJob().getTopic();
- final Dictionary<String, Object> properties = new Hashtable<String, Object>();
- for(final String name : info.getJob().getPropertyNames()) {
- properties.put(name, info.getJob().getProperty(name));
- }
-
- // put properties for finished job callback
- properties.put(JobStatusNotifier.CONTEXT_PROPERTY_NAME, new JobStatusNotifierImpl());
-
- // remove app id and distributable flag
- properties.remove(EventUtil.PROPERTY_DISTRIBUTE);
- properties.remove(EventUtil.PROPERTY_APPLICATION);
-
- return new Event(eventTopic, properties);
- }
-
- /**
* @see org.apache.sling.event.jobs.Queue#resume()
*/
@Override
@@ -734,14 +611,6 @@ public class JobQueueImpl
}
/**
- * @see org.apache.sling.event.jobs.Queue#clear()
- */
- @Override
- public void clear() {
- // this is a noop
- }
-
- /**
* @see org.apache.sling.event.jobs.Queue#getState(java.lang.String)
*/
@Override
@@ -782,16 +651,6 @@ public class JobQueueImpl
return delay;
}
- /**
- * Helper method which just logs the exception in debug mode.
- * @param e
- */
- private void ignoreException(Exception e) {
- if ( this.logger.isDebugEnabled() ) {
- this.logger.debug("Ignored exception " + e.getMessage(), e);
- }
- }
-
public boolean stopJob(final JobImpl job) {
final JobHandler handler;
synchronized ( this.processingJobsLists ) {
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java?rev=1710549&r1=1710548&r2=1710549&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java Mon Oct 26 10:16:42 2015
@@ -168,7 +168,7 @@ public class QueueJobCache {
final JobExecutor consumer = jobConsumerManager.getExecutor(job.getTopic());
handler = new JobHandler(job, consumer, this.configuration);
- if ( (consumer != null || (job.isBridgedEvent() && jobConsumerManager.supportsBridgedEvents())) ) {
+ if ( consumer != null ) {
if ( !handler.startProcessing(queue) ) {
statisticsManager.jobDequeued(queue.getName(), handler.getJob().getTopic());
if ( logger.isDebugEnabled() ) {
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/CheckTopologyTask.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/CheckTopologyTask.java?rev=1710549&r1=1710548&r2=1710549&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/CheckTopologyTask.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/CheckTopologyTask.java Mon Oct 26 10:16:42 2015
@@ -18,7 +18,6 @@
*/
package org.apache.sling.event.impl.jobs.tasks;
-import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -29,7 +28,6 @@ import org.apache.sling.api.resource.Res
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.api.resource.ValueMap;
import org.apache.sling.discovery.InstanceDescription;
-import org.apache.sling.event.impl.jobs.JobImpl;
import org.apache.sling.event.impl.jobs.JobTopicTraverser;
import org.apache.sling.event.impl.jobs.config.JobManagerConfiguration;
import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager;
@@ -113,24 +111,13 @@ public class CheckTopologyTask {
// this resource should exist, but we check anyway
if ( jobsRoot != null ) {
- // check if this instance supports bridged jobs
- final List<InstanceDescription> bridgedTargets = caps.getPotentialTargets("/", null);
- boolean flag = false;
- for(final InstanceDescription desc : bridgedTargets) {
- if ( desc.isLocal() ) {
- flag = true;
- break;
- }
- }
- final boolean supportsBridged = flag;
-
final Iterator<Resource> topicIter = jobsRoot.listChildren();
while ( caps.isActive() && topicIter.hasNext() ) {
final Resource topicResource = topicIter.next();
final String topicName = topicResource.getName().replace('.', '/');
this.logger.debug("Checking topic {}..." , topicName);
- final List<InstanceDescription> potentialTargets = caps.getPotentialTargets(topicName, null);
+ final List<InstanceDescription> potentialTargets = caps.getPotentialTargets(topicName);
boolean reassign = true;
for(final InstanceDescription desc : potentialTargets) {
if ( desc.isLocal() ) {
@@ -150,37 +137,35 @@ public class CheckTopologyTask {
public boolean handle(final Resource rsrc) {
try {
final ValueMap vm = ResourceHelper.getValueMap(rsrc);
- if ( !supportsBridged || vm.get(JobImpl.PROPERTY_BRIDGED_EVENT) == null ) {
- final String targetId = caps.detectTarget(topicName, vm, info);
+ final String targetId = caps.detectTarget(topicName, vm, info);
- final Map<String, Object> props = new HashMap<String, Object>(vm);
- props.remove(Job.PROPERTY_JOB_STARTED_TIME);
+ final Map<String, Object> props = new HashMap<String, Object>(vm);
+ props.remove(Job.PROPERTY_JOB_STARTED_TIME);
- final String newPath;
+ final String newPath;
+ if ( targetId != null ) {
+ newPath = configuration.getAssginedJobsPath() + '/' + targetId + '/' + topicResource.getName() + rsrc.getPath().substring(topicResource.getPath().length());
+ props.put(Job.PROPERTY_JOB_QUEUE_NAME, info.queueName);
+ props.put(Job.PROPERTY_JOB_TARGET_INSTANCE, targetId);
+ } else {
+ newPath = configuration.getUnassignedJobsPath() + '/' + topicResource.getName() + rsrc.getPath().substring(topicResource.getPath().length());
+ props.remove(Job.PROPERTY_JOB_QUEUE_NAME);
+ props.remove(Job.PROPERTY_JOB_TARGET_INSTANCE);
+ }
+ try {
+ ResourceHelper.getOrCreateResource(resolver, newPath, props);
+ resolver.delete(rsrc);
+ resolver.commit();
+ final String jobId = vm.get(ResourceHelper.PROPERTY_JOB_ID, String.class);
if ( targetId != null ) {
- newPath = configuration.getAssginedJobsPath() + '/' + targetId + '/' + topicResource.getName() + rsrc.getPath().substring(topicResource.getPath().length());
- props.put(Job.PROPERTY_JOB_QUEUE_NAME, info.queueName);
- props.put(Job.PROPERTY_JOB_TARGET_INSTANCE, targetId);
+ configuration.getAuditLogger().debug("REASSIGN OK {} : {}", targetId, jobId);
} else {
- newPath = configuration.getUnassignedJobsPath() + '/' + topicResource.getName() + rsrc.getPath().substring(topicResource.getPath().length());
- props.remove(Job.PROPERTY_JOB_QUEUE_NAME);
- props.remove(Job.PROPERTY_JOB_TARGET_INSTANCE);
- }
- try {
- ResourceHelper.getOrCreateResource(resolver, newPath, props);
- resolver.delete(rsrc);
- resolver.commit();
- final String jobId = vm.get(ResourceHelper.PROPERTY_JOB_ID, String.class);
- if ( targetId != null ) {
- configuration.getAuditLogger().debug("REASSIGN OK {} : {}", targetId, jobId);
- } else {
- configuration.getAuditLogger().debug("REUNASSIGN OK : {}", jobId);
- }
- } catch ( final PersistenceException pe ) {
- logger.warn("Unable to move stale job from " + rsrc.getPath() + " to " + newPath, pe);
- resolver.refresh();
- resolver.revert();
+ configuration.getAuditLogger().debug("REUNASSIGN OK : {}", jobId);
}
+ } catch ( final PersistenceException pe ) {
+ logger.warn("Unable to move stale job from " + rsrc.getPath() + " to " + newPath, pe);
+ resolver.refresh();
+ resolver.revert();
}
} catch (final InstantiationException ie) {
// something happened with the resource in the meantime
@@ -228,9 +213,6 @@ public class CheckTopologyTask {
}
}
- /** Properties to include bridge job consumers for the quick test. */
- private static final Map<String, Object> BRIDGED_JOB = Collections.singletonMap(JobImpl.PROPERTY_BRIDGED_EVENT, (Object)Boolean.TRUE);
-
/**
* Try to assign all jobs from the jobs root.
* The jobs are stored by topic
@@ -249,7 +231,7 @@ public class CheckTopologyTask {
logger.debug("Found topic {}", topicName);
// first check if there is an instance for these topics
- final List<InstanceDescription> potentialTargets = caps.getPotentialTargets(topicName, BRIDGED_JOB);
+ final List<InstanceDescription> potentialTargets = caps.getPotentialTargets(topicName);
if ( potentialTargets != null && potentialTargets.size() > 0 ) {
final QueueConfigurationManager qcm = this.configuration.getQueueConfigurationManager();
if ( qcm == null ) {
@@ -345,7 +327,7 @@ public class CheckTopologyTask {
}
// check for all topics
this.reassignStaleJobs();
-
+
// try to assign unassigned jobs
this.assignUnassignedJobs();
}
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/CleanUpTask.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/CleanUpTask.java?rev=1710549&r1=1710548&r2=1710549&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/CleanUpTask.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/CleanUpTask.java Mon Oct 26 10:16:42 2015
@@ -18,21 +18,15 @@
*/
package org.apache.sling.event.impl.jobs.tasks;
-import java.util.ArrayList;
import java.util.Calendar;
import java.util.Iterator;
-import java.util.List;
import org.apache.sling.api.resource.PersistenceException;
import org.apache.sling.api.resource.Resource;
import org.apache.sling.api.resource.ResourceResolver;
-import org.apache.sling.api.resource.ResourceUtil;
-import org.apache.sling.api.resource.ValueMap;
-import org.apache.sling.event.impl.jobs.Utility;
import org.apache.sling.event.impl.jobs.config.JobManagerConfiguration;
import org.apache.sling.event.impl.jobs.config.TopologyCapabilities;
import org.apache.sling.event.impl.jobs.scheduling.JobSchedulerImpl;
-import org.apache.sling.event.impl.support.BatchResourceRemover;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -97,88 +91,10 @@ public class CleanUpTask {
}
}
- // lock cleanup is done every minute
- this.lockCleanup(topologyCapabilities);
logger.debug("Job manager maintenance: Finished #{}", this.schedulerRuns);
}
/**
- * Clean up the locks
- * All locks older than two minutes are removed
- */
- private void lockCleanup(final TopologyCapabilities caps) {
- if ( caps != null && caps.isLeader() ) {
- this.logger.debug("Cleaning up job resource tree: removing obsolete locks");
- final List<Resource> candidates = new ArrayList<Resource>();
- final ResourceResolver resolver = this.configuration.createResourceResolver();
- if ( resolver != null ) {
- try {
- final Resource parentResource = resolver.getResource(this.configuration.getLocksPath());
- if ( parentResource != null ) {
- final Calendar startDate = Calendar.getInstance();
- startDate.add(Calendar.MINUTE, -2);
-
- this.lockCleanup(caps, candidates, parentResource, startDate);
- final BatchResourceRemover remover = new BatchResourceRemover();
- boolean batchRemove = true;
- for(final Resource lockResource : candidates) {
- if ( caps.isActive() ) {
- try {
- if ( batchRemove ) {
- remover.delete(lockResource);
- } else {
- resolver.delete(lockResource);
- resolver.commit();
- }
- } catch ( final PersistenceException pe) {
- batchRemove = false;
- this.ignoreException(pe);
- resolver.refresh();
- }
- } else {
- break;
- }
- }
- try {
- resolver.commit();
- } catch ( final PersistenceException pe) {
- this.ignoreException(pe);
- resolver.refresh();
- }
- }
- } finally {
- resolver.close();
- }
- }
- }
- }
-
- /**
- * Recursive lock cleanup
- */
- private void lockCleanup(final TopologyCapabilities caps,
- final List<Resource> candidates,
- final Resource parentResource,
- final Calendar startDate) {
- for(final Resource childResource : parentResource.getChildren()) {
- if ( caps.isActive() ) {
- final ValueMap vm = ResourceUtil.getValueMap(childResource);
- final Calendar created = vm.get(Utility.PROPERTY_LOCK_CREATED, Calendar.class);
- if ( created != null ) {
- // lock resource
- if ( created.before(startDate) ) {
- candidates.add(childResource);
- }
- } else {
- lockCleanup(caps, candidates, childResource, startDate);
- }
- } else {
- break;
- }
- }
- }
-
- /**
* Simple empty folder removes empty folders for the last ten minutes
* starting five minutes ago.
* If folder for minute 59 is removed, we check the hour folder as well.
@@ -339,14 +255,4 @@ public class CleanUpTask {
resolver.close();
}
}
-
- /**
- * Helper method which just logs the exception in debug mode.
- * @param e
- */
- private void ignoreException(final Exception e) {
- if ( this.logger.isDebugEnabled() ) {
- this.logger.debug("Ignored exception " + e.getMessage(), e);
- }
- }
}
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/UpgradeTask.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/UpgradeTask.java?rev=1710549&r1=1710548&r2=1710549&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/UpgradeTask.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/UpgradeTask.java Mon Oct 26 10:16:42 2015
@@ -29,7 +29,6 @@ import org.apache.sling.api.resource.Res
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.api.resource.ValueMap;
import org.apache.sling.discovery.InstanceDescription;
-import org.apache.sling.event.impl.jobs.JobImpl;
import org.apache.sling.event.impl.jobs.JobTopicTraverser;
import org.apache.sling.event.impl.jobs.config.JobManagerConfiguration;
import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager;
@@ -82,7 +81,7 @@ public class UpgradeTask {
* This has changed, the jobs are now stored with their real topic.
*/
private void upgradeBridgedJobs() {
- final String path = configuration.getLocalJobsPath() + '/' + JobImpl.PROPERTY_BRIDGED_EVENT;
+ final String path = configuration.getLocalJobsPath() + "/slingevent:eventadmin";
final ResourceResolver resolver = configuration.createResourceResolver();
if ( resolver != null ) {
try {
@@ -91,7 +90,7 @@ public class UpgradeTask {
upgradeBridgedJobs(rootResource);
}
if ( caps.isLeader() ) {
- final Resource unassignedRoot = resolver.getResource(configuration.getUnassignedJobsPath() + '/' + JobImpl.PROPERTY_BRIDGED_EVENT);
+ final Resource unassignedRoot = resolver.getResource(configuration.getUnassignedJobsPath() + "/slingevent:eventadmin");
if ( unassignedRoot != null ) {
upgradeBridgedJobs(unassignedRoot);
}
@@ -226,7 +225,6 @@ public class UpgradeTask {
final Map<String, Object> properties = ResourceHelper.cloneValueMap(vm);
- properties.put(JobImpl.PROPERTY_BRIDGED_EVENT, true);
final String topic = (String)properties.remove("slingevent:topic");
properties.put(ResourceHelper.PROPERTY_JOB_TOPIC, topic);
@@ -243,7 +241,7 @@ public class UpgradeTask {
properties.put(Job.PROPERTY_JOB_RETRY_COUNT, 0);
}
- final List<InstanceDescription> potentialTargets = caps.getPotentialTargets("/", null);
+ final List<InstanceDescription> potentialTargets = caps.getPotentialTargets(topic);
String targetId = null;
if ( potentialTargets != null && potentialTargets.size() > 0 ) {
final QueueConfigurationManager qcm = configuration.getQueueConfigurationManager();
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ResourceHelper.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ResourceHelper.java?rev=1710549&r1=1710548&r2=1710549&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ResourceHelper.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ResourceHelper.java Mon Oct 26 10:16:42 2015
@@ -35,12 +35,9 @@ import org.apache.sling.api.resource.Res
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.api.resource.ResourceUtil;
import org.apache.sling.api.resource.ValueMap;
-import org.apache.sling.event.EventUtil;
import org.apache.sling.event.impl.jobs.JobImpl;
import org.apache.sling.event.impl.jobs.config.MainQueueConfiguration;
-import org.apache.sling.event.impl.jobs.deprecated.JobStatusNotifier;
import org.apache.sling.event.jobs.Job;
-import org.apache.sling.event.jobs.JobUtil;
import org.apache.sling.event.jobs.ScheduleInfo;
import org.apache.sling.event.jobs.consumer.JobConsumer;
import org.osgi.service.event.EventConstants;
@@ -67,20 +64,15 @@ public abstract class ResourceHelper {
public static final String PROPERTY_JOB_ID = "slingevent:eventId";
public static final String PROPERTY_JOB_TOPIC = "event.job.topic";
+ public static final String PROPERTY_DISTRIBUTE = "event.distribute";
+ public static final String PROPERTY_APPLICATION = "event.application";
/** List of ignored properties to write to the repository. */
- @SuppressWarnings("deprecation")
private static final String[] IGNORE_PROPERTIES = new String[] {
- EventUtil.PROPERTY_DISTRIBUTE,
- EventUtil.PROPERTY_APPLICATION,
+ ResourceHelper.PROPERTY_DISTRIBUTE,
+ ResourceHelper.PROPERTY_APPLICATION,
EventConstants.EVENT_TOPIC,
ResourceHelper.PROPERTY_JOB_ID,
- JobUtil.PROPERTY_JOB_PARALLEL,
- JobUtil.PROPERTY_JOB_RUN_LOCAL,
- JobUtil.PROPERTY_JOB_QUEUE_ORDERED,
- JobUtil.PROPERTY_NOTIFICATION_JOB,
- Job.PROPERTY_JOB_PRIORITY,
- JobStatusNotifier.CONTEXT_PROPERTY_NAME,
JobImpl.PROPERTY_DELAY_OVERRIDE,
JobConsumer.PROPERTY_JOB_ASYNC_HANDLER,
Job.PROPERTY_JOB_PROGRESS_LOG,