You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2014/10/14 14:31:24 UTC

svn commit: r1631731 - in /sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event: impl/jobs/ impl/jobs/tasks/ jobs/

Author: cziegeler
Date: Tue Oct 14 12:31:24 2014
New Revision: 1631731

URL: http://svn.apache.org/r1631731
Log:
SLING-4048 : Avoid keeping jobs in memory. Make configuration a service

Modified:
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerConfiguration.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/TopologyCapabilities.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/HistoryCleanUpTask.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/Queue.java

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerConfiguration.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerConfiguration.java?rev=1631731&r1=1631730&r2=1631731&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerConfiguration.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerConfiguration.java Tue Oct 14 12:31:24 2014
@@ -22,6 +22,13 @@ import java.util.Calendar;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Modified;
+import org.apache.felix.scr.annotations.Properties;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.sling.api.resource.LoginException;
 import org.apache.sling.commons.osgi.PropertiesUtil;
 import org.apache.sling.event.impl.support.Environment;
 
@@ -29,6 +36,24 @@ import org.apache.sling.event.impl.suppo
  * Configuration of the job handling
  *
  */
+@Component(immediate=true, metatype=true,
+           label="Apache Sling Job Manager",
+           description="This is the central service of the job handling.",
+           name="org.apache.sling.event.impl.jobs.jcr.PersistenceHandler")
+@Service(value={JobManagerConfiguration.class})
+@Properties({
+    @Property(name=JobManagerConfiguration.PROPERTY_DISABLE_DISTRIBUTION,
+              boolValue=JobManagerConfiguration.DEFAULT_DISABLE_DISTRIBUTION,
+              label="Disable Distribution",
+              description="If the distribution is disabled, all jobs will be processed on the leader only! "
+                        + "Please use this switch with care."),
+    @Property(name=JobManagerConfiguration.PROPERTY_REPOSITORY_PATH,
+              value=JobManagerConfiguration.DEFAULT_REPOSITORY_PATH, propertyPrivate=true),
+    @Property(name=JobManagerConfiguration.PROPERTY_SCHEDULED_JOBS_PATH,
+              value=JobManagerConfiguration.DEFAULT_SCHEDULED_JOBS_PATH, propertyPrivate=true),
+    @Property(name=JobManagerConfiguration.PROPERTY_BACKGROUND_LOAD_DELAY,
+              longValue=JobManagerConfiguration.DEFAULT_BACKGROUND_LOAD_DELAY, propertyPrivate=true),
+})
 public class JobManagerConfiguration {
 
     /** Default resource path for jobs. */
@@ -97,10 +122,24 @@ public class JobManagerConfiguration {
     /** The resource path where scheduled jobs are stored - ending with a slash. */
     private String scheduledJobsPathWithSlash;
 
-    public JobManagerConfiguration(final Map<String, Object> props) {
+    /**
+     * Update with a new configuration
+     */
+    @Modified
+    public void update(final Map<String, Object> props) {
+        this.disabledDistribution = PropertiesUtil.toBoolean(props.get(PROPERTY_DISABLE_DISTRIBUTION), DEFAULT_DISABLE_DISTRIBUTION);
+        this.backgroundLoadDelay = PropertiesUtil.toLong(props.get(PROPERTY_BACKGROUND_LOAD_DELAY), DEFAULT_BACKGROUND_LOAD_DELAY);
+    }
+
+    /**
+     * Activate this component.
+     * @param props Configuration properties
+     */
+    @Activate
+    protected void activate(final Map<String, Object> props) throws LoginException {
         this.update(props);
         this.jobsBasePathWithSlash = PropertiesUtil.toString(props.get(PROPERTY_REPOSITORY_PATH),
-                            DEFAULT_REPOSITORY_PATH) + '/';
+                DEFAULT_REPOSITORY_PATH) + '/';
 
         // create initial resources
         this.assignedJobsPath = this.jobsBasePathWithSlash + "assigned";
@@ -118,19 +157,11 @@ public class JobManagerConfiguration {
         this.storedSuccessfulJobsPath = this.jobsBasePathWithSlash + "finished";
 
         this.scheduledJobsPath = PropertiesUtil.toString(props.get(PROPERTY_SCHEDULED_JOBS_PATH),
-                DEFAULT_SCHEDULED_JOBS_PATH);
+            DEFAULT_SCHEDULED_JOBS_PATH);
         this.scheduledJobsPathWithSlash = this.scheduledJobsPath + "/";
     }
 
     /**
-     * Update with a new configuration
-     */
-    public void update(final Map<String, Object> props) {
-        this.disabledDistribution = PropertiesUtil.toBoolean(props.get(PROPERTY_DISABLE_DISTRIBUTION), DEFAULT_DISABLE_DISTRIBUTION);
-        this.backgroundLoadDelay = PropertiesUtil.toLong(props.get(PROPERTY_BACKGROUND_LOAD_DELAY), DEFAULT_BACKGROUND_LOAD_DELAY);
-    }
-
-    /**
      * Get the resource path for all assigned jobs.
      * @return The path - does not end with a slash.
      */
@@ -222,7 +253,7 @@ public class JobManagerConfiguration {
     }
 
     public boolean isLocalJob(final String jobPath) {
-        return jobPath.startsWith(this.localJobsPathWithSlash);
+        return jobPath != null && jobPath.startsWith(this.localJobsPathWithSlash);
     }
 
     public boolean isJob(final String jobPath) {

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java?rev=1631731&r1=1631730&r2=1631731&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java Tue Oct 14 12:31:24 2014
@@ -34,7 +34,6 @@ import java.util.concurrent.ConcurrentMa
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Modified;
 import org.apache.felix.scr.annotations.Properties;
 import org.apache.felix.scr.annotations.Property;
 import org.apache.felix.scr.annotations.Reference;
@@ -94,23 +93,9 @@ import org.slf4j.LoggerFactory;
 /**
  * Implementation of the job manager.
  */
-@Component(immediate=true, metatype=true,
-           label="Apache Sling Job Manager",
-           description="This is the central service of the job handling.",
-           name="org.apache.sling.event.impl.jobs.jcr.PersistenceHandler")
+@Component(immediate=true)
 @Service(value={JobManager.class, EventHandler.class, TopologyEventListener.class, Runnable.class})
 @Properties({
-    @Property(name=JobManagerConfiguration.PROPERTY_DISABLE_DISTRIBUTION,
-            boolValue=JobManagerConfiguration.DEFAULT_DISABLE_DISTRIBUTION,
-            label="Disable Distribution",
-            description="If the distribution is disabled, all jobs will be processed on the leader only! Please use this switch " +
-                        "with care."),
-    @Property(name=JobManagerConfiguration.PROPERTY_REPOSITORY_PATH,
-             value=JobManagerConfiguration.DEFAULT_REPOSITORY_PATH, propertyPrivate=true),
-    @Property(name=JobManagerConfiguration.PROPERTY_SCHEDULED_JOBS_PATH,
-             value=JobManagerConfiguration.DEFAULT_SCHEDULED_JOBS_PATH, propertyPrivate=true),
-    @Property(name=JobManagerConfiguration.PROPERTY_BACKGROUND_LOAD_DELAY,
-             longValue=JobManagerConfiguration.DEFAULT_BACKGROUND_LOAD_DELAY, propertyPrivate=true),
     @Property(name="scheduler.period", longValue=60, propertyPrivate=true),
     @Property(name="scheduler.concurrent", boolValue=false, propertyPrivate=true),
     @Property(name=EventConstants.EVENT_TOPIC,
@@ -156,8 +141,10 @@ public class JobManagerImpl
     private ThreadPoolManager threadPoolManager;
 
     /** The job manager configuration. */
+    @Reference
     private JobManagerConfiguration configuration;
 
+
     private volatile TopologyCapabilities topologyCapabilities;
 
     private MaintenanceTask maintenanceTask;
@@ -191,7 +178,6 @@ public class JobManagerImpl
      */
     @Activate
     protected void activate(final Map<String, Object> props) throws LoginException {
-        this.configuration = new JobManagerConfiguration(props);
         this.jobScheduler = new JobSchedulerImpl(this.configuration, this.resourceResolverFactory, this.scheduler, this);
         this.maintenanceTask = new MaintenanceTask(this.configuration, this.resourceResolverFactory);
         this.backgroundLoader = new BackgroundLoader(this, this.configuration, this.resourceResolverFactory);
@@ -212,19 +198,6 @@ public class JobManagerImpl
     }
 
     /**
-     * Configure this component.
-     * @param props Configuration properties
-     */
-    @Modified
-    protected void update(final Map<String, Object> props) {
-        this.configuration.update(props);
-        final TopologyCapabilities caps = this.topologyCapabilities;
-        if ( caps != null ) {
-            caps.update(this.configuration.disableDistribution());
-        }
-    }
-
-    /**
      * Deactivate this component.
      */
     @Deactivate
@@ -236,7 +209,6 @@ public class JobManagerImpl
         this.backgroundLoader = null;
 
         this.maintenanceTask = null;
-        this.configuration = null;
         final Iterator<AbstractJobQueue> i = this.queues.values().iterator();
         while ( i.hasNext() ) {
             final AbstractJobQueue jbq = i.next();
@@ -603,7 +575,7 @@ public class JobManagerImpl
 
     private void startProcessing(final TopologyView view) {
         // create new capabilities and update view
-        this.topologyCapabilities = new TopologyCapabilities(view, this.configuration.disableDistribution());
+        this.topologyCapabilities = new TopologyCapabilities(view, this.configuration);
 
         this.backgroundLoader.start();
     }
@@ -1654,8 +1626,4 @@ public class JobManagerImpl
         }
         return null;
     }
-
-    public JobManagerConfiguration getConfiguration() {
-        return this.configuration;
-    }
 }

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/TopologyCapabilities.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/TopologyCapabilities.java?rev=1631731&r1=1631730&r2=1631731&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/TopologyCapabilities.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/TopologyCapabilities.java Tue Oct 14 12:31:24 2014
@@ -68,8 +68,8 @@ public class TopologyCapabilities {
     /** Instance comparator. */
     private final InstanceDescriptionComparator instanceComparator;
 
-    /** Disable distribution flag. */
-    private boolean disableDistribution;
+    /** JobManagerConfiguration. */
+    private final JobManagerConfiguration jobManagerConfiguration;
 
     public static final class InstanceDescriptionComparator implements Comparator<InstanceDescription> {
 
@@ -118,8 +118,8 @@ public class TopologyCapabilities {
         return allInstances;
     }
 
-    public TopologyCapabilities(final TopologyView view, final boolean disableDistribution) {
-        this.disableDistribution = disableDistribution;
+    public TopologyCapabilities(final TopologyView view, final JobManagerConfiguration config) {
+        this.jobManagerConfiguration = config;
         this.instanceComparator = new InstanceDescriptionComparator(view.getLocalInstance().getClusterView().getId());
         this.isLeader = view.getLocalInstance().isLeader();
         this.allInstances = getAllInstancesMap(view);
@@ -143,13 +143,6 @@ public class TopologyCapabilities {
         this.instanceCapabilities = newCaps;
     }
 
-    /**
-     * Update the configuration
-     */
-    public void update(final boolean disableDistribution2) {
-        this.disableDistribution = disableDistribution2;
-    }
-
     public boolean isSame(final Map<String, String> newAllInstancesMap) {
         return this.allInstances.equals(newAllInstancesMap);
     }
@@ -229,7 +222,7 @@ public class TopologyCapabilities {
                 final List<InstanceDescription> localTargets = new ArrayList<InstanceDescription>();
                 for(final InstanceDescription desc : potentialTargets) {
                     if ( desc.getClusterView().getId().equals(createdOnInstance.getClusterView().getId()) ) {
-                        if ( !this.disableDistribution || desc.isLeader() ) {
+                        if ( !this.jobManagerConfiguration.disableDistribution() || desc.isLeader() ) {
                             localTargets.add(desc);
                         }
                     }

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/HistoryCleanUpTask.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/HistoryCleanUpTask.java?rev=1631731&r1=1631730&r2=1631731&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/HistoryCleanUpTask.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/HistoryCleanUpTask.java Tue Oct 14 12:31:24 2014
@@ -36,9 +36,8 @@ import org.apache.sling.api.resource.Res
 import org.apache.sling.api.resource.ResourceUtil;
 import org.apache.sling.api.resource.ValueMap;
 import org.apache.sling.event.impl.jobs.JobImpl;
-import org.apache.sling.event.impl.jobs.JobManagerImpl;
+import org.apache.sling.event.impl.jobs.JobManagerConfiguration;
 import org.apache.sling.event.jobs.Job;
-import org.apache.sling.event.jobs.JobManager;
 import org.apache.sling.event.jobs.consumer.JobExecutionContext;
 import org.apache.sling.event.jobs.consumer.JobExecutionResult;
 import org.apache.sling.event.jobs.consumer.JobExecutor;
@@ -74,7 +73,7 @@ public class HistoryCleanUpTask implemen
     private ResourceResolverFactory resourceResolverFactory;
 
     @Reference
-    private JobManager jobManager;
+    private JobManagerConfiguration configuration;
 
     @Override
     public JobExecutionResult process(final Job job, final JobExecutionContext context) {
@@ -106,13 +105,13 @@ public class HistoryCleanUpTask implemen
             resolver = this.resourceResolverFactory.getAdministrativeResourceResolver(null);
 
             if ( stateList == null || stateList.contains(Job.JobState.SUCCEEDED.name()) ) {
-                this.cleanup(removeDate, resolver, context, ((JobManagerImpl)jobManager).getConfiguration().getStoredSuccessfulJobsPath(), topics, null);
+                this.cleanup(removeDate, resolver, context, configuration.getStoredSuccessfulJobsPath(), topics, null);
             }
             if ( stateList == null || stateList.contains(Job.JobState.DROPPED.name())
                  || stateList.contains(Job.JobState.ERROR.name())
                  || stateList.contains(Job.JobState.GIVEN_UP.name())
                  || stateList.contains(Job.JobState.STOPPED.name())) {
-                this.cleanup(removeDate, resolver, context, ((JobManagerImpl)jobManager).getConfiguration().getStoredCancelledJobsPath(), topics, stateList);
+                this.cleanup(removeDate, resolver, context, configuration.getStoredCancelledJobsPath(), topics, stateList);
             }
 
         } catch (final PersistenceException pe) {

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/Queue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/Queue.java?rev=1631731&r1=1631730&r2=1631731&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/Queue.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/Queue.java Tue Oct 14 12:31:24 2014
@@ -73,7 +73,9 @@ public interface Queue {
      * Remove all outstanding jobs from the queue. This does not delete
      * the jobs. The jobs are either processed by a different cluster node
      * or on restart.
+     * @deprecated This method does nothing anymore
      */
+    @Deprecated
     void clear();
 
     /**