You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2014/10/14 14:31:24 UTC
svn commit: r1631731 - in
/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event:
impl/jobs/ impl/jobs/tasks/ jobs/
Author: cziegeler
Date: Tue Oct 14 12:31:24 2014
New Revision: 1631731
URL: http://svn.apache.org/r1631731
Log:
SLING-4048 : Avoid keeping jobs in memory. Make configuration a service
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerConfiguration.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/TopologyCapabilities.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/HistoryCleanUpTask.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/Queue.java
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerConfiguration.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerConfiguration.java?rev=1631731&r1=1631730&r2=1631731&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerConfiguration.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerConfiguration.java Tue Oct 14 12:31:24 2014
@@ -22,6 +22,13 @@ import java.util.Calendar;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Modified;
+import org.apache.felix.scr.annotations.Properties;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.sling.api.resource.LoginException;
import org.apache.sling.commons.osgi.PropertiesUtil;
import org.apache.sling.event.impl.support.Environment;
@@ -29,6 +36,24 @@ import org.apache.sling.event.impl.suppo
* Configuration of the job handling
*
*/
+@Component(immediate=true, metatype=true,
+ label="Apache Sling Job Manager",
+ description="This is the central service of the job handling.",
+ name="org.apache.sling.event.impl.jobs.jcr.PersistenceHandler")
+@Service(value={JobManagerConfiguration.class})
+@Properties({
+ @Property(name=JobManagerConfiguration.PROPERTY_DISABLE_DISTRIBUTION,
+ boolValue=JobManagerConfiguration.DEFAULT_DISABLE_DISTRIBUTION,
+ label="Disable Distribution",
+ description="If the distribution is disabled, all jobs will be processed on the leader only! "
+ + "Please use this switch with care."),
+ @Property(name=JobManagerConfiguration.PROPERTY_REPOSITORY_PATH,
+ value=JobManagerConfiguration.DEFAULT_REPOSITORY_PATH, propertyPrivate=true),
+ @Property(name=JobManagerConfiguration.PROPERTY_SCHEDULED_JOBS_PATH,
+ value=JobManagerConfiguration.DEFAULT_SCHEDULED_JOBS_PATH, propertyPrivate=true),
+ @Property(name=JobManagerConfiguration.PROPERTY_BACKGROUND_LOAD_DELAY,
+ longValue=JobManagerConfiguration.DEFAULT_BACKGROUND_LOAD_DELAY, propertyPrivate=true),
+})
public class JobManagerConfiguration {
/** Default resource path for jobs. */
@@ -97,10 +122,24 @@ public class JobManagerConfiguration {
/** The resource path where scheduled jobs are stored - ending with a slash. */
private String scheduledJobsPathWithSlash;
- public JobManagerConfiguration(final Map<String, Object> props) {
+ /**
+ * Update with a new configuration
+ */
+ @Modified
+ public void update(final Map<String, Object> props) {
+ this.disabledDistribution = PropertiesUtil.toBoolean(props.get(PROPERTY_DISABLE_DISTRIBUTION), DEFAULT_DISABLE_DISTRIBUTION);
+ this.backgroundLoadDelay = PropertiesUtil.toLong(props.get(PROPERTY_BACKGROUND_LOAD_DELAY), DEFAULT_BACKGROUND_LOAD_DELAY);
+ }
+
+ /**
+ * Activate this component.
+ * @param props Configuration properties
+ */
+ @Activate
+ protected void activate(final Map<String, Object> props) throws LoginException {
this.update(props);
this.jobsBasePathWithSlash = PropertiesUtil.toString(props.get(PROPERTY_REPOSITORY_PATH),
- DEFAULT_REPOSITORY_PATH) + '/';
+ DEFAULT_REPOSITORY_PATH) + '/';
// create initial resources
this.assignedJobsPath = this.jobsBasePathWithSlash + "assigned";
@@ -118,19 +157,11 @@ public class JobManagerConfiguration {
this.storedSuccessfulJobsPath = this.jobsBasePathWithSlash + "finished";
this.scheduledJobsPath = PropertiesUtil.toString(props.get(PROPERTY_SCHEDULED_JOBS_PATH),
- DEFAULT_SCHEDULED_JOBS_PATH);
+ DEFAULT_SCHEDULED_JOBS_PATH);
this.scheduledJobsPathWithSlash = this.scheduledJobsPath + "/";
}
/**
- * Update with a new configuration
- */
- public void update(final Map<String, Object> props) {
- this.disabledDistribution = PropertiesUtil.toBoolean(props.get(PROPERTY_DISABLE_DISTRIBUTION), DEFAULT_DISABLE_DISTRIBUTION);
- this.backgroundLoadDelay = PropertiesUtil.toLong(props.get(PROPERTY_BACKGROUND_LOAD_DELAY), DEFAULT_BACKGROUND_LOAD_DELAY);
- }
-
- /**
* Get the resource path for all assigned jobs.
* @return The path - does not end with a slash.
*/
@@ -222,7 +253,7 @@ public class JobManagerConfiguration {
}
public boolean isLocalJob(final String jobPath) {
- return jobPath.startsWith(this.localJobsPathWithSlash);
+ return jobPath != null && jobPath.startsWith(this.localJobsPathWithSlash);
}
public boolean isJob(final String jobPath) {
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java?rev=1631731&r1=1631730&r2=1631731&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java Tue Oct 14 12:31:24 2014
@@ -34,7 +34,6 @@ import java.util.concurrent.ConcurrentMa
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Modified;
import org.apache.felix.scr.annotations.Properties;
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
@@ -94,23 +93,9 @@ import org.slf4j.LoggerFactory;
/**
* Implementation of the job manager.
*/
-@Component(immediate=true, metatype=true,
- label="Apache Sling Job Manager",
- description="This is the central service of the job handling.",
- name="org.apache.sling.event.impl.jobs.jcr.PersistenceHandler")
+@Component(immediate=true)
@Service(value={JobManager.class, EventHandler.class, TopologyEventListener.class, Runnable.class})
@Properties({
- @Property(name=JobManagerConfiguration.PROPERTY_DISABLE_DISTRIBUTION,
- boolValue=JobManagerConfiguration.DEFAULT_DISABLE_DISTRIBUTION,
- label="Disable Distribution",
- description="If the distribution is disabled, all jobs will be processed on the leader only! Please use this switch " +
- "with care."),
- @Property(name=JobManagerConfiguration.PROPERTY_REPOSITORY_PATH,
- value=JobManagerConfiguration.DEFAULT_REPOSITORY_PATH, propertyPrivate=true),
- @Property(name=JobManagerConfiguration.PROPERTY_SCHEDULED_JOBS_PATH,
- value=JobManagerConfiguration.DEFAULT_SCHEDULED_JOBS_PATH, propertyPrivate=true),
- @Property(name=JobManagerConfiguration.PROPERTY_BACKGROUND_LOAD_DELAY,
- longValue=JobManagerConfiguration.DEFAULT_BACKGROUND_LOAD_DELAY, propertyPrivate=true),
@Property(name="scheduler.period", longValue=60, propertyPrivate=true),
@Property(name="scheduler.concurrent", boolValue=false, propertyPrivate=true),
@Property(name=EventConstants.EVENT_TOPIC,
@@ -156,8 +141,10 @@ public class JobManagerImpl
private ThreadPoolManager threadPoolManager;
/** The job manager configuration. */
+ @Reference
private JobManagerConfiguration configuration;
+
private volatile TopologyCapabilities topologyCapabilities;
private MaintenanceTask maintenanceTask;
@@ -191,7 +178,6 @@ public class JobManagerImpl
*/
@Activate
protected void activate(final Map<String, Object> props) throws LoginException {
- this.configuration = new JobManagerConfiguration(props);
this.jobScheduler = new JobSchedulerImpl(this.configuration, this.resourceResolverFactory, this.scheduler, this);
this.maintenanceTask = new MaintenanceTask(this.configuration, this.resourceResolverFactory);
this.backgroundLoader = new BackgroundLoader(this, this.configuration, this.resourceResolverFactory);
@@ -212,19 +198,6 @@ public class JobManagerImpl
}
/**
- * Configure this component.
- * @param props Configuration properties
- */
- @Modified
- protected void update(final Map<String, Object> props) {
- this.configuration.update(props);
- final TopologyCapabilities caps = this.topologyCapabilities;
- if ( caps != null ) {
- caps.update(this.configuration.disableDistribution());
- }
- }
-
- /**
* Deactivate this component.
*/
@Deactivate
@@ -236,7 +209,6 @@ public class JobManagerImpl
this.backgroundLoader = null;
this.maintenanceTask = null;
- this.configuration = null;
final Iterator<AbstractJobQueue> i = this.queues.values().iterator();
while ( i.hasNext() ) {
final AbstractJobQueue jbq = i.next();
@@ -603,7 +575,7 @@ public class JobManagerImpl
private void startProcessing(final TopologyView view) {
// create new capabilities and update view
- this.topologyCapabilities = new TopologyCapabilities(view, this.configuration.disableDistribution());
+ this.topologyCapabilities = new TopologyCapabilities(view, this.configuration);
this.backgroundLoader.start();
}
@@ -1654,8 +1626,4 @@ public class JobManagerImpl
}
return null;
}
-
- public JobManagerConfiguration getConfiguration() {
- return this.configuration;
- }
}
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/TopologyCapabilities.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/TopologyCapabilities.java?rev=1631731&r1=1631730&r2=1631731&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/TopologyCapabilities.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/TopologyCapabilities.java Tue Oct 14 12:31:24 2014
@@ -68,8 +68,8 @@ public class TopologyCapabilities {
/** Instance comparator. */
private final InstanceDescriptionComparator instanceComparator;
- /** Disable distribution flag. */
- private boolean disableDistribution;
+ /** JobManagerConfiguration. */
+ private final JobManagerConfiguration jobManagerConfiguration;
public static final class InstanceDescriptionComparator implements Comparator<InstanceDescription> {
@@ -118,8 +118,8 @@ public class TopologyCapabilities {
return allInstances;
}
- public TopologyCapabilities(final TopologyView view, final boolean disableDistribution) {
- this.disableDistribution = disableDistribution;
+ public TopologyCapabilities(final TopologyView view, final JobManagerConfiguration config) {
+ this.jobManagerConfiguration = config;
this.instanceComparator = new InstanceDescriptionComparator(view.getLocalInstance().getClusterView().getId());
this.isLeader = view.getLocalInstance().isLeader();
this.allInstances = getAllInstancesMap(view);
@@ -143,13 +143,6 @@ public class TopologyCapabilities {
this.instanceCapabilities = newCaps;
}
- /**
- * Update the configuration
- */
- public void update(final boolean disableDistribution2) {
- this.disableDistribution = disableDistribution2;
- }
-
public boolean isSame(final Map<String, String> newAllInstancesMap) {
return this.allInstances.equals(newAllInstancesMap);
}
@@ -229,7 +222,7 @@ public class TopologyCapabilities {
final List<InstanceDescription> localTargets = new ArrayList<InstanceDescription>();
for(final InstanceDescription desc : potentialTargets) {
if ( desc.getClusterView().getId().equals(createdOnInstance.getClusterView().getId()) ) {
- if ( !this.disableDistribution || desc.isLeader() ) {
+ if ( !this.jobManagerConfiguration.disableDistribution() || desc.isLeader() ) {
localTargets.add(desc);
}
}
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/HistoryCleanUpTask.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/HistoryCleanUpTask.java?rev=1631731&r1=1631730&r2=1631731&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/HistoryCleanUpTask.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/HistoryCleanUpTask.java Tue Oct 14 12:31:24 2014
@@ -36,9 +36,8 @@ import org.apache.sling.api.resource.Res
import org.apache.sling.api.resource.ResourceUtil;
import org.apache.sling.api.resource.ValueMap;
import org.apache.sling.event.impl.jobs.JobImpl;
-import org.apache.sling.event.impl.jobs.JobManagerImpl;
+import org.apache.sling.event.impl.jobs.JobManagerConfiguration;
import org.apache.sling.event.jobs.Job;
-import org.apache.sling.event.jobs.JobManager;
import org.apache.sling.event.jobs.consumer.JobExecutionContext;
import org.apache.sling.event.jobs.consumer.JobExecutionResult;
import org.apache.sling.event.jobs.consumer.JobExecutor;
@@ -74,7 +73,7 @@ public class HistoryCleanUpTask implemen
private ResourceResolverFactory resourceResolverFactory;
@Reference
- private JobManager jobManager;
+ private JobManagerConfiguration configuration;
@Override
public JobExecutionResult process(final Job job, final JobExecutionContext context) {
@@ -106,13 +105,13 @@ public class HistoryCleanUpTask implemen
resolver = this.resourceResolverFactory.getAdministrativeResourceResolver(null);
if ( stateList == null || stateList.contains(Job.JobState.SUCCEEDED.name()) ) {
- this.cleanup(removeDate, resolver, context, ((JobManagerImpl)jobManager).getConfiguration().getStoredSuccessfulJobsPath(), topics, null);
+ this.cleanup(removeDate, resolver, context, configuration.getStoredSuccessfulJobsPath(), topics, null);
}
if ( stateList == null || stateList.contains(Job.JobState.DROPPED.name())
|| stateList.contains(Job.JobState.ERROR.name())
|| stateList.contains(Job.JobState.GIVEN_UP.name())
|| stateList.contains(Job.JobState.STOPPED.name())) {
- this.cleanup(removeDate, resolver, context, ((JobManagerImpl)jobManager).getConfiguration().getStoredCancelledJobsPath(), topics, stateList);
+ this.cleanup(removeDate, resolver, context, configuration.getStoredCancelledJobsPath(), topics, stateList);
}
} catch (final PersistenceException pe) {
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/Queue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/Queue.java?rev=1631731&r1=1631730&r2=1631731&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/Queue.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/Queue.java Tue Oct 14 12:31:24 2014
@@ -73,7 +73,9 @@ public interface Queue {
* Remove all outstanding jobs from the queue. This does not delete
* the jobs. The jobs are either processed by a different cluster node
* or on restart.
+ * @deprecated This method does nothing anymore
*/
+ @Deprecated
void clear();
/**