You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2014/10/18 18:07:22 UTC

svn commit: r1632815 - in /sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs: JobManagerImpl.java config/JobManagerConfiguration.java stats/StatisticsManager.java tasks/CheckTopologyTask.java tasks/UpgradeTask.java

Author: cziegeler
Date: Sat Oct 18 16:07:21 2014
New Revision: 1632815

URL: http://svn.apache.org/r1632815
Log:
SLING-4048 : Avoid keeping jobs in memory. Another refactoring: move all configuration code into the config package.

Modified:
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsManager.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/CheckTopologyTask.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/UpgradeTask.java

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java?rev=1632815&r1=1632814&r2=1632815&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java Sat Oct 18 16:07:21 2014
@@ -46,7 +46,6 @@ import org.apache.sling.commons.threads.
 import org.apache.sling.event.EventUtil;
 import org.apache.sling.event.impl.jobs.config.ConfigurationChangeListener;
 import org.apache.sling.event.impl.jobs.config.JobManagerConfiguration;
-import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager;
 import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueInfo;
 import org.apache.sling.event.impl.jobs.config.TopologyCapabilities;
 import org.apache.sling.event.impl.jobs.notifications.NotificationUtility;
@@ -118,9 +117,6 @@ public class JobManagerImpl
     private JobManagerConfiguration configuration;
 
     @Reference
-    private QueueConfigurationManager queueManager;
-
-    @Reference
     private StatisticsManager statisticsManager;
 
     @Reference
