You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2014/10/18 18:07:22 UTC
svn commit: r1632815 - in
/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs:
JobManagerImpl.java config/JobManagerConfiguration.java
stats/StatisticsManager.java tasks/CheckTopologyTask.java
tasks/UpgradeTask.java
Author: cziegeler
Date: Sat Oct 18 16:07:21 2014
New Revision: 1632815
URL: http://svn.apache.org/r1632815
Log:
SLING-4048 : Avoid keeping jobs in memory. Another refactoring: move all configuration code into the config package
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsManager.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/CheckTopologyTask.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/UpgradeTask.java
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java?rev=1632815&r1=1632814&r2=1632815&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java Sat Oct 18 16:07:21 2014
@@ -46,7 +46,6 @@ import org.apache.sling.commons.threads.
import org.apache.sling.event.EventUtil;
import org.apache.sling.event.impl.jobs.config.ConfigurationChangeListener;
import org.apache.sling.event.impl.jobs.config.JobManagerConfiguration;
-import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager;
import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueInfo;
import org.apache.sling.event.impl.jobs.config.TopologyCapabilities;
import org.apache.sling.event.impl.jobs.notifications.NotificationUtility;
@@ -118,9 +117,6 @@ public class JobManagerImpl
private JobManagerConfiguration configuration;
@Reference
- private QueueConfigurationManager queueManager;
-
- @Reference
private StatisticsManager statisticsManager;
@Reference
@@ -797,7 +793,7 @@ public class JobManagerImpl
final String jobName,
final Map<String, Object> jobProperties,
final List<String> errors) {
- final QueueInfo info = this.queueManager.getQueueInfo(jobTopic);
+ final QueueInfo info = this.configuration.getQueueConfigurationManager().getQueueInfo(jobTopic);
// check for unique jobs
if ( jobName != null && !this.lock(jobTopic, jobName) ) {
logger.debug("Discarding duplicate job {}", Utility.toString(jobTopic, jobName, jobProperties));
@@ -912,7 +908,7 @@ public class JobManagerImpl
final JobImpl job = (JobImpl)this.getJobById(jobId);
if ( job != null && !this.configuration.isStoragePath(job.getResourcePath()) ) {
// get the queue configuration
- final QueueInfo queueInfo = this.queueManager.getQueueInfo(job.getTopic());
+ final QueueInfo queueInfo = this.configuration.getQueueConfigurationManager().getQueueInfo(job.getTopic());
final AbstractJobQueue queue = (AbstractJobQueue)this.qManager.getQueue(queueInfo.queueName);
boolean stopped = false;
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java?rev=1632815&r1=1632814&r2=1632815&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java Sat Oct 18 16:07:21 2014
@@ -455,13 +455,13 @@ public class JobManagerConfiguration imp
// before we propagate the new topology we do some maintenance
if ( eventType == Type.TOPOLOGY_INIT ) {
final UpgradeTask task = new UpgradeTask();
- task.run(this, this.topologyCapabilities, queueConfigManager);
+ task.run(this, this.topologyCapabilities);
final FindUnfinishedJobsTask rt = new FindUnfinishedJobsTask();
rt.run(this);
}
- final CheckTopologyTask mt = new CheckTopologyTask(this, this.queueConfigManager);
+ final CheckTopologyTask mt = new CheckTopologyTask(this);
mt.run(topologyCapabilities, !isConfigChange, isConfigChange);
// start listeners
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsManager.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsManager.java?rev=1632815&r1=1632814&r2=1632815&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsManager.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsManager.java Sat Oct 18 16:07:21 2014
@@ -28,7 +28,6 @@ import org.apache.felix.scr.annotations.
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.Service;
import org.apache.sling.event.impl.jobs.config.JobManagerConfiguration;
-import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager;
import org.apache.sling.event.impl.jobs.notifications.NotificationUtility;
import org.apache.sling.event.jobs.Job;
import org.apache.sling.event.jobs.NotificationConstants;
@@ -60,10 +59,6 @@ public class StatisticsManager implement
@Reference
private JobManagerConfiguration configuration;
- /** The queue configuration manager. */
- @Reference
- private QueueConfigurationManager queueConfigurationManager;
-
/** Global statistics. */
private final StatisticsImpl globalStatistics = new StatisticsImpl() {
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/CheckTopologyTask.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/CheckTopologyTask.java?rev=1632815&r1=1632814&r2=1632815&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/CheckTopologyTask.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/CheckTopologyTask.java Sat Oct 18 16:07:21 2014
@@ -32,9 +32,8 @@ import org.apache.sling.discovery.Instan
import org.apache.sling.event.impl.jobs.JobImpl;
import org.apache.sling.event.impl.jobs.JobTopicTraverser;
import org.apache.sling.event.impl.jobs.config.JobManagerConfiguration;
-import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager;
-import org.apache.sling.event.impl.jobs.config.TopologyCapabilities;
import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueInfo;
+import org.apache.sling.event.impl.jobs.config.TopologyCapabilities;
import org.apache.sling.event.impl.support.ResourceHelper;
import org.apache.sling.event.jobs.Job;
import org.slf4j.Logger;
@@ -56,16 +55,11 @@ public class CheckTopologyTask {
/** Job manager configuration. */
private final JobManagerConfiguration configuration;
- /** Queue configuration manager. */
- private final QueueConfigurationManager queueConfigManager;
-
/**
* Constructor
*/
- public CheckTopologyTask(final JobManagerConfiguration config,
- final QueueConfigurationManager queueConfigurationManager) {
+ public CheckTopologyTask(final JobManagerConfiguration config) {
this.configuration = config;
- this.queueConfigManager = queueConfigurationManager;
}
/**
@@ -138,7 +132,7 @@ public class CheckTopologyTask {
}
}
if ( reassign ) {
- final QueueInfo info = this.queueConfigManager.getQueueInfo(topicName);
+ final QueueInfo info = this.configuration.getQueueConfigurationManager().getQueueInfo(topicName);
JobTopicTraverser.traverse(this.logger, topicResource, new JobTopicTraverser.ResourceCallback() {
@Override
@@ -239,7 +233,7 @@ public class CheckTopologyTask {
// first check if there is an instance for these topics
final List<InstanceDescription> potentialTargets = caps.getPotentialTargets(topicName, BRIDGED_JOB);
if ( potentialTargets != null && potentialTargets.size() > 0 ) {
- final QueueInfo info = this.queueConfigManager.getQueueInfo(topicName);
+ final QueueInfo info = this.configuration.getQueueConfigurationManager().getQueueInfo(topicName);
logger.debug("Found queue {} for {}", info.queueConfiguration, topicName);
JobTopicTraverser.traverse(this.logger, topicResource, new JobTopicTraverser.ResourceCallback() {
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/UpgradeTask.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/UpgradeTask.java?rev=1632815&r1=1632814&r2=1632815&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/UpgradeTask.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/UpgradeTask.java Sat Oct 18 16:07:21 2014
@@ -32,9 +32,8 @@ import org.apache.sling.discovery.Instan
import org.apache.sling.event.impl.jobs.JobImpl;
import org.apache.sling.event.impl.jobs.JobTopicTraverser;
import org.apache.sling.event.impl.jobs.config.JobManagerConfiguration;
-import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager;
-import org.apache.sling.event.impl.jobs.config.TopologyCapabilities;
import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueInfo;
+import org.apache.sling.event.impl.jobs.config.TopologyCapabilities;
import org.apache.sling.event.impl.support.Environment;
import org.apache.sling.event.impl.support.ResourceHelper;
import org.apache.sling.event.jobs.Job;
@@ -55,12 +54,11 @@ public class UpgradeTask {
* Upgrade
*/
public void run(final JobManagerConfiguration configuration,
- final TopologyCapabilities topologyCapabilities,
- final QueueConfigurationManager queueManager) {
+ final TopologyCapabilities topologyCapabilities) {
if ( topologyCapabilities.isLeader() ) {
- this.processJobsFromPreviousVersions(configuration, topologyCapabilities, queueManager);
+ this.processJobsFromPreviousVersions(configuration, topologyCapabilities);
}
- this.upgradeBridgedJobs(configuration, topologyCapabilities, queueManager);
+ this.upgradeBridgedJobs(configuration, topologyCapabilities);
}
/**
@@ -69,19 +67,18 @@ public class UpgradeTask {
* This has changed, the jobs are now stored with their real topic.
*/
private void upgradeBridgedJobs(final JobManagerConfiguration configuration,
- final TopologyCapabilities caps,
- final QueueConfigurationManager queueManager) {
+ final TopologyCapabilities caps) {
final String path = configuration.getLocalJobsPath() + '/' + JobImpl.PROPERTY_BRIDGED_EVENT;
final ResourceResolver resolver = configuration.createResourceResolver();
try {
final Resource rootResource = resolver.getResource(path);
if ( rootResource != null ) {
- upgradeBridgedJobs(configuration, rootResource, caps, queueManager);
+ upgradeBridgedJobs(configuration, rootResource, caps);
}
if ( caps.isLeader() ) {
final Resource unassignedRoot = resolver.getResource(configuration.getUnassignedJobsPath() + '/' + JobImpl.PROPERTY_BRIDGED_EVENT);
if ( unassignedRoot != null ) {
- upgradeBridgedJobs(configuration, unassignedRoot, caps, queueManager);
+ upgradeBridgedJobs(configuration, unassignedRoot, caps);
}
}
} finally {
@@ -97,10 +94,9 @@ public class UpgradeTask {
*/
private void upgradeBridgedJobs(final JobManagerConfiguration configuration,
final Resource topicResource,
- final TopologyCapabilities caps,
- final QueueConfigurationManager queueManager) {
+ final TopologyCapabilities caps) {
final String topicName = topicResource.getName().replace('.', '/');
- final QueueInfo info = queueManager.getQueueInfo(topicName);
+ final QueueInfo info = configuration.getQueueConfigurationManager().getQueueInfo(topicName);
JobTopicTraverser.traverse(logger, topicResource, new JobTopicTraverser.ResourceCallback() {
@Override
@@ -144,12 +140,11 @@ public class UpgradeTask {
* Handle jobs from previous versions (<= 3.1.4) by moving them to the unassigned area
*/
private void processJobsFromPreviousVersions(final JobManagerConfiguration configuration,
- final TopologyCapabilities caps,
- final QueueConfigurationManager queueManager) {
+ final TopologyCapabilities caps) {
final ResourceResolver resolver = configuration.createResourceResolver();
try {
- this.processJobsFromPreviousVersions(configuration, caps, queueManager, resolver.getResource(configuration.getPreviousVersionAnonPath()));
- this.processJobsFromPreviousVersions(configuration, caps, queueManager, resolver.getResource(configuration.getPreviousVersionIdentifiedPath()));
+ this.processJobsFromPreviousVersions(configuration, caps, resolver.getResource(configuration.getPreviousVersionAnonPath()));
+ this.processJobsFromPreviousVersions(configuration, caps, resolver.getResource(configuration.getPreviousVersionIdentifiedPath()));
} catch ( final PersistenceException pe ) {
this.logger.warn("Problems moving jobs from previous version.", pe);
} finally {
@@ -162,14 +157,13 @@ public class UpgradeTask {
*/
private void processJobsFromPreviousVersions(final JobManagerConfiguration configuration,
final TopologyCapabilities caps,
- final QueueConfigurationManager queueManager,
final Resource rsrc) throws PersistenceException {
if ( rsrc != null && caps.isActive() ) {
if ( rsrc.isResourceType(ResourceHelper.RESOURCE_TYPE_JOB) ) {
- this.moveJobFromPreviousVersion(configuration, caps, queueManager, rsrc);
+ this.moveJobFromPreviousVersion(configuration, caps, rsrc);
} else {
for(final Resource child : rsrc.getChildren()) {
- this.processJobsFromPreviousVersions(configuration, caps, queueManager, child);
+ this.processJobsFromPreviousVersions(configuration, caps, child);
}
if ( caps.isActive() ) {
rsrc.getResourceResolver().delete(rsrc);
@@ -185,7 +179,6 @@ public class UpgradeTask {
*/
private void moveJobFromPreviousVersion(final JobManagerConfiguration configuration,
final TopologyCapabilities caps,
- final QueueConfigurationManager queueManager,
final Resource jobResource)
throws PersistenceException {
final ResourceResolver resolver = jobResource.getResourceResolver();
@@ -240,7 +233,7 @@ public class UpgradeTask {
final List<InstanceDescription> potentialTargets = caps.getPotentialTargets("/", null);
String targetId = null;
if ( potentialTargets != null && potentialTargets.size() > 0 ) {
- final QueueInfo info = queueManager.getQueueInfo(topic);
+ final QueueInfo info = configuration.getQueueConfigurationManager().getQueueInfo(topic);
logger.debug("Found queue {} for {}", info.queueConfiguration, topic);
targetId = caps.detectTarget(topic, vm, info);
if ( targetId != null ) {