You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2013/10/07 14:07:11 UTC
svn commit: r1529825 - in /sling/trunk/bundles/extensions/event/src:
main/java/org/apache/sling/event/ main/java/org/apache/sling/event/impl/jobs/
main/java/org/apache/sling/event/impl/support/
main/java/org/apache/sling/event/jobs/ test/java/org/apach...
Author: cziegeler
Date: Mon Oct 7 12:07:10 2013
New Revision: 1529825
URL: http://svn.apache.org/r1529825
Log:
SLING-3139 : Provide a way to schedule jobs
SLING-3138 : Add fluent api to create new jobs
Added:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobSchedulerImpl.java (with props)
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/ScheduledJobInfoImpl.java (with props)
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ScheduleInfo.java (with props)
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/TimedEventStatusProvider.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobBuilderImpl.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerConfiguration.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/Utility.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ResourceHelper.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobBuilder.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/ScheduledJobInfo.java
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java?rev=1529825&r1=1529824&r2=1529825&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java Mon Oct 7 12:07:10 2013
@@ -156,22 +156,46 @@ public abstract class EventUtil {
* Timed Events
*/
- /** The topic for timed events. */
+ /**
+ * The topic for timed events.
+ * @deprecated Use scheduled jobs instead
+ */
+ @Deprecated
public static final String TOPIC_TIMED_EVENT = "org/apache/sling/event/timed";
- /** The real topic of the event. */
+ /**
+ * The real topic of the event.
+ * @deprecated Use scheduled jobs instead
+ */
+ @Deprecated
public static final String PROPERTY_TIMED_EVENT_TOPIC = "event.topic.timed";
- /** The property for the unique event id. */
+ /**
+ * The property for the unique event id.
+ * @deprecated Use scheduled jobs instead
+ */
+ @Deprecated
public static final String PROPERTY_TIMED_EVENT_ID = "event.timed.id";
- /** The scheduler cron expression for the timed event. Type must be String. */
+ /**
+ * The scheduler cron expression for the timed event. Type must be String.
+ * @deprecated Use scheduled jobs instead
+ */
+ @Deprecated
public static final String PROPERTY_TIMED_EVENT_SCHEDULE = "event.timed.scheduler";
- /** The period in seconds for the timed event. Type must be Long*/
+ /**
+ * The period in seconds for the timed event. Type must be Long.
+ * @deprecated Use scheduled jobs instead
+ */
+ @Deprecated
public static final String PROPERTY_TIMED_EVENT_PERIOD = "event.timed.period";
- /** The date for the timed event. Type must be Date. */
+ /**
+ * The date for the timed event. Type must be Date.
+ * @deprecated Use scheduled jobs instead
+ */
+ @Deprecated
public static final String PROPERTY_TIMED_EVENT_DATE = "event.timed.date";
/**
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/TimedEventStatusProvider.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/TimedEventStatusProvider.java?rev=1529825&r1=1529824&r2=1529825&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/TimedEventStatusProvider.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/TimedEventStatusProvider.java Mon Oct 7 12:07:10 2013
@@ -25,7 +25,9 @@ import org.osgi.service.event.Event;
/**
* This service provides the current timed events status.
+ * @deprecated Use scheduled jobs instead
*/
+@Deprecated
public interface TimedEventStatusProvider {
/**
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobBuilderImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobBuilderImpl.java?rev=1529825&r1=1529824&r2=1529825&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobBuilderImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobBuilderImpl.java Mon Oct 7 12:07:10 2013
@@ -21,9 +21,11 @@ package org.apache.sling.event.impl.jobs
import java.util.Date;
import java.util.Map;
+import org.apache.sling.event.impl.support.ScheduleInfo;
import org.apache.sling.event.jobs.Job;
import org.apache.sling.event.jobs.JobBuilder;
-import org.apache.sling.event.jobs.JobManager;
+import org.apache.sling.event.jobs.ScheduledJobInfo;
+import org.slf4j.Logger;
/**
* Fluent builder API
@@ -32,15 +34,18 @@ public class JobBuilderImpl implements J
private final String topic;
- private final JobManager jobManager;
+ private final JobManagerImpl jobManager;
+
+ private final Logger logger;
private String name;
private Map<String, Object> properties;
- public JobBuilderImpl(final JobManager manager, final String topic) {
+ public JobBuilderImpl(final JobManagerImpl manager, final Logger logger, final String topic) {
this.jobManager = manager;
this.topic = topic;
+ this.logger = logger;
}
@Override
@@ -62,43 +67,100 @@ public class JobBuilderImpl implements J
@Override
public ScheduleBuilder schedule(final String name) {
- return null;
+ return new ScheduleBuilderImpl(name);
}
public final class ScheduleBuilderImpl implements ScheduleBuilder {
+ private final String scheduleName;
+
+ public ScheduleBuilderImpl(final String name) {
+ this.scheduleName = name;
+ }
+
+ private boolean check() {
+ if ( this.scheduleName == null || this.scheduleName.length() == 0 ) {
+ logger.warn("Discarding scheduled job - schedule name not specified");
+ return false;
+ }
+ final String errorMessage = Utility.checkJob(topic, properties);
+ if ( errorMessage != null ) {
+ logger.warn("{}", errorMessage);
+ return false;
+ }
+ return true;
+ }
+
@Override
- public boolean periodically(int minutes) {
- // TODO Auto-generated method stub
+ public boolean periodically(final int minutes) {
+ if ( check() ) {
+ if ( minutes > 0 ) {
+ final ScheduleInfo info = ScheduleInfo.PERIODIC(minutes);
+ return jobManager.addScheduledJob(topic, name, properties, scheduleName, info);
+ }
+ logger.warn("Discarding scheduled job - period must be higher than 0 : {}", minutes);
+ }
return false;
}
@Override
public TimeBuilder daily() {
- // TODO Auto-generated method stub
- return null;
+ return new TimeBuilderImpl(ScheduledJobInfo.ScheduleType.DAILY, -1);
}
@Override
- public TimeBuilder weekly(int day) {
- // TODO Auto-generated method stub
- return null;
+ public TimeBuilder weekly(final int day) {
+ return new TimeBuilderImpl(ScheduledJobInfo.ScheduleType.WEEKLY, day);
}
@Override
- public boolean at(Date date) {
- // TODO Auto-generated method stub
+ public boolean at(final Date date) {
+ if ( check() ) {
+ if ( date != null && date.getTime() > System.currentTimeMillis() ) {
+ final ScheduleInfo info = ScheduleInfo.AT(date);
+ return jobManager.addScheduledJob(topic, name, properties, scheduleName, info);
+ }
+ logger.warn("Discarding scheduled job - date must be in the future : {}", date);
+ }
return false;
}
public final class TimeBuilderImpl implements TimeBuilder {
+ private final ScheduledJobInfo.ScheduleType scheduleType;
+
+ private final int day;
+
+ public TimeBuilderImpl(ScheduledJobInfo.ScheduleType scheduleType, final int day) {
+ this.scheduleType = scheduleType;
+ this.day = day;
+ }
+
@Override
- public boolean at(int hour, int minute) {
- // TODO Auto-generated method stub
+ public boolean at(final int hour, final int minute) {
+ if ( check() ) {
+ boolean valid = true;
+ if ( scheduleType == ScheduledJobInfo.ScheduleType.WEEKLY ) {
+ if ( day < 1 || day > 7 ) {
+ valid = false;
+ logger.warn("Discarding scheduled job - day must be between 1 and 7 : {}", day);
+ }
+ }
+ if ( valid ) {
+ if ( hour >= 0 && hour < 24 && minute >= 0 && minute < 60 ) {
+ final ScheduleInfo info;
+ if ( scheduleType == ScheduledJobInfo.ScheduleType.WEEKLY ) {
+ info = ScheduleInfo.WEEKLY(this.day, hour, minute);
+ } else {
+ info = ScheduleInfo.DAYLY(hour, minute);
+ }
+ return jobManager.addScheduledJob(topic, name, properties, scheduleName, info);
+ }
+ logger.warn("Discarding scheduled job - wrong time information : {}…{}", hour, minute);
+ }
+ }
return false;
}
-
}
}
}
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerConfiguration.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerConfiguration.java?rev=1529825&r1=1529824&r2=1529825&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerConfiguration.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerConfiguration.java Mon Oct 7 12:07:10 2013
@@ -31,24 +31,30 @@ import org.apache.sling.event.impl.suppo
*/
public class JobManagerConfiguration {
- /** Default repository path. */
+ /** Default resource path for jobs. */
public static final String DEFAULT_REPOSITORY_PATH = "/var/eventing/jobs";
- /** The path where all jobs are stored. */
- public static final String CONFIG_PROPERTY_REPOSITORY_PATH = "repository.path";
-
/** Default background load delay. */
public static final long DEFAULT_BACKGROUND_LOAD_DELAY = 30;
- /** The background loader waits this time of seconds after startup before loading events from the repository. (in secs) */
- public static final String CONFIG_PROPERTY_BACKGROUND_LOAD_DELAY = "load.delay";
-
/** Default for disabling the distribution. */
public static final boolean DEFAULT_DISABLE_DISTRIBUTION = false;
+ /** Default resource path for scheduled jobs. */
+ private static final String DEFAULT_SCHEDULED_JOBS_PATH = "/var/eventing/scheduled-jobs";
+
+ /** The path where all jobs are stored. */
+ public static final String PROPERTY_REPOSITORY_PATH = "repository.path";
+
+ /** The background loader waits this time of seconds after startup before loading events from the repository. (in secs) */
+ public static final String PROPERTY_BACKGROUND_LOAD_DELAY = "load.delay";
+
/** Configuration switch for distributing the jobs. */
public static final String PROPERTY_DISABLE_DISTRIBUTION = "job.consumermanager.disableDistribution";
+ /** Configuration property for the scheduled jobs path. */
+ private static final String PROPERTY_SCHEDULED_JOBS_PATH = "job.scheduled.jobs.path";
+
/** The jobs base path with a slash. */
private String jobsBasePathWithSlash;
@@ -82,9 +88,15 @@ public class JobManagerConfiguration {
private String storedSuccessfulJobsPath;
+ /** The resource path where scheduled jobs are stored. */
+ private String scheduledJobsPath;
+
+ /** The resource path where scheduled jobs are stored - ending with a slash. */
+ private String scheduledJobsPathWithSlash;
+
public JobManagerConfiguration(final Map<String, Object> props) {
this.update(props);
- this.jobsBasePathWithSlash = PropertiesUtil.toString(props.get(CONFIG_PROPERTY_REPOSITORY_PATH),
+ this.jobsBasePathWithSlash = PropertiesUtil.toString(props.get(PROPERTY_REPOSITORY_PATH),
DEFAULT_REPOSITORY_PATH) + '/';
// create initial resources
@@ -101,6 +113,10 @@ public class JobManagerConfiguration {
this.storedCancelledJobsPath = this.jobsBasePathWithSlash + "cancelled";
this.storedSuccessfulJobsPath = this.jobsBasePathWithSlash + "finished";
+
+ this.scheduledJobsPath = PropertiesUtil.toString(props.get(PROPERTY_SCHEDULED_JOBS_PATH),
+ DEFAULT_SCHEDULED_JOBS_PATH);
+ this.scheduledJobsPathWithSlash = this.scheduledJobsPath + "/";
}
/**
@@ -108,7 +124,7 @@ public class JobManagerConfiguration {
*/
public void update(final Map<String, Object> props) {
this.disabledDistribution = PropertiesUtil.toBoolean(props.get(PROPERTY_DISABLE_DISTRIBUTION), DEFAULT_DISABLE_DISTRIBUTION);
- this.backgroundLoadDelay = PropertiesUtil.toLong(props.get(CONFIG_PROPERTY_BACKGROUND_LOAD_DELAY), DEFAULT_BACKGROUND_LOAD_DELAY);
+ this.backgroundLoadDelay = PropertiesUtil.toLong(props.get(PROPERTY_BACKGROUND_LOAD_DELAY), DEFAULT_BACKGROUND_LOAD_DELAY);
}
/**
@@ -226,6 +242,12 @@ public class JobManagerConfiguration {
return this.disabledDistribution;
}
+ /**
+ * Get the storage path for finished jobs.
+ * @param finishedJob The finished job
+ * @param isSuccess Whether processing was successful or not
+ * @return The complete storage path
+ */
public String getStoragePath(final JobImpl finishedJob, final boolean isSuccess) {
final String topicName = (finishedJob.isBridgedEvent() ? JobImpl.PROPERTY_BRIDGED_EVENT : finishedJob.getTopic().replace('/', '.'));
final StringBuilder sb = new StringBuilder();
@@ -243,7 +265,18 @@ public class JobManagerConfiguration {
}
+ /**
+ * Check whether this is a storage path.
+ */
public boolean isStoragePath(final String path) {
return path.startsWith(this.storedCancelledJobsPath) || path.startsWith(this.storedSuccessfulJobsPath);
}
+
+ public String getScheduledJobsPath() {
+ return this.scheduledJobsPath;
+ }
+
+ public String getScheduledJobsPathWithSlash() {
+ return this.scheduledJobsPathWithSlash;
+ }
}
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java?rev=1529825&r1=1529824&r2=1529825&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java Mon Oct 7 12:07:10 2013
@@ -18,7 +18,6 @@
*/
package org.apache.sling.event.impl.jobs;
-import java.io.Serializable;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collection;
@@ -70,6 +69,7 @@ import org.apache.sling.event.impl.jobs.
import org.apache.sling.event.impl.jobs.stats.TopicStatisticsImpl;
import org.apache.sling.event.impl.support.Environment;
import org.apache.sling.event.impl.support.ResourceHelper;
+import org.apache.sling.event.impl.support.ScheduleInfo;
import org.apache.sling.event.jobs.Job;
import org.apache.sling.event.jobs.JobBuilder;
import org.apache.sling.event.jobs.JobManager;
@@ -99,12 +99,14 @@ import org.slf4j.LoggerFactory;
name="org.apache.sling.event.impl.jobs.jcr.PersistenceHandler")
@Service(value={JobManager.class, EventHandler.class, TopologyEventListener.class, Runnable.class})
@Properties({
- @Property(name=JobManagerConfiguration.CONFIG_PROPERTY_REPOSITORY_PATH,
+ @Property(name=JobManagerConfiguration.PROPERTY_REPOSITORY_PATH,
value=JobManagerConfiguration.DEFAULT_REPOSITORY_PATH),
@Property(name="scheduler.period", longValue=60),
@Property(name="scheduler.concurrent", boolValue=false),
@Property(name=EventConstants.EVENT_TOPIC,
value={SlingConstants.TOPIC_RESOURCE_ADDED,
+ SlingConstants.TOPIC_RESOURCE_CHANGED,
+ SlingConstants.TOPIC_RESOURCE_REMOVED,
"org/apache/sling/event/notification/job/*",
ResourceHelper.BUNDLE_EVENT_STARTED,
ResourceHelper.BUNDLE_EVENT_UPDATED})
@@ -166,6 +168,9 @@ public class JobManagerImpl
/** Set of paths directly added as jobs - these will be ignored during observation handling. */
private final Set<String> directlyAddedPaths = new HashSet<String>();
+ /** Job Scheduler. */
+ private JobSchedulerImpl jobScheduler;
+
/**
* Activate this component.
* @param props Configuration properties
@@ -173,6 +178,7 @@ public class JobManagerImpl
@Activate
protected void activate(final Map<String, Object> props) throws LoginException {
this.configuration = new JobManagerConfiguration(props);
+ this.jobScheduler = new JobSchedulerImpl(this.configuration, this.resourceResolverFactory, this.scheduler, this);
this.maintenanceTask = new MaintenanceTask(this.configuration, this.resourceResolverFactory);
this.backgroundLoader = new BackgroundLoader(this, this.configuration, this.resourceResolverFactory);
@@ -210,6 +216,8 @@ public class JobManagerImpl
@Deactivate
protected void deactivate() {
logger.info("Apache Sling Job Manager stopping on instance {}", Environment.APPLICATION_ID);
+ this.jobScheduler.deactivate();
+
this.backgroundLoader.deactivate();
this.backgroundLoader = null;
@@ -459,9 +467,14 @@ public class JobManagerImpl
}
this.backgroundLoader.loadJob(path);
}
+ this.jobScheduler.handleEvent(event);
} else if ( ResourceHelper.BUNDLE_EVENT_STARTED.equals(event.getTopic())
|| ResourceHelper.BUNDLE_EVENT_UPDATED.equals(event.getTopic()) ) {
this.backgroundLoader.tryToReloadUnloadedJobs();
+ this.jobScheduler.handleEvent(event);
+ } else if ( SlingConstants.TOPIC_RESOURCE_CHANGED.equals(event.getTopic())
+ || SlingConstants.TOPIC_RESOURCE_REMOVED.equals(event.getTopic()) ) {
+ this.jobScheduler.handleEvent(event);
} else {
if ( EventUtil.isLocal(event) ) {
// job notifications
@@ -602,6 +615,7 @@ public class JobManagerImpl
this.startProcessing(event.getNewView());
}
+ this.jobScheduler.handleTopologyEvent(event);
}
/**
@@ -738,10 +752,15 @@ public class JobManagerImpl
*/
@Override
public boolean removeJob(final String jobId) {
- return this.internalRemoveJobJobById(jobId, false);
+ return this.internalRemoveJobById(jobId, false);
}
- private boolean internalRemoveJobJobById(final String jobId, final boolean forceRemove) {
+ /**
+ * Remove a job.
+ * If the job is already in the storage area, it's removed forever.
+ * Otherwise it's moved to the storage area.
+ */
+ private boolean internalRemoveJobById(final String jobId, final boolean forceRemove) {
logger.debug("Trying to remove job {}", jobId);
boolean result = true;
final JobImpl job = (JobImpl)this.getJobById(jobId);
@@ -792,7 +811,7 @@ public class JobManagerImpl
*/
@Override
public void forceRemoveJob(final String jobId) {
- this.internalRemoveJobJobById(jobId, true);
+ this.internalRemoveJobById(jobId, true);
}
/**
@@ -808,20 +827,11 @@ public class JobManagerImpl
*/
@Override
public Job addJob(final String topic, final String name, final Map<String, Object> properties) {
- final String errorMessage = Utility.checkJobTopic(topic);
+ final String errorMessage = Utility.checkJob(topic, properties);
if ( errorMessage != null ) {
logger.warn("{}", errorMessage);
return null;
}
- if ( properties != null ) {
- for(final Object val : properties.values()) {
- if ( val != null && !(val instanceof Serializable) ) {
- logger.warn("Discarding job - properties must be serializable: {} {} : {}",
- new Object[] {topic, name, properties});
- return null;
- }
- }
- }
Job result = this.addJobInteral(topic, name, properties);
if ( result == null && name != null ) {
result = this.getJobByName(name);
@@ -939,7 +949,7 @@ public class JobManagerImpl
*/
@Override
public boolean removeJobById(final String jobId) {
- return this.internalRemoveJobJobById(jobId, true);
+ return this.internalRemoveJobById(jobId, true);
}
/**
@@ -1428,7 +1438,7 @@ public class JobManagerImpl
*/
@Override
public JobBuilder createJob(final String topic) {
- return new JobBuilderImpl(this, topic);
+ return new JobBuilderImpl(this, this.logger, topic);
}
@Override
@@ -1442,4 +1452,18 @@ public class JobManagerImpl
// TODO Auto-generated method stub
return null;
}
+
+ public boolean addScheduledJob(final String topic,
+ final String jobName,
+ final Map<String, Object> properties,
+ final String scheduleName,
+ final ScheduleInfo scheduleInfo) {
+ try {
+ return this.jobScheduler.writeJob(topic, jobName, properties, scheduleName, scheduleInfo);
+ } catch ( final PersistenceException pe) {
+ logger.warn("Unable to persist scheduled job", pe);
+ }
+ return false;
+ }
+
}
Added: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobSchedulerImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobSchedulerImpl.java?rev=1529825&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobSchedulerImpl.java (added)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobSchedulerImpl.java Mon Oct 7 12:07:10 2013
@@ -0,0 +1,598 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.Dictionary;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.jackrabbit.util.ISO8601;
+import org.apache.jackrabbit.util.ISO9075;
+import org.apache.sling.api.SlingConstants;
+import org.apache.sling.api.resource.LoginException;
+import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.api.resource.QuerySyntaxException;
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.api.resource.ResourceUtil;
+import org.apache.sling.api.resource.ValueMap;
+import org.apache.sling.commons.scheduler.JobContext;
+import org.apache.sling.commons.scheduler.Scheduler;
+import org.apache.sling.discovery.TopologyEvent;
+import org.apache.sling.discovery.TopologyEvent.Type;
+import org.apache.sling.discovery.TopologyEventListener;
+import org.apache.sling.event.impl.support.Environment;
+import org.apache.sling.event.impl.support.ResourceHelper;
+import org.apache.sling.event.impl.support.ScheduleInfo;
+import org.apache.sling.event.jobs.Job;
+import org.apache.sling.event.jobs.JobBuilder;
+import org.apache.sling.event.jobs.JobUtil;
+import org.apache.sling.event.jobs.ScheduledJobInfo;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A scheduler for scheduling jobs
+ *
+ * TODO check handling of running and active flag
+ */
+public class JobSchedulerImpl
+ implements EventHandler, TopologyEventListener, org.apache.sling.commons.scheduler.Job {
+
+ /** We use the same resource type as for timed events. */
+ private static final String SCHEDULED_JOB_RESOURCE_TYPE = "slingevent:TimedEvent";
+
+ private static final String TOPIC_READ_JOB = "org/apache/sling/event/impl/jobs/READSCHEDULEDJOB";
+
+ private static final String PROPERTY_READ_JOB = "properties";
+
+ /** Default logger */
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+ /** Is the background task still running? */
+ private volatile boolean running;
+
+ /** Is this active? */
+ private volatile boolean active;
+
+ private final ResourceResolverFactory resourceResolverFactory;
+
+ private final JobManagerConfiguration config;
+
+ private final Scheduler scheduler;
+
+ private final JobManagerImpl jobManager;
+
+ /** A local queue for serializing the event processing. */
+ private final BlockingQueue<Event> queue = new LinkedBlockingQueue<Event>();
+
+ /** Unloaded events. */
+ private final Set<String>unloadedEvents = new HashSet<String>();
+
+ private final Map<String, ScheduledJobInfoImpl> scheduledJobs = new HashMap<String, ScheduledJobInfoImpl>();
+
+ public JobSchedulerImpl(final JobManagerConfiguration configuration,
+ final ResourceResolverFactory resourceResolverFactory,
+ final Scheduler scheduler,
+ final JobManagerImpl jobManager) {
+ this.config = configuration;
+ this.resourceResolverFactory = resourceResolverFactory;
+ this.scheduler = scheduler;
+ this.running = true;
+ this.jobManager = jobManager;
+ }
+
+ /**
+ * Deactivate this component.
+ */
+ public void deactivate() {
+ this.running = false;
+ this.stopScheduling();
+ }
+
+ private void stopScheduling() {
+ if ( this.active ) {
+ final List<ScheduledJobInfoImpl> jobs = new ArrayList<ScheduledJobInfoImpl>();
+ synchronized ( this.scheduledJobs ) {
+ for(final ScheduledJobInfoImpl job : this.scheduledJobs.values() ) {
+ jobs.add(job);
+ }
+ }
+ for(final ScheduledJobInfoImpl info : jobs) {
+ try {
+ logger.debug("Stopping scheduled job : {}", info.getName());
+ this.scheduler.removeJob(info.getSchedulerJobId());
+ } catch ( final NoSuchElementException nsee ) {
+ this.ignoreException(nsee);
+ }
+ }
+ }
+ synchronized ( this.scheduledJobs ) {
+ this.scheduledJobs.clear();
+ }
+
+ // stop background threads by putting empty objects into the queue
+ this.queue.clear();
+ try {
+ this.queue.put(new Event(Utility.TOPIC_STOPPED, (Dictionary<String, Object>)null));
+ } catch (final InterruptedException e) {
+ this.ignoreException(e);
+ }
+ }
+
+ private void startScheduling() {
+ final long now = System.currentTimeMillis();
+ final Thread backgroundThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ loadScheduledJobs(now);
+ try {
+ runInBackground();
+ } catch (final Throwable t) { //NOSONAR
+ logger.error("Background thread stopped with exception: " + t.getMessage(), t);
+ running = false;
+ }
+ }
+ });
+ backgroundThread.start();
+ }
+
+ /**
+ * @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#runInBackground()
+ */
+ protected void runInBackground() {
+ Event event = null;
+ while ( this.running ) {
+ // so let's wait/get the next event from the queue
+ if ( event == null ) {
+ try {
+ event = this.queue.take();
+ } catch (final InterruptedException e) {
+ // we ignore this
+ this.ignoreException(e);
+ }
+ }
+ if ( event != null && this.running ) {
+ Event nextEvent = null;
+
+ // check event type
+ if ( event.getTopic().equals(TOPIC_READ_JOB) ) {
+ @SuppressWarnings("unchecked")
+ final Map<String, Object> properties = (Map<String, Object>) event.getProperty(PROPERTY_READ_JOB);
+ properties.remove(ResourceResolver.PROPERTY_RESOURCE_TYPE);
+ properties.remove(Job.PROPERTY_JOB_CREATED);
+ properties.remove(Job.PROPERTY_JOB_CREATED_INSTANCE);
+
+ final String jobTopic = (String) properties.remove(JobUtil.PROPERTY_JOB_TOPIC);
+ final String jobName = (String) properties.remove(JobUtil.PROPERTY_JOB_NAME);
+ final String schedulerName = (String) properties.remove(ResourceHelper.PROPERTY_SCHEDULER_NAME);
+ final ScheduleInfo scheduleInfo = (ScheduleInfo) properties.remove(ResourceHelper.PROPERTY_SCHEDULER_INFO);
+
+ // and now schedule (TODO)
+ final ScheduledJobInfoImpl info = new ScheduledJobInfoImpl(this, jobTopic, jobName, properties, schedulerName, scheduleInfo);
+ synchronized ( this.scheduledJobs ) {
+ this.scheduledJobs.put(ResourceHelper.filterName(schedulerName), info);
+ }
+ if ( this.active ) {
+ this.startScheduledJob(info);
+ }
+ }
+ if ( event.getTopic().equals(SlingConstants.TOPIC_RESOURCE_ADDED)
+ || event.getTopic().equals(SlingConstants.TOPIC_RESOURCE_CHANGED)) {
+ final String path = (String)event.getProperty(SlingConstants.PROPERTY_PATH);
+ ResourceResolver resolver = null;
+ try {
+ resolver = this.resourceResolverFactory.getAdministrativeResourceResolver(null);
+ final Resource eventResource = resolver.getResource(path);
+ if ( ResourceHelper.RESOURCE_TYPE_SCHEDULED_JOB.equals(eventResource.getResourceType()) ) {
+ final ReadResult result = this.readScheduledJob(eventResource);
+ if ( result != null ) {
+ if ( result.hasReadErrors ) {
+ synchronized ( this.unloadedEvents ) {
+ this.unloadedEvents.add(eventResource.getPath());
+ }
+ } else {
+ nextEvent = result.event;
+ }
+ }
+ }
+ } catch (final LoginException le) {
+ this.ignoreException(le);
+ } finally {
+ if ( resolver != null ) {
+ resolver.close();
+ }
+ }
+ } else if ( event.getTopic().equals(SlingConstants.TOPIC_RESOURCE_REMOVED) ) {
+ final String path = (String)event.getProperty(SlingConstants.PROPERTY_PATH);
+ final String scheduleName = ResourceUtil.getName(path);
+ final ScheduledJobInfoImpl info;
+ synchronized ( this.scheduledJobs ) {
+ info = this.scheduledJobs.remove(scheduleName);
+ }
+ if ( info != null && this.active ) {
+ logger.debug("Stopping scheduled job : {}", info.getName());
+ try {
+ this.scheduler.removeJob(info.getSchedulerJobId());
+ } catch (final NoSuchElementException nsee) {
+ // this can happen if the job is scheduled on another node
+ // so we can just ignore this
+ }
+
+ }
+ }
+ event = nextEvent;
+ }
+ }
+ }
+
+ private void startScheduledJob(final ScheduledJobInfoImpl info) {
+ // Create configuration for scheduled job
+ final Map<String, Serializable> config = new HashMap<String, Serializable>();
+ config.put(PROPERTY_READ_JOB, info);
+
+ logger.debug("Adding scheduled job: {}", info.getName());
+ try {
+ switch ( info.getScheduleType() ) {
+ case DAILY:
+ // TODO
+ break;
+ case DATE:
+ this.scheduler.fireJobAt(info.getSchedulerJobId(), this, config, info.getNextScheduledExecution());
+ break;
+ case PERIODICALLY:
+ this.scheduler.addPeriodicJob(info.getSchedulerJobId(), this, config, info.getPeriod() * 1000, false);
+ break;
+ case WEEKLY:
+ // TODO
+ break;
+ }
+ } catch (final Exception e) {
+ // we ignore it if scheduled fails...
+ this.ignoreException(e);
+ }
+ }
+
+ /**
+  * Called by the scheduler when a schedule fires: adds a new job built from the
+  * scheduled job info stored in the scheduler configuration.
+  * @see org.apache.sling.commons.scheduler.Job#execute(org.apache.sling.commons.scheduler.JobContext)
+  */
+ @Override
+ public void execute(final JobContext context) {
+     final Object stored = context.getConfiguration().get(PROPERTY_READ_JOB);
+     final ScheduledJobInfoImpl jobInfo = (ScheduledJobInfoImpl) stored;
+
+     this.jobManager.addJob(jobInfo.getJobTopic(), jobInfo.getJobName(), jobInfo.getJobProperties());
+
+     // a job scheduled for a specific date fires only once,
+     // so its resource can be removed from the resource tree
+     final boolean firesOnce = jobInfo.getScheduleType() == ScheduledJobInfo.ScheduleType.DATE;
+     if ( firesOnce ) {
+         this.unschedule(jobInfo);
+     }
+ }
+
+ /**
+  * Unschedule a job by deleting its resource from the resource tree.
+  * Failures to log in or to delete the resource are logged in debug mode and ignored.
+  * @param info The scheduled job to remove
+  */
+ public void unschedule(final ScheduledJobInfoImpl info) {
+     ResourceResolver resolver = null;
+     try {
+         resolver = this.resourceResolverFactory.getAdministrativeResourceResolver(null);
+         // the configured path already ends with a slash, so the filtered name is
+         // appended directly - this is the same path the job is written to
+         final String path = this.config.getScheduledJobsPathWithSlash() + ResourceHelper.filterName(info.getName());
+
+         final Resource eventResource = resolver.getResource(path);
+         if ( eventResource != null ) {
+             resolver.delete(eventResource);
+             resolver.commit();
+         }
+     } catch (final LoginException le) {
+         this.ignoreException(le);
+     } catch (final PersistenceException pe) {
+         // we ignore the exception if removing fails
+         ignoreException(pe);
+     } finally {
+         if ( resolver != null ) {
+             resolver.close();
+         }
+     }
+ }
+
+ /**
+ * Handle an OSGi event; events are ignored unless this component is running.
+ * <p>
+ * Bundle STARTED/UPDATED events trigger a retry for all paths recorded in
+ * <code>unloadedEvents</code>: the retry runs in the thread pool, re-reads each
+ * resource and queues the ones that can now be read without errors.
+ * <p>
+ * Every other event is treated as a resource event: it is queued if its path
+ * is below the scheduled jobs path and its resource type is either missing or
+ * the scheduled job resource type.
+ * @see org.osgi.service.event.EventHandler#handleEvent(org.osgi.service.event.Event)
+ */
+ @Override
+ public void handleEvent(final Event event) {
+ if ( this.running ) {
+ if ( ResourceHelper.BUNDLE_EVENT_STARTED.equals(event.getTopic())
+ || ResourceHelper.BUNDLE_EVENT_UPDATED.equals(event.getTopic()) ) {
+ // bundle event started or updated
+ boolean doIt = false; // only start a retry task if there is something to retry
+ synchronized ( this.unloadedEvents ) {
+ if ( this.unloadedEvents.size() > 0 ) {
+ doIt = true;
+ }
+ }
+ if ( doIt ) {
+ final Runnable t = new Runnable() {
+
+ @Override
+ public void run() {
+ synchronized (unloadedEvents) {
+ ResourceResolver resolver = null;
+ final Set<String> newUnloadedEvents = new HashSet<String>(); // paths to keep for the next retry
+ newUnloadedEvents.addAll(unloadedEvents);
+ try {
+ resolver = resourceResolverFactory.getAdministrativeResourceResolver(null);
+ for(final String path : unloadedEvents ) {
+ newUnloadedEvents.remove(path); // dropped unless re-added below because of read errors
+ final Resource eventResource = resolver.getResource(path);
+ final ReadResult result = readScheduledJob(eventResource); // null if the resource is gone or unreadable
+ if ( result != null ) {
+ if ( result.hasReadErrors ) {
+ newUnloadedEvents.add(path);
+ } else {
+ try {
+ queue.put(result.event);
+ } catch (InterruptedException e) {
+ // we ignore this exception as this should never occur
+ ignoreException(e);
+ }
+ }
+ }
+ }
+ } catch (final LoginException re) {
+ // unable to create resource resolver so we try it again next time
+ ignoreException(re);
+ } finally {
+ if ( resolver != null ) {
+ resolver.close();
+ }
+ unloadedEvents.clear(); // replace the set with the paths that still need a retry
+ unloadedEvents.addAll(newUnloadedEvents);
+ }
+ }
+ }
+
+ };
+ Environment.THREAD_POOL.execute(t);
+ }
+ } else {
+ final String path = (String)event.getProperty(SlingConstants.PROPERTY_PATH);
+ final String resourceType = (String)event.getProperty(SlingConstants.PROPERTY_RESOURCE_TYPE);
+ if ( path != null && path.startsWith(this.config.getScheduledJobsPathWithSlash())
+ && (resourceType == null || ResourceHelper.RESOURCE_TYPE_SCHEDULED_JOB.equals(resourceType))) {
+ logger.debug("Received resource event for {} : {}", path, resourceType);
+ try {
+ this.queue.put(event);
+ } catch (final InterruptedException ignore) {
+ this.ignoreException(ignore);
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Load all scheduled jobs from the resource tree.
+ * <p>
+ * An XPath query selects all nodes of the scheduled job type which were created
+ * before the given start time, ordered by creation date (ascending).
+ * Results outside of the scheduled jobs path are skipped. A job which is read
+ * without errors is put into the queue; if reading reports errors, the path is
+ * recorded in <code>unloadedEvents</code> instead.
+ * Query syntax and login exceptions are logged in debug mode and ignored.
+ * @param startTime Upper bound (exclusive) for the creation date, in milliseconds
+ */
+ private void loadScheduledJobs(final long startTime) {
+ ResourceResolver resolver = null;
+ try {
+ resolver = this.resourceResolverFactory.getAdministrativeResourceResolver(null);
+ final Calendar startDate = Calendar.getInstance();
+ startDate.setTimeInMillis(startTime);
+
+ final StringBuilder buf = new StringBuilder(64);
+
+ buf.append("//element(*,");
+ buf.append(SCHEDULED_JOB_RESOURCE_TYPE);
+ buf.append(")[@");
+ buf.append(ISO9075.encode(org.apache.sling.event.jobs.Job.PROPERTY_JOB_CREATED));
+ buf.append(" < xs:dateTime('");
+ buf.append(ISO8601.format(startDate));
+ buf.append("')] order by @");
+ buf.append(ISO9075.encode(org.apache.sling.event.jobs.Job.PROPERTY_JOB_CREATED));
+ buf.append(" ascending");
+ final Iterator<Resource> result = resolver.findResources(buf.toString(), "xpath");
+
+ while ( result.hasNext() ) {
+ final Resource eventResource = result.next();
+ // sanity check for the path
+ if ( eventResource.getPath().startsWith(this.config.getScheduledJobsPathWithSlash()) ) {
+ final ReadResult readResult = this.readScheduledJob(eventResource); // null if the resource could not be read at all
+ if ( readResult != null ) {
+ if ( readResult.hasReadErrors ) {
+ synchronized ( this.unloadedEvents ) {
+ this.unloadedEvents.add(eventResource.getPath());
+ }
+ } else {
+ try {
+ this.queue.put(readResult.event);
+ } catch (final InterruptedException e) {
+ this.ignoreException(e);
+ }
+ }
+ }
+ }
+ }
+
+ } catch (final QuerySyntaxException qse) {
+ this.ignoreException(qse);
+ } catch (final LoginException le) {
+ this.ignoreException(le);
+ } finally {
+ if ( resolver != null ) {
+ resolver.close();
+ }
+ }
+ }
+
+ private static final class ReadResult { // outcome of reading a scheduled job resource
+ public Event event; // event carrying the properties read from the resource
+ public boolean hasReadErrors; // true if the read properties contained a read error list
+ }
+
+ /**
+  * Read a scheduled job from the resource.
+  * The properties of the resource are wrapped into an event; read errors found
+  * in the properties are logged as warnings and flagged in the result.
+  * @param eventResource The resource to read, may be <code>null</code>
+  * @return The read result or <code>null</code> if there is no resource
+  *         or the resource can't be read
+  */
+ private ReadResult readScheduledJob(final Resource eventResource) {
+     if ( eventResource == null ) {
+         return null;
+     }
+     try {
+         final ValueMap valueMap = ResourceHelper.getValueMap(eventResource);
+         final Map<String, Object> jobProps = ResourceHelper.cloneValueMap(valueMap);
+
+         @SuppressWarnings("unchecked")
+         final List<Exception> errors = (List<Exception>) jobProps.remove(ResourceHelper.PROPERTY_MARKER_READ_ERROR_LIST);
+         final boolean failed = errors != null;
+         if ( failed ) {
+             for(final Exception error : errors) {
+                 logger.warn("Unable to read scheduled job from " + eventResource.getPath(), error);
+             }
+         }
+
+         final Map<String, Object> eventProps = Collections.singletonMap(PROPERTY_READ_JOB, (Object)jobProps);
+         final ReadResult outcome = new ReadResult();
+         outcome.hasReadErrors = failed;
+         outcome.event = new Event(TOPIC_READ_JOB, eventProps);
+         return outcome;
+     } catch (final InstantiationException ie) {
+         // something happened with the resource in the meantime
+         this.ignoreException(ie);
+         return null;
+     }
+ }
+
+ /**
+ * Write a scheduled job to the resource tree.
+ * <p>
+ * The stored properties consist of the job properties (without the ones
+ * filtered by {@link ResourceHelper#ignoreProperty(String)}), the job topic,
+ * the optional job name, creation date and creating instance, the scheduler
+ * name and info, and the resource type for scheduled jobs.
+ * An existing resource at the target path is deleted before the new one is created.
+ * @param jobTopic The job topic
+ * @param jobName The optional job name, only stored if not <code>null</code>
+ * @param jobProperties The optional job properties
+ * @param schedulerName The scheduler name, its filtered form is used as the resource name
+ * @param scheduleInfo The schedule information
+ * @return <code>true</code> if the job has been written, <code>false</code> if
+ * no resource resolver could be obtained
+ * @throws PersistenceException
+ */
+ public boolean writeJob(
+ final String jobTopic,
+ final String jobName,
+ final Map<String, Object> jobProperties,
+ final String schedulerName,
+ final ScheduleInfo scheduleInfo)
+ throws PersistenceException {
+ ResourceResolver resolver = null;
+ try {
+ resolver = this.resourceResolverFactory.getAdministrativeResourceResolver(null);
+
+ // create properties
+ final Map<String, Object> properties = new HashMap<String, Object>();
+
+ if ( jobProperties != null ) {
+ for(final Map.Entry<String, Object> entry : jobProperties.entrySet() ) {
+ final String propName = entry.getKey();
+ if ( !ResourceHelper.ignoreProperty(propName) ) {
+ properties.put(propName, entry.getValue());
+ }
+ }
+ }
+
+ properties.put(JobUtil.PROPERTY_JOB_TOPIC, jobTopic);
+ if ( jobName != null ) {
+ properties.put(JobUtil.PROPERTY_JOB_NAME, jobName);
+ }
+ properties.put(Job.PROPERTY_JOB_CREATED, Calendar.getInstance());
+ properties.put(Job.PROPERTY_JOB_CREATED_INSTANCE, Environment.APPLICATION_ID);
+
+ // put scheduler name and scheduler info
+ properties.put(ResourceHelper.PROPERTY_SCHEDULER_NAME, schedulerName);
+ properties.put(ResourceHelper.PROPERTY_SCHEDULER_INFO, scheduleInfo);
+
+ // create path and resource
+ properties.put(ResourceResolver.PROPERTY_RESOURCE_TYPE, ResourceHelper.RESOURCE_TYPE_SCHEDULED_JOB);
+
+ final String path = this.config.getScheduledJobsPathWithSlash() + ResourceHelper.filterName(schedulerName);
+
+ // update existing resource
+ final Resource existingInfo = resolver.getResource(path);
+ if ( existingInfo != null ) {
+ resolver.delete(existingInfo); // an update is done by deleting and recreating the resource
+ if ( logger.isDebugEnabled() ) {
+ logger.debug("Updating scheduled job {} at {}", properties, path);
+ }
+ } else {
+ if ( logger.isDebugEnabled() ) {
+ logger.debug("Storing new scheduled job {} at {}", properties, path);
+ }
+ }
+ ResourceHelper.getOrCreateResource(resolver, // NOTE(review): no explicit commit here - presumably done by the helper, confirm
+ path,
+ properties);
+ return true;
+ } catch ( final LoginException le ) {
+ // we ignore this
+ this.ignoreException(le);
+ } finally {
+ if ( resolver != null ) {
+ resolver.close();
+ }
+ }
+ return false;
+ }
+
+ /**
+  * Helper method for exceptions which are deliberately ignored:
+  * the exception is only written to the log if debug logging is enabled.
+  * @param e The ignored exception
+  */
+ private void ignoreException(final Exception e) {
+     if ( !this.logger.isDebugEnabled() ) {
+         return;
+     }
+     final String message = "Ignored exception " + e.getMessage();
+     this.logger.debug(message, e);
+ }
+
+ /**
+  * Adapt scheduling to topology changes. While the topology is changing,
+  * scheduling is stopped; once a topology view is available, scheduling is
+  * started if the local instance became the leader and stopped if it lost
+  * the leader role.
+  * @see org.apache.sling.discovery.TopologyEventListener#handleTopologyEvent(org.apache.sling.discovery.TopologyEvent)
+  */
+ @Override
+ public void handleTopologyEvent(final TopologyEvent event) {
+     final Type type = event.getType();
+     if ( type == Type.TOPOLOGY_CHANGING ) {
+         this.active = false;
+         this.stopScheduling();
+         return;
+     }
+     if ( type != Type.TOPOLOGY_CHANGED && type != Type.TOPOLOGY_INIT ) {
+         // all other event types do not affect scheduling
+         return;
+     }
+     final boolean wasActive = this.active;
+     this.active = event.getNewView().getLocalInstance().isLeader();
+     if ( this.active && !wasActive ) {
+         this.startScheduling();
+     }
+     if ( !this.active && wasActive ) {
+         this.stopScheduling();
+     }
+ }
+
+ /**
+  * Create a schedule builder for rescheduling the given scheduled job.
+  * The builder is set up with the topic, the optional name and the properties
+  * of the job and uses the existing schedule name.
+  * @param info The scheduled job to reschedule
+  * @return A schedule builder for the job
+  */
+ public JobBuilder.ScheduleBuilder createJobBuilder(final ScheduledJobInfoImpl info) {
+     JobBuilder builder = this.jobManager.createJob(info.getJobTopic());
+     // the job name is optional - and it is the name, not the topic, which has to be used here
+     final String jobName = info.getJobName();
+     if ( jobName != null ) {
+         builder = builder.name(jobName);
+     }
+     builder = builder.properties(info.getJobProperties());
+     return builder.schedule(info.getName());
+ }
+}
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobSchedulerImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobSchedulerImpl.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobSchedulerImpl.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/ScheduledJobInfoImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/ScheduledJobInfoImpl.java?rev=1529825&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/ScheduledJobInfoImpl.java (added)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/ScheduledJobInfoImpl.java Mon Oct 7 12:07:10 2013
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs;
+
+import java.io.Serializable;
+import java.util.Date;
+import java.util.Map;
+
+import org.apache.sling.event.impl.support.ScheduleInfo;
+import org.apache.sling.event.jobs.Job;
+import org.apache.sling.event.jobs.JobBuilder.ScheduleBuilder;
+import org.apache.sling.event.jobs.ScheduledJobInfo;
+
+ /**
+  * Implementation of the scheduled job info. It holds the job data (topic,
+  * optional name and properties) together with the schedule information and
+  * delegates unscheduling and rescheduling to the job scheduler.
+  */
+ public class ScheduledJobInfoImpl implements ScheduledJobInfo, Serializable {
+
+     private static final long serialVersionUID = 1L;
+
+     /** The unique name of the schedule. */
+     private final String scheduleName;
+
+     /** The optional job name. */
+     private final String jobName;
+
+     /** The job topic. */
+     private final String jobTopic;
+
+     /** The optional job properties. */
+     private final Map<String, Object> jobProperties;
+
+     /** The schedule information. */
+     private final ScheduleInfo scheduleInfo;
+
+     /** The scheduler used for unscheduling and rescheduling. */
+     // NOTE(review): non-transient field in a Serializable class - confirm that
+     // instances are never serialized or that the scheduler is serializable
+     private final JobSchedulerImpl jobScheduler;
+
+     /**
+      * Create a new info object.
+      * @param jobScheduler The job scheduler
+      * @param jobTopic The job topic
+      * @param jobName The optional job name
+      * @param jobProperties The optional job properties
+      * @param scheduleName The unique name of the schedule
+      * @param scheduleInfo The schedule information
+      */
+     public ScheduledJobInfoImpl(final JobSchedulerImpl jobScheduler,
+             final String jobTopic,
+             final String jobName,
+             final Map<String, Object> jobProperties,
+             final String scheduleName,
+             final ScheduleInfo scheduleInfo) {
+         this.jobScheduler = jobScheduler;
+         this.scheduleName = scheduleName;
+         this.jobName = jobName;
+         this.jobTopic = jobTopic;
+         this.jobProperties = jobProperties;
+         this.scheduleInfo = scheduleInfo;
+     }
+
+     @Override
+     public String getName() {
+         return this.scheduleName;
+     }
+
+     @Override
+     public ScheduleType getScheduleType() {
+         return this.scheduleInfo.getScheduleType();
+     }
+
+     /**
+      * Only jobs scheduled for a specific date report their execution date,
+      * for all other schedule types <code>null</code> is returned.
+      */
+     @Override
+     public Date getNextScheduledExecution() {
+         if ( this.scheduleInfo.getScheduleType() == ScheduleType.DATE ) {
+             return this.scheduleInfo.getAt();
+         }
+         return null;
+     }
+
+     @Override
+     public int getDayOfWeek() {
+         return this.scheduleInfo.getDayOfWeek();
+     }
+
+     @Override
+     public int getHourOfDay() {
+         return this.scheduleInfo.getHourOfDay();
+     }
+
+     @Override
+     public int getMinuteOfHour() {
+         // the minute of the hour - not the period of a periodic schedule
+         return this.scheduleInfo.getMinuteOfHour();
+     }
+
+     @Override
+     public int getPeriod() {
+         return this.scheduleInfo.getPeriod();
+     }
+
+     @Override
+     public String getJobTopic() {
+         return this.jobTopic;
+     }
+
+     @Override
+     public String getJobName() {
+         return this.jobName;
+     }
+
+     @Override
+     public Map<String, Object> getJobProperties() {
+         return this.jobProperties;
+     }
+
+     @Override
+     public void unschedule() {
+         this.jobScheduler.unschedule(this);
+     }
+
+     @Override
+     public ScheduleBuilder reschedule() {
+         return this.jobScheduler.createJobBuilder(this);
+     }
+
+     /**
+      * Get the scheduler job id: the class name of the job interface
+      * and the schedule name, separated by a colon.
+      * @return The id used for registering with the scheduler
+      */
+     public String getSchedulerJobId() {
+         return Job.class.getName() + ":" + this.scheduleName;
+     }
+ }
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/ScheduledJobInfoImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/ScheduledJobInfoImpl.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/ScheduledJobInfoImpl.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/Utility.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/Utility.java?rev=1529825&r1=1529824&r2=1529825&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/Utility.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/Utility.java Mon Oct 7 12:07:10 2013
@@ -18,6 +18,7 @@
*/
package org.apache.sling.event.impl.jobs;
+import java.io.Serializable;
import java.util.Calendar;
import java.util.Dictionary;
import java.util.HashMap;
@@ -68,6 +69,25 @@ public abstract class Utility {
return message;
}
+ /**
+  * Check the job: the topic must be valid and every non-null property value must be serializable.
+  * @return <code>null</code> if the topic is correct and all properties are serializable,
+  * otherwise an error description is returned
+  */
+ public static String checkJob(final Object jobTopic, final Map<String, Object> properties) {
+     final String topicError = checkJobTopic(jobTopic);
+     if ( topicError != null || properties == null ) {
+         return topicError;
+     }
+     for(final Object value : properties.values()) {
+         final boolean acceptable = value == null || value instanceof Serializable;
+         if ( !acceptable ) {
+             return "Discarding job - properties must be serializable: " + jobTopic + " : " + properties;
+         }
+     }
+     return null;
+ }
+
/** Event property containing the time for job start and job finished events. */
public static final String PROPERTY_TIME = "time";
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ResourceHelper.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ResourceHelper.java?rev=1529825&r1=1529824&r2=1529825&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ResourceHelper.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ResourceHelper.java Mon Oct 7 12:07:10 2013
@@ -49,10 +49,16 @@ public abstract class ResourceHelper {
public static final String RESOURCE_TYPE_EVENT = "slingevent:Event";
+ /** We use the same resource type as for timed events. */
+ public static final String RESOURCE_TYPE_SCHEDULED_JOB = "slingevent:TimedEvent";
+
public static final String BUNDLE_EVENT_UPDATED = "org/osgi/framework/BundleEvent/UPDATED";
public static final String BUNDLE_EVENT_STARTED = "org/osgi/framework/BundleEvent/STARTED";
+ public static final String PROPERTY_SCHEDULER_NAME = "slingevent:schedulerName";
+ public static final String PROPERTY_SCHEDULER_INFO = "slingevent:schedulerInfo";
+
/** List of ignored properties to write to the repository. */
@SuppressWarnings("deprecation")
private static final String[] IGNORE_PROPERTIES = new String[] {
@@ -73,7 +79,9 @@ public abstract class ResourceHelper {
Job.PROPERTY_JOB_PROGRESS_STEPS,
Job.PROPERTY_FINISHED_DATE,
JobImpl.PROPERTY_FINISHED_STATE,
- Job.PROPERTY_RESULT_MESSAGE
+ Job.PROPERTY_RESULT_MESSAGE,
+ PROPERTY_SCHEDULER_INFO,
+ PROPERTY_SCHEDULER_NAME
};
/**
Added: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ScheduleInfo.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ScheduleInfo.java?rev=1529825&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ScheduleInfo.java (added)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ScheduleInfo.java Mon Oct 7 12:07:10 2013
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.support;
+
+import java.io.Serializable;
+import java.util.Date;
+
+import org.apache.sling.event.jobs.ScheduledJobInfo.ScheduleType;
+
+// TODO - implement serializing
+ /**
+  * Immutable description of a schedule. Instances are created through the
+  * static factory methods; values which do not apply to the schedule type
+  * are set to -1 (or <code>null</code> for the date).
+  */
+ public class ScheduleInfo implements Serializable {
+
+     private static final long serialVersionUID = 1L;
+
+     /** The type of the schedule. */
+     private final ScheduleType scheduleType;
+
+     /** The period for periodic schedules. */
+     private final int period;
+
+     /** The day of the week for weekly schedules. */
+     private final int dayOfWeek;
+
+     /** The hour of the day for daily and weekly schedules. */
+     private final int hourOfDay;
+
+     /** The minute of the hour for daily and weekly schedules. */
+     private final int minuteOfHour;
+
+     /** The date for schedules with a fixed date. */
+     private final Date at;
+
+     private ScheduleInfo(final ScheduleType type,
+             final int periodValue,
+             final int day,
+             final int hour,
+             final int minute,
+             final Date date) {
+         this.scheduleType = type;
+         this.period = periodValue;
+         this.dayOfWeek = day;
+         this.hourOfDay = hour;
+         this.minuteOfHour = minute;
+         this.at = date;
+     }
+
+     /** Create a periodic schedule with the given period in minutes. */
+     public static ScheduleInfo PERIODIC(final int minutes) {
+         return new ScheduleInfo(ScheduleType.PERIODICALLY, minutes, -1, -1, -1, null);
+     }
+
+     /** Create a schedule for the given date. */
+     public static ScheduleInfo AT(final Date at) {
+         return new ScheduleInfo(ScheduleType.DATE, -1, -1, -1, -1, at);
+     }
+
+     /** Create a weekly schedule for the given day, hour and minute. */
+     public static ScheduleInfo WEEKLY(final int day, final int hour, final int minute) {
+         return new ScheduleInfo(ScheduleType.WEEKLY, -1, day, hour, minute, null);
+     }
+
+     /** Create a daily schedule for the given hour and minute. */
+     public static ScheduleInfo DAYLY(final int hour, final int minute) {
+         return new ScheduleInfo(ScheduleType.DAILY, -1, -1, hour, minute, null);
+     }
+
+     public ScheduleType getScheduleType() {
+         return this.scheduleType;
+     }
+
+     public int getPeriod() {
+         return this.period;
+     }
+
+     public int getDayOfWeek() {
+         return this.dayOfWeek;
+     }
+
+     public int getHourOfDay() {
+         return this.hourOfDay;
+     }
+
+     public int getMinuteOfHour() {
+         return this.minuteOfHour;
+     }
+
+     public Date getAt() {
+         return this.at;
+     }
+ }
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ScheduleInfo.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ScheduleInfo.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ScheduleInfo.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobBuilder.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobBuilder.java?rev=1529825&r1=1529824&r2=1529825&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobBuilder.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobBuilder.java Mon Oct 7 12:07:10 2013
@@ -53,6 +53,7 @@ public interface JobBuilder {
* Schedule the job
* If a job scheduler with the same name already exists, it is updated
* with the new information.
+ * If no name is provided (empty name or null), the job can't be scheduled.
* @param name Unique name for the scheduler.
* @return A schedule builder to schedule the jobs
*/
@@ -64,7 +65,8 @@ public interface JobBuilder {
public interface ScheduleBuilder {
/**
- * Schedule the job periodically every N minutes
+ * Schedule the job periodically every N minutes.
+ * If the minutes argument is less than 1, the job can't be scheduled.
* @param minutes Positive number of minutes
* @return <code>true</code> if the job could be scheduled, <code>false</code>otherwise.
*/
@@ -77,12 +79,14 @@ public interface JobBuilder {
/**
* Schedule the job weekly, the time needs to be specified in addition.
- * @param day Day of the week, Sunday being one, Monday two, up to Saturday being seven.
+ * If a value lower than 1 or higher than 7 is used, the job can't be scheduled.
+ * @param day Day of the week, 1:Sunday, 2:Monday, ... 7:Saturday.
*/
TimeBuilder weekly(final int day);
/**
* Schedule the job for a specific date.
+ * If no date or a date in the past is provided, the job can't be scheduled.
* @param date The date
* @return <code>true</code> if the job could be scheduled, <code>false</code>otherwise.
*/
@@ -93,6 +97,8 @@ public interface JobBuilder {
/**
* Schedule the job for the given hour and minute.
+ * If a value less than zero for hour or minute is specified or a value higher than 23 for hour or
+ * a value higher than 59 for minute, then the job can't be scheduled.
* @param hour Hour of the day ranging from 0 to 23.
* @param minute Minute of the hour ranging from 0 to 59.
* @return <code>true</code> if the job could be scheduled, <code>false</code>otherwise.
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/ScheduledJobInfo.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/ScheduledJobInfo.java?rev=1529825&r1=1529824&r2=1529825&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/ScheduledJobInfo.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/ScheduledJobInfo.java Mon Oct 7 12:07:10 2013
@@ -23,35 +23,87 @@ import java.util.Map;
import aQute.bnd.annotation.ProviderType;
+/**
+ * Information about a scheduled job
+ * @since 1.3
+ */
@ProviderType
public interface ScheduledJobInfo {
enum ScheduleType {
- DATE,
- PERIODICALLY,
- DAILY,
- WEEKLY
+ DATE, // scheduled for a date
+ PERIODICALLY, // scheduled periodically (minutes)
+ DAILY, // scheduled once a day
+ WEEKLY // scheduled once a week
}
+ /**
+ * Return the unique scheduling name.
+ * @return The unique name
+ */
String getName();
+ /**
+ * Return the scheduling type
+ * @return The scheduling type
+ */
ScheduleType getScheduleType();
+ /**
+ * Return the next scheduled execution date.
+ */
Date getNextScheduledExecution();
+ /**
+ * If the job is scheduled weekly, returns the day of the week
+ * @return The day of the week (from 1 to 7) or -1
+ */
int getDayOfWeek();
+ /**
+ * Return the hour of the day for daily and weekly scheduled jobs
+ * @return The hour of the day (from 0 to 23) or -1
+ */
int getHourOfDay();
+ /**
+ * Return the minute of the hour for daily and weekly scheduled jobs.
+ * @return The minute of the hour (from 0 to 59) or -1
+ */
int getMinuteOfHour();
+ /**
+ * For periodically scheduled jobs, return the period in minutes.
+ * @return The period in minutes or -1
+ */
int getPeriod();
+ /**
+ * Return the job topic.
+ * @return The job topic
+ */
String getJobTopic();
+ /**
+ * Return the optional job name.
+ * @return The job name or <code>null</code>
+ */
String getJobName();
+ /**
+ * Return the optional job properties.
+ * @return The job properties or <code>null</code>
+ */
Map<String, Object> getJobProperties();
+ /**
+ * Unschedule this scheduled job.
+ */
void unschedule();
+
+ /**
+ * Reschedule this job with a new rescheduling information.
+ * If rescheduling fails, the job will be unscheduled.
+ */
+ JobBuilder.ScheduleBuilder reschedule();
}
Modified: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java?rev=1529825&r1=1529824&r2=1529825&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java (original)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java Mon Oct 7 12:07:10 2013
@@ -167,7 +167,7 @@ public abstract class AbstractJobHandlin
// set load delay to 3 sec
final org.osgi.service.cm.Configuration c2 = this.configAdmin.getConfiguration("org.apache.sling.event.impl.jobs.jcr.PersistenceHandler", null);
Dictionary<String, Object> p2 = new Hashtable<String, Object>();
- p2.put(JobManagerConfiguration.CONFIG_PROPERTY_BACKGROUND_LOAD_DELAY, 3L);
+ p2.put(JobManagerConfiguration.PROPERTY_BACKGROUND_LOAD_DELAY, 3L);
c2.update(p2);
final StartupHandler handler = new StartupHandler() {