@@ -797,7 +793,7 @@ public class JobManagerImpl
             final String jobName,
             final Map<String, Object> jobProperties,
             final List<String> errors) {
-        final QueueInfo info = this.queueManager.getQueueInfo(jobTopic);
+        final QueueInfo info = this.configuration.getQueueConfigurationManager().getQueueInfo(jobTopic);
         // check for unique jobs
         if ( jobName != null && !this.lock(jobTopic, jobName) ) {
             logger.debug("Discarding duplicate job {}", Utility.toString(jobTopic, jobName, jobProperties));
@@ -912,7 +908,7 @@ public class JobManagerImpl
         final JobImpl job = (JobImpl)this.getJobById(jobId);
         if ( job != null && !this.configuration.isStoragePath(job.getResourcePath()) ) {
             // get the queue configuration
-            final QueueInfo queueInfo = this.queueManager.getQueueInfo(job.getTopic());
+            final QueueInfo queueInfo = this.configuration.getQueueConfigurationManager().getQueueInfo(job.getTopic());
             final AbstractJobQueue queue = (AbstractJobQueue)this.qManager.getQueue(queueInfo.queueName);
 
             boolean stopped = false;

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java?rev=1632815&r1=1632814&r2=1632815&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java Sat Oct 18 16:07:21 2014
@@ -455,13 +455,13 @@ public class JobManagerConfiguration imp
         // before we propagate the new topology we do some maintenance
         if ( eventType == Type.TOPOLOGY_INIT ) {
             final UpgradeTask task = new UpgradeTask();
-            task.run(this, this.topologyCapabilities, queueConfigManager);
+            task.run(this, this.topologyCapabilities);
 
             final FindUnfinishedJobsTask rt = new FindUnfinishedJobsTask();
             rt.run(this);
         }
 
-        final CheckTopologyTask mt = new CheckTopologyTask(this, this.queueConfigManager);
+        final CheckTopologyTask mt = new CheckTopologyTask(this);
         mt.run(topologyCapabilities, !isConfigChange, isConfigChange);
 
         // start listeners

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsManager.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsManager.java?rev=1632815&r1=1632814&r2=1632815&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsManager.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsManager.java Sat Oct 18 16:07:21 2014
@@ -28,7 +28,6 @@ import org.apache.felix.scr.annotations.
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.Service;
 import org.apache.sling.event.impl.jobs.config.JobManagerConfiguration;
-import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager;
 import org.apache.sling.event.impl.jobs.notifications.NotificationUtility;
 import org.apache.sling.event.jobs.Job;
 import org.apache.sling.event.jobs.NotificationConstants;
@@ -60,10 +59,6 @@ public class StatisticsManager implement
     @Reference
     private JobManagerConfiguration configuration;
 
-    /** The queue configuration manager. */
-    @Reference
-    private QueueConfigurationManager queueConfigurationManager;
-
     /** Global statistics. */
     private final StatisticsImpl globalStatistics = new StatisticsImpl() {
 

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/CheckTopologyTask.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/CheckTopologyTask.java?rev=1632815&r1=1632814&r2=1632815&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/CheckTopologyTask.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/CheckTopologyTask.java Sat Oct 18 16:07:21 2014
@@ -32,9 +32,8 @@ import org.apache.sling.discovery.Instan
 import org.apache.sling.event.impl.jobs.JobImpl;
 import org.apache.sling.event.impl.jobs.JobTopicTraverser;
 import org.apache.sling.event.impl.jobs.config.JobManagerConfiguration;
-import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager;
-import org.apache.sling.event.impl.jobs.config.TopologyCapabilities;
 import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueInfo;
+import org.apache.sling.event.impl.jobs.config.TopologyCapabilities;
 import org.apache.sling.event.impl.support.ResourceHelper;
 import org.apache.sling.event.jobs.Job;
 import org.slf4j.Logger;
@@ -56,16 +55,11 @@ public class CheckTopologyTask {
     /** Job manager configuration. */
     private final JobManagerConfiguration configuration;
 
-    /** Queue configuration manager. */
-    private final QueueConfigurationManager queueConfigManager;
-
     /**
      * Constructor
      */
-    public CheckTopologyTask(final JobManagerConfiguration config,
-            final QueueConfigurationManager queueConfigurationManager) {
+    public CheckTopologyTask(final JobManagerConfiguration config) {
         this.configuration = config;
-        this.queueConfigManager = queueConfigurationManager;
     }
 
     /**
@@ -138,7 +132,7 @@ public class CheckTopologyTask {
                             }
                         }
                         if ( reassign ) {
-                            final QueueInfo info = this.queueConfigManager.getQueueInfo(topicName);
+                            final QueueInfo info = this.configuration.getQueueConfigurationManager().getQueueInfo(topicName);
                             JobTopicTraverser.traverse(this.logger, topicResource, new JobTopicTraverser.ResourceCallback() {
 
                                 @Override
@@ -239,7 +233,7 @@ public class CheckTopologyTask {
             // first check if there is an instance for these topics
             final List<InstanceDescription> potentialTargets = caps.getPotentialTargets(topicName, BRIDGED_JOB);
             if ( potentialTargets != null && potentialTargets.size() > 0 ) {
-                final QueueInfo info = this.queueConfigManager.getQueueInfo(topicName);
+                final QueueInfo info = this.configuration.getQueueConfigurationManager().getQueueInfo(topicName);
                 logger.debug("Found queue {} for {}", info.queueConfiguration, topicName);
 
                 JobTopicTraverser.traverse(this.logger, topicResource, new JobTopicTraverser.ResourceCallback() {

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/UpgradeTask.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/UpgradeTask.java?rev=1632815&r1=1632814&r2=1632815&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/UpgradeTask.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/UpgradeTask.java Sat Oct 18 16:07:21 2014
@@ -32,9 +32,8 @@ import org.apache.sling.discovery.Instan
 import org.apache.sling.event.impl.jobs.JobImpl;
 import org.apache.sling.event.impl.jobs.JobTopicTraverser;
 import org.apache.sling.event.impl.jobs.config.JobManagerConfiguration;
-import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager;
-import org.apache.sling.event.impl.jobs.config.TopologyCapabilities;
 import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueInfo;
+import org.apache.sling.event.impl.jobs.config.TopologyCapabilities;
 import org.apache.sling.event.impl.support.Environment;
 import org.apache.sling.event.impl.support.ResourceHelper;
 import org.apache.sling.event.jobs.Job;
@@ -55,12 +54,11 @@ public class UpgradeTask {
      * Upgrade
      */
     public void run(final JobManagerConfiguration configuration,
-            final TopologyCapabilities topologyCapabilities,
-            final QueueConfigurationManager queueManager) {
+            final TopologyCapabilities topologyCapabilities) {
         if ( topologyCapabilities.isLeader() ) {
-            this.processJobsFromPreviousVersions(configuration, topologyCapabilities, queueManager);
+            this.processJobsFromPreviousVersions(configuration, topologyCapabilities);
         }
-        this.upgradeBridgedJobs(configuration, topologyCapabilities, queueManager);
+        this.upgradeBridgedJobs(configuration, topologyCapabilities);
     }
 
     /**
@@ -69,19 +67,18 @@ public class UpgradeTask {
      * This has changed, the jobs are now stored with their real topic.
      */
     private void upgradeBridgedJobs(final JobManagerConfiguration configuration,
-            final TopologyCapabilities caps,
-            final QueueConfigurationManager queueManager) {
+            final TopologyCapabilities caps) {
         final String path = configuration.getLocalJobsPath() + '/' + JobImpl.PROPERTY_BRIDGED_EVENT;
         final ResourceResolver resolver = configuration.createResourceResolver();
         try {
             final Resource rootResource = resolver.getResource(path);
             if ( rootResource != null ) {
-                upgradeBridgedJobs(configuration, rootResource, caps, queueManager);
+                upgradeBridgedJobs(configuration, rootResource, caps);
             }
             if ( caps.isLeader() ) {
                 final Resource unassignedRoot = resolver.getResource(configuration.getUnassignedJobsPath() + '/' + JobImpl.PROPERTY_BRIDGED_EVENT);
                 if ( unassignedRoot != null ) {
-                    upgradeBridgedJobs(configuration, unassignedRoot, caps, queueManager);
+                    upgradeBridgedJobs(configuration, unassignedRoot, caps);
                 }
             }
         } finally {
@@ -97,10 +94,9 @@ public class UpgradeTask {
      */
     private void upgradeBridgedJobs(final JobManagerConfiguration configuration,
             final Resource topicResource,
-            final TopologyCapabilities caps,
-            final QueueConfigurationManager queueManager) {
+            final TopologyCapabilities caps) {
         final String topicName = topicResource.getName().replace('.', '/');
-        final QueueInfo info = queueManager.getQueueInfo(topicName);
+        final QueueInfo info = configuration.getQueueConfigurationManager().getQueueInfo(topicName);
         JobTopicTraverser.traverse(logger, topicResource, new JobTopicTraverser.ResourceCallback() {
 
             @Override
@@ -144,12 +140,11 @@ public class UpgradeTask {
      * Handle jobs from previous versions (<= 3.1.4) by moving them to the unassigned area
      */
     private void processJobsFromPreviousVersions(final JobManagerConfiguration configuration,
-            final TopologyCapabilities caps,
-            final QueueConfigurationManager queueManager) {
+            final TopologyCapabilities caps) {
         final ResourceResolver resolver = configuration.createResourceResolver();
         try {
-            this.processJobsFromPreviousVersions(configuration, caps, queueManager, resolver.getResource(configuration.getPreviousVersionAnonPath()));
-            this.processJobsFromPreviousVersions(configuration, caps, queueManager, resolver.getResource(configuration.getPreviousVersionIdentifiedPath()));
+            this.processJobsFromPreviousVersions(configuration, caps, resolver.getResource(configuration.getPreviousVersionAnonPath()));
+            this.processJobsFromPreviousVersions(configuration, caps, resolver.getResource(configuration.getPreviousVersionIdentifiedPath()));
         } catch ( final PersistenceException pe ) {
             this.logger.warn("Problems moving jobs from previous version.", pe);
         } finally {
@@ -162,14 +157,13 @@ public class UpgradeTask {
      */
     private void processJobsFromPreviousVersions(final JobManagerConfiguration configuration,
             final TopologyCapabilities caps,
-            final QueueConfigurationManager queueManager,
             final Resource rsrc) throws PersistenceException {
         if ( rsrc != null && caps.isActive() ) {
             if ( rsrc.isResourceType(ResourceHelper.RESOURCE_TYPE_JOB) ) {
-                this.moveJobFromPreviousVersion(configuration, caps, queueManager, rsrc);
+                this.moveJobFromPreviousVersion(configuration, caps, rsrc);
             } else {
                 for(final Resource child : rsrc.getChildren()) {
-                    this.processJobsFromPreviousVersions(configuration, caps, queueManager, child);
+                    this.processJobsFromPreviousVersions(configuration, caps, child);
                 }
                 if ( caps.isActive() ) {
                     rsrc.getResourceResolver().delete(rsrc);
@@ -185,7 +179,6 @@ public class UpgradeTask {
      */
     private void moveJobFromPreviousVersion(final JobManagerConfiguration configuration,
             final TopologyCapabilities caps,
-            final QueueConfigurationManager queueManager,
             final Resource jobResource)
     throws PersistenceException {
         final ResourceResolver resolver = jobResource.getResourceResolver();
@@ -240,7 +233,7 @@ public class UpgradeTask {
             final List<InstanceDescription> potentialTargets = caps.getPotentialTargets("/", null);
             String targetId = null;
             if ( potentialTargets != null && potentialTargets.size() > 0 ) {
-                final QueueInfo info = queueManager.getQueueInfo(topic);
+                final QueueInfo info = configuration.getQueueConfigurationManager().getQueueInfo(topic);
                 logger.debug("Found queue {} for {}", info.queueConfiguration, topic);
                 targetId = caps.detectTarget(topic, vm, info);
                 if ( targetId != null ) {