You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2013/10/07 14:07:11 UTC

svn commit: r1529825 - in /sling/trunk/bundles/extensions/event/src: main/java/org/apache/sling/event/ main/java/org/apache/sling/event/impl/jobs/ main/java/org/apache/sling/event/impl/support/ main/java/org/apache/sling/event/jobs/ test/java/org/apach...

Author: cziegeler
Date: Mon Oct  7 12:07:10 2013
New Revision: 1529825

URL: http://svn.apache.org/r1529825
Log:
SLING-3139 : Provide a way to schedule jobs
SLING-3138 : Add fluent api to create new jobs

Added:
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobSchedulerImpl.java   (with props)
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/ScheduledJobInfoImpl.java   (with props)
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ScheduleInfo.java   (with props)
Modified:
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/TimedEventStatusProvider.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobBuilderImpl.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerConfiguration.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/Utility.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ResourceHelper.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobBuilder.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/ScheduledJobInfo.java
    sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java?rev=1529825&r1=1529824&r2=1529825&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java Mon Oct  7 12:07:10 2013
@@ -156,22 +156,46 @@ public abstract class EventUtil {
      * Timed Events
      */
 
-    /** The topic for timed events. */
+    /**
+     * The topic for timed events.
+     * @deprecated Use scheduled jobs instead
+     */
+    @Deprecated
     public static final String TOPIC_TIMED_EVENT = "org/apache/sling/event/timed";
 
-    /** The real topic of the event. */
+    /**
+     * The real topic of the event.
+     * @deprecated Use scheduled jobs instead
+     */
+    @Deprecated
     public static final String PROPERTY_TIMED_EVENT_TOPIC = "event.topic.timed";
 
-    /** The property for the unique event id. */
+    /**
+     * The property for the unique event id.
+     * @deprecated Use scheduled jobs instead
+     */
+    @Deprecated
     public static final String PROPERTY_TIMED_EVENT_ID = "event.timed.id";
 
-    /** The scheduler cron expression for the timed event. Type must be String. */
+    /**
+     * The scheduler cron expression for the timed event. Type must be String.
+     * @deprecated Use scheduled jobs instead
+     */
+    @Deprecated
     public static final String PROPERTY_TIMED_EVENT_SCHEDULE = "event.timed.scheduler";
 
-    /** The period in seconds for the timed event. Type must be Long*/
+    /**
+     * The period in seconds for the timed event. Type must be Long.
+     * @deprecated Use scheduled jobs instead
+     */
+    @Deprecated
     public static final String PROPERTY_TIMED_EVENT_PERIOD = "event.timed.period";
 
-    /** The date for the timed event. Type must be Date.  */
+    /**
+     * The date for the timed event. Type must be Date.
+     * @deprecated Use scheduled jobs instead
+     */
+    @Deprecated
     public static final String PROPERTY_TIMED_EVENT_DATE = "event.timed.date";
 
     /**

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/TimedEventStatusProvider.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/TimedEventStatusProvider.java?rev=1529825&r1=1529824&r2=1529825&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/TimedEventStatusProvider.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/TimedEventStatusProvider.java Mon Oct  7 12:07:10 2013
@@ -25,7 +25,9 @@ import org.osgi.service.event.Event;
 
 /**
  * This service provides the current timed events status.
+ * @deprecated Use scheduled jobs instead
  */
+@Deprecated
 public interface TimedEventStatusProvider {
 
     /**

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobBuilderImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobBuilderImpl.java?rev=1529825&r1=1529824&r2=1529825&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobBuilderImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobBuilderImpl.java Mon Oct  7 12:07:10 2013
@@ -21,9 +21,11 @@ package org.apache.sling.event.impl.jobs
 import java.util.Date;
 import java.util.Map;
 
+import org.apache.sling.event.impl.support.ScheduleInfo;
 import org.apache.sling.event.jobs.Job;
 import org.apache.sling.event.jobs.JobBuilder;
-import org.apache.sling.event.jobs.JobManager;
+import org.apache.sling.event.jobs.ScheduledJobInfo;
+import org.slf4j.Logger;
 
 /**
  * Fluent builder API
@@ -32,15 +34,18 @@ public class JobBuilderImpl implements J
 
     private final String topic;
 
-    private final JobManager jobManager;
+    private final JobManagerImpl jobManager;
+
+    private final Logger logger;
 
     private String name;
 
     private Map<String, Object> properties;
 
-    public JobBuilderImpl(final JobManager manager, final String topic) {
+    public JobBuilderImpl(final JobManagerImpl manager, final Logger logger, final String topic) {
         this.jobManager = manager;
         this.topic = topic;
+        this.logger = logger;
     }
 
     @Override
@@ -62,43 +67,100 @@ public class JobBuilderImpl implements J
 
     @Override
     public ScheduleBuilder schedule(final String name) {
-        return null;
+        return new ScheduleBuilderImpl(name);
     }
 
     public final class ScheduleBuilderImpl implements ScheduleBuilder {
 
+        private final String scheduleName;
+
+        public ScheduleBuilderImpl(final String name) {
+            this.scheduleName = name;
+        }
+
+        private boolean check() {
+            if ( this.scheduleName == null || this.scheduleName.length() == 0 ) {
+                logger.warn("Discarding scheduled job - schedule name not specified");
+                return false;
+            }
+            final String errorMessage = Utility.checkJob(topic, properties);
+            if ( errorMessage != null ) {
+                logger.warn("{}", errorMessage);
+                return false;
+            }
+            return true;
+        }
+
         @Override
-        public boolean periodically(int minutes) {
-            // TODO Auto-generated method stub
+        public boolean periodically(final int minutes) {
+            if ( check() ) {
+                if ( minutes > 0 ) {
+                    final ScheduleInfo info = ScheduleInfo.PERIODIC(minutes);
+                    return jobManager.addScheduledJob(topic, name, properties, scheduleName, info);
+                }
+                logger.warn("Discarding scheduled job - period must be higher than 0 : {}", minutes);
+            }
             return false;
         }
 
         @Override
         public TimeBuilder daily() {
-            // TODO Auto-generated method stub
-            return null;
+            return new TimeBuilderImpl(ScheduledJobInfo.ScheduleType.DAILY, -1);
         }
 
         @Override
-        public TimeBuilder weekly(int day) {
-            // TODO Auto-generated method stub
-            return null;
+        public TimeBuilder weekly(final int day) {
+            return new TimeBuilderImpl(ScheduledJobInfo.ScheduleType.WEEKLY, day);
         }
 
         @Override
-        public boolean at(Date date) {
-            // TODO Auto-generated method stub
+        public boolean at(final Date date) {
+            if ( check() ) {
+                if ( date != null && date.getTime() > System.currentTimeMillis() ) {
+                    final ScheduleInfo info = ScheduleInfo.AT(date);
+                    return jobManager.addScheduledJob(topic, name, properties, scheduleName, info);
+                }
+                logger.warn("Discarding scheduled job - date must be in the future : {}", date);
+            }
             return false;
         }
 
         public final class TimeBuilderImpl implements TimeBuilder {
 
+            private final ScheduledJobInfo.ScheduleType scheduleType;
+
+            private final int day;
+
+            public TimeBuilderImpl(ScheduledJobInfo.ScheduleType scheduleType, final int day) {
+                this.scheduleType = scheduleType;
+                this.day = day;
+            }
+
             @Override
-            public boolean at(int hour, int minute) {
-                // TODO Auto-generated method stub
+            public boolean at(final int hour, final int minute) {
+                if ( check() ) {
+                    boolean valid = true;
+                    if ( scheduleType == ScheduledJobInfo.ScheduleType.WEEKLY ) {
+                        if ( day < 1 || day > 7 ) {
+                            valid = false;
+                            logger.warn("Discarding scheduled job - day must be between 1 and 7 : {}", day);
+                        }
+                    }
+                    if ( valid ) {
+                        if ( hour >= 0 && hour < 24 && minute >= 0 && minute < 60 ) {
+                            final ScheduleInfo info;
+                            if ( scheduleType == ScheduledJobInfo.ScheduleType.WEEKLY ) {
+                                info = ScheduleInfo.WEEKLY(this.day, hour, minute);
+                            } else {
+                                info = ScheduleInfo.DAYLY(hour, minute);
+                            }
+                            return jobManager.addScheduledJob(topic, name, properties, scheduleName, info);
+                        }
+                        logger.warn("Discarding scheduled job - wrong time information : {}…{}", hour, minute);
+                    }
+                }
                 return false;
             }
-
         }
     }
 }

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerConfiguration.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerConfiguration.java?rev=1529825&r1=1529824&r2=1529825&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerConfiguration.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerConfiguration.java Mon Oct  7 12:07:10 2013
@@ -31,24 +31,30 @@ import org.apache.sling.event.impl.suppo
  */
 public class JobManagerConfiguration {
 
-    /** Default repository path. */
+    /** Default resource path for jobs. */
     public static final String DEFAULT_REPOSITORY_PATH = "/var/eventing/jobs";
 
-    /** The path where all jobs are stored. */
-    public static final String CONFIG_PROPERTY_REPOSITORY_PATH = "repository.path";
-
     /** Default background load delay. */
     public static final long DEFAULT_BACKGROUND_LOAD_DELAY = 30;
 
-    /** The background loader waits this time of seconds after startup before loading events from the repository. (in secs) */
-    public static final String CONFIG_PROPERTY_BACKGROUND_LOAD_DELAY = "load.delay";
-
     /** Default for disabling the distribution. */
     public static final boolean DEFAULT_DISABLE_DISTRIBUTION = false;
 
+    /** Default resource path for scheduled jobs. */
+    private static final String DEFAULT_SCHEDULED_JOBS_PATH = "/var/eventing/scheduled-jobs";
+
+    /** The path where all jobs are stored. */
+    public static final String PROPERTY_REPOSITORY_PATH = "repository.path";
+
+    /** The number of seconds the background loader waits after startup before loading events from the repository. */
+    public static final String PROPERTY_BACKGROUND_LOAD_DELAY = "load.delay";
+
     /** Configuration switch for distributing the jobs. */
     public static final String PROPERTY_DISABLE_DISTRIBUTION = "job.consumermanager.disableDistribution";
 
+    /** Configuration property for the scheduled jobs path. */
+    private static final String PROPERTY_SCHEDULED_JOBS_PATH = "job.scheduled.jobs.path";
+
     /** The jobs base path with a slash. */
     private String jobsBasePathWithSlash;
 
@@ -82,9 +88,15 @@ public class JobManagerConfiguration {
 
     private String storedSuccessfulJobsPath;
 
+    /** The resource path where scheduled jobs are stored. */
+    private String scheduledJobsPath;
+
+    /** The resource path where scheduled jobs are stored - ending with a slash. */
+    private String scheduledJobsPathWithSlash;
+
     public JobManagerConfiguration(final Map<String, Object> props) {
         this.update(props);
-        this.jobsBasePathWithSlash = PropertiesUtil.toString(props.get(CONFIG_PROPERTY_REPOSITORY_PATH),
+        this.jobsBasePathWithSlash = PropertiesUtil.toString(props.get(PROPERTY_REPOSITORY_PATH),
                             DEFAULT_REPOSITORY_PATH) + '/';
 
         // create initial resources
@@ -101,6 +113,10 @@ public class JobManagerConfiguration {
 
         this.storedCancelledJobsPath = this.jobsBasePathWithSlash + "cancelled";
         this.storedSuccessfulJobsPath = this.jobsBasePathWithSlash + "finished";
+
+        this.scheduledJobsPath = PropertiesUtil.toString(props.get(PROPERTY_SCHEDULED_JOBS_PATH),
+                DEFAULT_SCHEDULED_JOBS_PATH);
+        this.scheduledJobsPathWithSlash = this.scheduledJobsPath + "/";
     }
 
     /**
@@ -108,7 +124,7 @@ public class JobManagerConfiguration {
      */
     public void update(final Map<String, Object> props) {
         this.disabledDistribution = PropertiesUtil.toBoolean(props.get(PROPERTY_DISABLE_DISTRIBUTION), DEFAULT_DISABLE_DISTRIBUTION);
-        this.backgroundLoadDelay = PropertiesUtil.toLong(props.get(CONFIG_PROPERTY_BACKGROUND_LOAD_DELAY), DEFAULT_BACKGROUND_LOAD_DELAY);
+        this.backgroundLoadDelay = PropertiesUtil.toLong(props.get(PROPERTY_BACKGROUND_LOAD_DELAY), DEFAULT_BACKGROUND_LOAD_DELAY);
     }
 
     /**
@@ -226,6 +242,12 @@ public class JobManagerConfiguration {
         return this.disabledDistribution;
     }
 
+    /**
+     * Get the storage path for finished jobs.
+     * @param finishedJob The finished job
+     * @param isSuccess Whether processing was successful or not
+     * @return The complete storage path
+     */
     public String getStoragePath(final JobImpl finishedJob, final boolean isSuccess) {
         final String topicName = (finishedJob.isBridgedEvent() ? JobImpl.PROPERTY_BRIDGED_EVENT : finishedJob.getTopic().replace('/', '.'));
         final StringBuilder sb = new StringBuilder();
@@ -243,7 +265,18 @@ public class JobManagerConfiguration {
 
     }
 
+    /**
+     * Check whether this is a storage path.
+     */
     public boolean isStoragePath(final String path) {
         return path.startsWith(this.storedCancelledJobsPath) || path.startsWith(this.storedSuccessfulJobsPath);
     }
+
+    public String getScheduledJobsPath() {
+        return this.scheduledJobsPath;
+    }
+
+    public String getScheduledJobsPathWithSlash() {
+        return this.scheduledJobsPathWithSlash;
+    }
 }

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java?rev=1529825&r1=1529824&r2=1529825&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java Mon Oct  7 12:07:10 2013
@@ -18,7 +18,6 @@
  */
 package org.apache.sling.event.impl.jobs;
 
-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Calendar;
 import java.util.Collection;
@@ -70,6 +69,7 @@ import org.apache.sling.event.impl.jobs.
 import org.apache.sling.event.impl.jobs.stats.TopicStatisticsImpl;
 import org.apache.sling.event.impl.support.Environment;
 import org.apache.sling.event.impl.support.ResourceHelper;
+import org.apache.sling.event.impl.support.ScheduleInfo;
 import org.apache.sling.event.jobs.Job;
 import org.apache.sling.event.jobs.JobBuilder;
 import org.apache.sling.event.jobs.JobManager;
@@ -99,12 +99,14 @@ import org.slf4j.LoggerFactory;
            name="org.apache.sling.event.impl.jobs.jcr.PersistenceHandler")
 @Service(value={JobManager.class, EventHandler.class, TopologyEventListener.class, Runnable.class})
 @Properties({
-    @Property(name=JobManagerConfiguration.CONFIG_PROPERTY_REPOSITORY_PATH,
+    @Property(name=JobManagerConfiguration.PROPERTY_REPOSITORY_PATH,
           value=JobManagerConfiguration.DEFAULT_REPOSITORY_PATH),
     @Property(name="scheduler.period", longValue=60),
     @Property(name="scheduler.concurrent", boolValue=false),
     @Property(name=EventConstants.EVENT_TOPIC,
               value={SlingConstants.TOPIC_RESOURCE_ADDED,
+                     SlingConstants.TOPIC_RESOURCE_CHANGED,
+                     SlingConstants.TOPIC_RESOURCE_REMOVED,
                      "org/apache/sling/event/notification/job/*",
                      ResourceHelper.BUNDLE_EVENT_STARTED,
                      ResourceHelper.BUNDLE_EVENT_UPDATED})
@@ -166,6 +168,9 @@ public class JobManagerImpl
     /** Set of paths directly added as jobs - these will be ignored during observation handling. */
     private final Set<String> directlyAddedPaths = new HashSet<String>();
 
+    /** Job Scheduler. */
+    private JobSchedulerImpl jobScheduler;
+
     /**
      * Activate this component.
      * @param props Configuration properties
@@ -173,6 +178,7 @@ public class JobManagerImpl
     @Activate
     protected void activate(final Map<String, Object> props) throws LoginException {
         this.configuration = new JobManagerConfiguration(props);
+        this.jobScheduler = new JobSchedulerImpl(this.configuration, this.resourceResolverFactory, this.scheduler, this);
         this.maintenanceTask = new MaintenanceTask(this.configuration, this.resourceResolverFactory);
         this.backgroundLoader = new BackgroundLoader(this, this.configuration, this.resourceResolverFactory);
 
@@ -210,6 +216,8 @@ public class JobManagerImpl
     @Deactivate
     protected void deactivate() {
         logger.info("Apache Sling Job Manager stopping on instance {}", Environment.APPLICATION_ID);
+        this.jobScheduler.deactivate();
+
         this.backgroundLoader.deactivate();
         this.backgroundLoader = null;
 
@@ -459,9 +467,14 @@ public class JobManagerImpl
                 }
                 this.backgroundLoader.loadJob(path);
             }
+            this.jobScheduler.handleEvent(event);
         } else if ( ResourceHelper.BUNDLE_EVENT_STARTED.equals(event.getTopic())
                  || ResourceHelper.BUNDLE_EVENT_UPDATED.equals(event.getTopic()) ) {
             this.backgroundLoader.tryToReloadUnloadedJobs();
+            this.jobScheduler.handleEvent(event);
+        } else if ( SlingConstants.TOPIC_RESOURCE_CHANGED.equals(event.getTopic())
+                 || SlingConstants.TOPIC_RESOURCE_REMOVED.equals(event.getTopic()) ) {
+            this.jobScheduler.handleEvent(event);
         } else {
             if ( EventUtil.isLocal(event) ) {
                 // job notifications
@@ -602,6 +615,7 @@ public class JobManagerImpl
 
             this.startProcessing(event.getNewView());
         }
+        this.jobScheduler.handleTopologyEvent(event);
     }
 
     /**
@@ -738,10 +752,15 @@ public class JobManagerImpl
      */
     @Override
     public boolean removeJob(final String jobId) {
-        return this.internalRemoveJobJobById(jobId, false);
+        return this.internalRemoveJobById(jobId, false);
     }
 
-    private boolean internalRemoveJobJobById(final String jobId, final boolean forceRemove) {
+    /**
+     * Remove a job.
+     * If the job is already in the storage area, it's removed forever.
+     * Otherwise it's moved to the storage area.
+     */
+    private boolean internalRemoveJobById(final String jobId, final boolean forceRemove) {
         logger.debug("Trying to remove job {}", jobId);
         boolean result = true;
         final JobImpl job = (JobImpl)this.getJobById(jobId);
@@ -792,7 +811,7 @@ public class JobManagerImpl
      */
     @Override
     public void forceRemoveJob(final String jobId) {
-        this.internalRemoveJobJobById(jobId, true);
+        this.internalRemoveJobById(jobId, true);
     }
 
     /**
@@ -808,20 +827,11 @@ public class JobManagerImpl
      */
     @Override
     public Job addJob(final String topic, final String name, final Map<String, Object> properties) {
-        final String errorMessage = Utility.checkJobTopic(topic);
+        final String errorMessage = Utility.checkJob(topic, properties);
         if ( errorMessage != null ) {
             logger.warn("{}", errorMessage);
             return null;
         }
-        if ( properties != null ) {
-            for(final Object val : properties.values()) {
-                if ( val != null && !(val instanceof Serializable) ) {
-                    logger.warn("Discarding job - properties must be serializable: {} {} : {}",
-                            new Object[] {topic, name, properties});
-                    return null;
-                }
-            }
-        }
         Job result = this.addJobInteral(topic, name, properties);
         if ( result == null && name != null ) {
             result = this.getJobByName(name);
@@ -939,7 +949,7 @@ public class JobManagerImpl
      */
     @Override
     public boolean removeJobById(final String jobId) {
-        return this.internalRemoveJobJobById(jobId, true);
+        return this.internalRemoveJobById(jobId, true);
     }
 
     /**
@@ -1428,7 +1438,7 @@ public class JobManagerImpl
      */
     @Override
     public JobBuilder createJob(final String topic) {
-        return new JobBuilderImpl(this, topic);
+        return new JobBuilderImpl(this, this.logger, topic);
     }
 
     @Override
@@ -1442,4 +1452,18 @@ public class JobManagerImpl
         // TODO Auto-generated method stub
         return null;
     }
+
+    public boolean addScheduledJob(final String topic,
+            final String jobName,
+            final Map<String, Object> properties,
+            final String scheduleName,
+            final ScheduleInfo scheduleInfo) {
+        try {
+            return this.jobScheduler.writeJob(topic, jobName, properties, scheduleName, scheduleInfo);
+        } catch ( final PersistenceException pe) {
+            logger.warn("Unable to persist scheduled job", pe);
+        }
+        return false;
+    }
+
 }

Added: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobSchedulerImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobSchedulerImpl.java?rev=1529825&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobSchedulerImpl.java (added)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobSchedulerImpl.java Mon Oct  7 12:07:10 2013
@@ -0,0 +1,598 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.Dictionary;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.jackrabbit.util.ISO8601;
+import org.apache.jackrabbit.util.ISO9075;
+import org.apache.sling.api.SlingConstants;
+import org.apache.sling.api.resource.LoginException;
+import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.api.resource.QuerySyntaxException;
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.api.resource.ResourceUtil;
+import org.apache.sling.api.resource.ValueMap;
+import org.apache.sling.commons.scheduler.JobContext;
+import org.apache.sling.commons.scheduler.Scheduler;
+import org.apache.sling.discovery.TopologyEvent;
+import org.apache.sling.discovery.TopologyEvent.Type;
+import org.apache.sling.discovery.TopologyEventListener;
+import org.apache.sling.event.impl.support.Environment;
+import org.apache.sling.event.impl.support.ResourceHelper;
+import org.apache.sling.event.impl.support.ScheduleInfo;
+import org.apache.sling.event.jobs.Job;
+import org.apache.sling.event.jobs.JobBuilder;
+import org.apache.sling.event.jobs.JobUtil;
+import org.apache.sling.event.jobs.ScheduledJobInfo;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A scheduler for scheduling jobs
+ *
+ * TODO check handling of running and active flag
+ */
+public class JobSchedulerImpl
+    implements EventHandler, TopologyEventListener, org.apache.sling.commons.scheduler.Job {
+
+    /** We use the same resource type as for timed events. */
+    private static final String SCHEDULED_JOB_RESOURCE_TYPE = "slingevent:TimedEvent";
+
+    private static final String TOPIC_READ_JOB = "org/apache/sling/event/impl/jobs/READSCHEDULEDJOB";
+
+    private static final String PROPERTY_READ_JOB = "properties";
+
+    /** Default logger */
+    private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+    /** Is the background task still running? */
+    private volatile boolean running;
+
+    /** Is this active? */
+    private volatile boolean active;
+
+    private final ResourceResolverFactory resourceResolverFactory;
+
+    private final JobManagerConfiguration config;
+
+    private final Scheduler scheduler;
+
+    private final JobManagerImpl jobManager;
+
+    /** A local queue for serializing the event processing. */
+    private final BlockingQueue<Event> queue = new LinkedBlockingQueue<Event>();
+
+    /** Unloaded events. */
+    private final Set<String>unloadedEvents = new HashSet<String>();
+
+    private final Map<String, ScheduledJobInfoImpl> scheduledJobs = new HashMap<String, ScheduledJobInfoImpl>();
+
+    public JobSchedulerImpl(final JobManagerConfiguration configuration,
+            final ResourceResolverFactory resourceResolverFactory,
+            final Scheduler scheduler,
+            final JobManagerImpl jobManager) {
+        this.config = configuration;
+        this.resourceResolverFactory = resourceResolverFactory;
+        this.scheduler = scheduler;
+        this.running = true;
+        this.jobManager = jobManager;
+    }
+
+    /**
+     * Deactivate this component.
+     */
+    public void deactivate() {
+        this.running = false;
+        this.stopScheduling();
+    }
+
+    private void stopScheduling() {
+        if ( this.active ) {
+            final List<ScheduledJobInfoImpl> jobs = new ArrayList<ScheduledJobInfoImpl>();
+            synchronized ( this.scheduledJobs ) {
+                for(final ScheduledJobInfoImpl job : this.scheduledJobs.values() ) {
+                    jobs.add(job);
+                }
+            }
+            for(final ScheduledJobInfoImpl info : jobs) {
+                try {
+                    logger.debug("Stopping scheduled job : {}", info.getName());
+                    this.scheduler.removeJob(info.getSchedulerJobId());
+                } catch ( final NoSuchElementException nsee ) {
+                    this.ignoreException(nsee);
+                }
+            }
+        }
+        synchronized ( this.scheduledJobs ) {
+            this.scheduledJobs.clear();
+        }
+
+        // unblock the background thread (it waits on queue.take()) by queueing a single stop-marker event
+        this.queue.clear();
+        try {
+            this.queue.put(new Event(Utility.TOPIC_STOPPED, (Dictionary<String, Object>)null));
+        } catch (final InterruptedException e) {
+            this.ignoreException(e);
+        }
+    }
+
+    private void startScheduling() {
+        final long now = System.currentTimeMillis();
+        final Thread backgroundThread = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                loadScheduledJobs(now);
+                try {
+                    runInBackground();
+                } catch (final Throwable t) { //NOSONAR
+                    logger.error("Background thread stopped with exception: " + t.getMessage(), t);
+                    running = false;
+                }
+            }
+        });
+        backgroundThread.start();
+    }
+
+    /**
+     * Main loop of the background thread.
+     * Events are taken from the queue one at a time for as long as <code>running</code> is set:
+     * <ul>
+     * <li>a read job event registers the scheduled job and, if this instance is active, starts it</li>
+     * <li>an added/changed resource is read and turned into a read job event which is
+     *     handled in the next iteration without going through the queue</li>
+     * <li>a removed resource unregisters the scheduled job and, if this instance is active,
+     *     removes it from the scheduler</li>
+     * </ul>
+     * @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#runInBackground()
+     */
+    protected void runInBackground() {
+        Event event = null;
+        while ( this.running ) {
+            // so let's wait/get the next event from the queue
+            if ( event == null ) {
+                try {
+                    event = this.queue.take();
+                } catch (final InterruptedException e) {
+                    // we ignore this
+                    this.ignoreException(e);
+                }
+            }
+            if ( event != null && this.running ) {
+                // follow-up event which is processed directly in the next iteration
+                Event nextEvent = null;
+
+                // check event type
+                if ( event.getTopic().equals(TOPIC_READ_JOB) ) {
+                    @SuppressWarnings("unchecked")
+                    final Map<String, Object> properties = (Map<String, Object>) event.getProperty(PROPERTY_READ_JOB);
+                    // strip the housekeeping properties, the remaining ones are passed on as job properties
+                    properties.remove(ResourceResolver.PROPERTY_RESOURCE_TYPE);
+                    properties.remove(Job.PROPERTY_JOB_CREATED);
+                    properties.remove(Job.PROPERTY_JOB_CREATED_INSTANCE);
+
+                    final String jobTopic = (String) properties.remove(JobUtil.PROPERTY_JOB_TOPIC);
+                    final String jobName = (String) properties.remove(JobUtil.PROPERTY_JOB_NAME);
+                    final String schedulerName = (String) properties.remove(ResourceHelper.PROPERTY_SCHEDULER_NAME);
+                    final ScheduleInfo scheduleInfo = (ScheduleInfo)  properties.remove(ResourceHelper.PROPERTY_SCHEDULER_INFO);
+
+                    // and now schedule (TODO)
+                    final ScheduledJobInfoImpl info = new ScheduledJobInfoImpl(this, jobTopic, jobName, properties, schedulerName, scheduleInfo);
+                    synchronized ( this.scheduledJobs ) {
+                        // keyed by the filtered name, which is also the name of the resource
+                        this.scheduledJobs.put(ResourceHelper.filterName(schedulerName), info);
+                    }
+                    if ( this.active ) {
+                        this.startScheduledJob(info);
+                    }
+                }
+                if ( event.getTopic().equals(SlingConstants.TOPIC_RESOURCE_ADDED)
+                     || event.getTopic().equals(SlingConstants.TOPIC_RESOURCE_CHANGED)) {
+                    final String path = (String)event.getProperty(SlingConstants.PROPERTY_PATH);
+                    ResourceResolver resolver = null;
+                    try {
+                        resolver = this.resourceResolverFactory.getAdministrativeResourceResolver(null);
+                        final Resource eventResource = resolver.getResource(path);
+                        // the resource might have been removed again before the event is processed,
+                        // without this check the resulting NPE would stop the background thread
+                        if ( eventResource != null
+                             && ResourceHelper.RESOURCE_TYPE_SCHEDULED_JOB.equals(eventResource.getResourceType()) ) {
+                            final ReadResult result = this.readScheduledJob(eventResource);
+                            if ( result != null ) {
+                                if ( result.hasReadErrors ) {
+                                    // remember the path, reading is retried on bundle events
+                                    synchronized ( this.unloadedEvents ) {
+                                        this.unloadedEvents.add(eventResource.getPath());
+                                    }
+                                } else {
+                                    nextEvent = result.event;
+                                }
+                            }
+                        }
+                    } catch (final LoginException le) {
+                        this.ignoreException(le);
+                    } finally {
+                        if ( resolver != null ) {
+                            resolver.close();
+                        }
+                    }
+                } else if ( event.getTopic().equals(SlingConstants.TOPIC_RESOURCE_REMOVED) ) {
+                    final String path = (String)event.getProperty(SlingConstants.PROPERTY_PATH);
+                    final String scheduleName = ResourceUtil.getName(path);
+                    final ScheduledJobInfoImpl info;
+                    synchronized ( this.scheduledJobs ) {
+                        info = this.scheduledJobs.remove(scheduleName);
+                    }
+                    if ( info != null && this.active ) {
+                        logger.debug("Stopping scheduled job : {}", info.getName());
+                        try {
+                            this.scheduler.removeJob(info.getSchedulerJobId());
+                        } catch (final NoSuchElementException nsee) {
+                            // this can happen if the job is scheduled on another node
+                            // so we can just ignore this
+                        }
+
+                    }
+                }
+                event = nextEvent;
+            }
+        }
+    }
+
+    /**
+     * Register a scheduled job with the scheduler service.
+     * Only the schedule types DATE and PERIODICALLY are handled; DAILY and WEEKLY
+     * are not implemented yet and such jobs are not registered.
+     * A failure to schedule is ignored (it is only logged at debug level).
+     * @param info The scheduled job to register
+     */
+    private void startScheduledJob(final ScheduledJobInfoImpl info) {
+        // Create configuration for scheduled job, it is handed back in execute()
+        final Map<String, Serializable> config = new HashMap<String, Serializable>();
+        config.put(PROPERTY_READ_JOB, info);
+
+        logger.debug("Adding scheduled job: {}", info.getName());
+        try {
+            switch ( info.getScheduleType() ) {
+                case DAILY:
+                    // TODO
+                    break;
+                case DATE:
+                    this.scheduler.fireJobAt(info.getSchedulerJobId(), this, config, info.getNextScheduledExecution());
+                    break;
+                case PERIODICALLY:
+                    // the schedule period is defined in minutes, the scheduler expects seconds
+                    this.scheduler.addPeriodicJob(info.getSchedulerJobId(), this, config, info.getPeriod() * 60L, false);
+                    break;
+                case WEEKLY:
+                    // TODO
+                    break;
+            }
+        } catch (final Exception e) {
+            // we ignore it if scheduled fails...
+            this.ignoreException(e);
+        }
+    }
+
+    /**
+     * Invoked by the scheduler when a scheduled job is due.
+     * The job described by the scheduled job info from the configuration is added
+     * to the job manager; a job scheduled for a specific date is unscheduled afterwards.
+     * @see org.apache.sling.commons.scheduler.Job#execute(org.apache.sling.commons.scheduler.JobContext)
+     */
+    @Override
+    public void execute(final JobContext context) {
+        final Object configured = context.getConfiguration().get(PROPERTY_READ_JOB);
+        final ScheduledJobInfoImpl scheduled = (ScheduledJobInfoImpl) configured;
+
+        this.jobManager.addJob(scheduled.getJobTopic(), scheduled.getJobName(), scheduled.getJobProperties());
+
+        // a job scheduled for a specific date is removed from the resource tree once it fired
+        final boolean scheduledForDate = scheduled.getScheduleType() == ScheduledJobInfo.ScheduleType.DATE;
+        if ( scheduledForDate ) {
+            this.unschedule(scheduled);
+        }
+    }
+
+    /**
+     * Unschedule a job by removing its resource from the resource tree.
+     * Nothing happens if the resource does not exist; failures to log in
+     * or to delete the resource are ignored (logged at debug level only).
+     * @param info The scheduled job to remove
+     */
+    public void unschedule(final ScheduledJobInfoImpl info) {
+        ResourceResolver resolver = null;
+        try {
+            resolver = this.resourceResolverFactory.getAdministrativeResourceResolver(null);
+            // the configured base path already ends with a slash, appending another
+            // separator would create a path with a double slash which differs from
+            // the path the job has been written to
+            final String path = this.config.getScheduledJobsPathWithSlash()
+                              + ResourceHelper.filterName(info.getName());
+
+            final Resource eventResource = resolver.getResource(path);
+            if ( eventResource != null ) {
+                resolver.delete(eventResource);
+                resolver.commit();
+            }
+        } catch (final LoginException le) {
+            this.ignoreException(le);
+        } catch (final PersistenceException pe) {
+            // we ignore the exception if removing fails
+            ignoreException(pe);
+        } finally {
+            if ( resolver != null ) {
+                resolver.close();
+            }
+        }
+    }
+
+    /**
+     * Handle OSGi events while the scheduler is running.
+     * Resource events for scheduled jobs are put into the queue for the background
+     * thread. Bundle started/updated events trigger a retry for all scheduled jobs
+     * which could not be read so far; the retry runs in the thread pool.
+     * @see org.osgi.service.event.EventHandler#handleEvent(org.osgi.service.event.Event)
+     */
+    @Override
+    public void handleEvent(final Event event) {
+        if ( !this.running ) {
+            return;
+        }
+        final String topic = event.getTopic();
+        final boolean isBundleEvent = ResourceHelper.BUNDLE_EVENT_STARTED.equals(topic)
+                                   || ResourceHelper.BUNDLE_EVENT_UPDATED.equals(topic);
+        if ( !isBundleEvent ) {
+            // resource event: only paths below the scheduled jobs path are of interest
+            final String path = (String)event.getProperty(SlingConstants.PROPERTY_PATH);
+            final String resourceType = (String)event.getProperty(SlingConstants.PROPERTY_RESOURCE_TYPE);
+            if ( path == null || !path.startsWith(this.config.getScheduledJobsPathWithSlash()) ) {
+                return;
+            }
+            if ( resourceType != null && !ResourceHelper.RESOURCE_TYPE_SCHEDULED_JOB.equals(resourceType) ) {
+                return;
+            }
+            logger.debug("Received resource event for {} : {}", path, resourceType);
+            try {
+                this.queue.put(event);
+            } catch (final InterruptedException ignore) {
+                this.ignoreException(ignore);
+            }
+            return;
+        }
+
+        // bundle event started or updated: anything left to retry?
+        final boolean retryNeeded;
+        synchronized ( this.unloadedEvents ) {
+            retryNeeded = !this.unloadedEvents.isEmpty();
+        }
+        if ( !retryNeeded ) {
+            return;
+        }
+        final Runnable retryTask = new Runnable() {
+
+            @Override
+            public void run() {
+                synchronized (unloadedEvents) {
+                    ResourceResolver resolver = null;
+                    // paths which still can't be read after this run
+                    final Set<String> stillUnloaded = new HashSet<String>(unloadedEvents);
+                    try {
+                        resolver = resourceResolverFactory.getAdministrativeResourceResolver(null);
+                        for(final String path : unloadedEvents ) {
+                            stillUnloaded.remove(path);
+                            final ReadResult result = readScheduledJob(resolver.getResource(path));
+                            if ( result == null ) {
+                                continue;
+                            }
+                            if ( result.hasReadErrors ) {
+                                stillUnloaded.add(path);
+                                continue;
+                            }
+                            try {
+                                queue.put(result.event);
+                            } catch (InterruptedException e) {
+                                // we ignore this exception as this should never occur
+                                ignoreException(e);
+                            }
+                        }
+                    } catch (final LoginException re) {
+                        // unable to create resource resolver so we try it again next time
+                        ignoreException(re);
+                    } finally {
+                        if ( resolver != null ) {
+                            resolver.close();
+                        }
+                        unloadedEvents.clear();
+                        unloadedEvents.addAll(stillUnloaded);
+                    }
+                }
+            }
+
+        };
+        Environment.THREAD_POOL.execute(retryTask);
+    }
+
+    /**
+     * Load all scheduled jobs from the resource tree which have been created
+     * before the given time, oldest first. Readable jobs are put into the queue,
+     * the paths of jobs with read errors are remembered for a later retry.
+     * @param startTime Only jobs created before this time (in milliseconds) are loaded
+     */
+    private void loadScheduledJobs(final long startTime) {
+        ResourceResolver resolver = null;
+        try {
+            resolver = this.resourceResolverFactory.getAdministrativeResourceResolver(null);
+            final Calendar startDate = Calendar.getInstance();
+            startDate.setTimeInMillis(startTime);
+
+            final String createdProperty = ISO9075.encode(org.apache.sling.event.jobs.Job.PROPERTY_JOB_CREATED);
+            final String query = "//element(*," + SCHEDULED_JOB_RESOURCE_TYPE + ")[@"
+                    + createdProperty + " < xs:dateTime('" + ISO8601.format(startDate) + "')] order by @"
+                    + createdProperty + " ascending";
+            final Iterator<Resource> result = resolver.findResources(query, "xpath");
+
+            while ( result.hasNext() ) {
+                final Resource eventResource = result.next();
+                // sanity check for the path
+                if ( !eventResource.getPath().startsWith(this.config.getScheduledJobsPathWithSlash()) ) {
+                    continue;
+                }
+                final ReadResult readResult = this.readScheduledJob(eventResource);
+                if ( readResult == null ) {
+                    continue;
+                }
+                if ( readResult.hasReadErrors ) {
+                    synchronized ( this.unloadedEvents ) {
+                        this.unloadedEvents.add(eventResource.getPath());
+                    }
+                    continue;
+                }
+                try {
+                    this.queue.put(readResult.event);
+                } catch (final InterruptedException e) {
+                    this.ignoreException(e);
+                }
+            }
+
+        } catch (final QuerySyntaxException qse) {
+            this.ignoreException(qse);
+        } catch (final LoginException le) {
+            this.ignoreException(le);
+        } finally {
+            if ( resolver != null ) {
+                resolver.close();
+            }
+        }
+    }
+
+    private static final class ReadResult {
+        public Event event;
+        public boolean hasReadErrors;
+    }
+
+    /**
+     * Read a scheduled job from the resource
+     * @return The job or <code>null</code>
+     */
+    private ReadResult readScheduledJob(final Resource eventResource) {
+        if ( eventResource != null ) {
+            try {
+                final ValueMap vm = ResourceHelper.getValueMap(eventResource);
+                final Map<String, Object> properties = ResourceHelper.cloneValueMap(vm);
+                final ReadResult result = new ReadResult();
+                @SuppressWarnings("unchecked")
+                final List<Exception> readErrorList = (List<Exception>) properties.remove(ResourceHelper.PROPERTY_MARKER_READ_ERROR_LIST);
+                result.hasReadErrors = readErrorList != null;
+                if ( readErrorList != null ) {
+                    for(final Exception e : readErrorList) {
+                        logger.warn("Unable to read scheduled job from " + eventResource.getPath(), e);
+                    }
+                }
+                final Map<String, Object> eventProps = Collections.singletonMap(PROPERTY_READ_JOB, (Object)properties);
+                result.event = new Event(TOPIC_READ_JOB, eventProps);
+
+                return result;
+            } catch (final InstantiationException ie) {
+                // something happened with the resource in the meantime
+                this.ignoreException(ie);
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Write a scheduled job to the resource tree.
+     * A resource already existing for the same scheduler name is deleted first
+     * and replaced by the new one.
+     * @param jobTopic The job topic
+     * @param jobName The job name, it is not stored if it is <code>null</code>
+     * @param jobProperties The job properties, may be <code>null</code>; properties
+     *                      rejected by <code>ResourceHelper.ignoreProperty</code> are not stored
+     * @param schedulerName The name of the schedule, its filtered form is the resource name
+     * @param scheduleInfo The schedule information
+     * @return <code>true</code> if the job has been written, <code>false</code> if no
+     *         resource resolver could be obtained
+     * @throws PersistenceException
+     */
+    public boolean writeJob(
+            final String jobTopic,
+            final String jobName,
+            final Map<String, Object> jobProperties,
+            final String schedulerName,
+            final ScheduleInfo scheduleInfo)
+    throws PersistenceException {
+        ResourceResolver resolver = null;
+        try {
+            resolver = this.resourceResolverFactory.getAdministrativeResourceResolver(null);
+
+            // start with the job properties which are allowed to be stored ...
+            final Map<String, Object> properties = new HashMap<String, Object>();
+            if ( jobProperties != null ) {
+                for(final Map.Entry<String, Object> entry : jobProperties.entrySet() ) {
+                    if ( ResourceHelper.ignoreProperty(entry.getKey()) ) {
+                        continue;
+                    }
+                    properties.put(entry.getKey(), entry.getValue());
+                }
+            }
+
+            // ... and add the job information, overwriting job properties of the same name
+            properties.put(JobUtil.PROPERTY_JOB_TOPIC, jobTopic);
+            if ( jobName != null ) {
+                properties.put(JobUtil.PROPERTY_JOB_NAME, jobName);
+            }
+            properties.put(Job.PROPERTY_JOB_CREATED, Calendar.getInstance());
+            properties.put(Job.PROPERTY_JOB_CREATED_INSTANCE, Environment.APPLICATION_ID);
+
+            // scheduler name, scheduler info and the resource type
+            properties.put(ResourceHelper.PROPERTY_SCHEDULER_NAME, schedulerName);
+            properties.put(ResourceHelper.PROPERTY_SCHEDULER_INFO, scheduleInfo);
+            properties.put(ResourceResolver.PROPERTY_RESOURCE_TYPE, ResourceHelper.RESOURCE_TYPE_SCHEDULED_JOB);
+
+            final String path = this.config.getScheduledJobsPathWithSlash() + ResourceHelper.filterName(schedulerName);
+
+            // an existing resource is removed before the new one is created
+            final Resource existingInfo = resolver.getResource(path);
+            final boolean isUpdate = existingInfo != null;
+            if ( isUpdate ) {
+                resolver.delete(existingInfo);
+            }
+            logger.debug(isUpdate ? "Updating scheduled job {} at {}" : "Storing new scheduled job {} at {}",
+                    properties, path);
+
+            ResourceHelper.getOrCreateResource(resolver, path, properties);
+            return true;
+        } catch ( final LoginException le ) {
+            // we ignore this
+            this.ignoreException(le);
+        } finally {
+            if ( resolver != null ) {
+                resolver.close();
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Helper method for exceptions which are deliberately ignored:
+     * the exception is logged if debug logging is enabled, nothing else happens.
+     * @param e The exception to ignore
+     */
+    private void ignoreException(final Exception e) {
+        if ( !this.logger.isDebugEnabled() ) {
+            return;
+        }
+        this.logger.debug("Ignored exception " + e.getMessage(), e);
+    }
+
+    /**
+     * React on topology events.
+     * While the topology is changing this instance is inactive and scheduling is stopped.
+     * Once the topology is initialized or has changed, this instance is active if the local
+     * instance is the leader; scheduling is started or stopped if that state changed.
+     * @see org.apache.sling.discovery.TopologyEventListener#handleTopologyEvent(org.apache.sling.discovery.TopologyEvent)
+     */
+    @Override
+    public void handleTopologyEvent(final TopologyEvent event) {
+        final Type type = event.getType();
+        if ( type == Type.TOPOLOGY_CHANGING ) {
+            this.active = false;
+            this.stopScheduling();
+            return;
+        }
+        if ( type != Type.TOPOLOGY_CHANGED && type != Type.TOPOLOGY_INIT ) {
+            // all other event types are of no interest
+            return;
+        }
+        final boolean wasActive = this.active;
+        this.active = event.getNewView().getLocalInstance().isLeader();
+        if ( this.active && !wasActive ) {
+            this.startScheduling();
+        } else if ( !this.active && wasActive ) {
+            this.stopScheduling();
+        }
+    }
+
+    /**
+     * Create a schedule builder for rescheduling the given scheduled job.
+     * The builder is set up with the topic, the name and the properties of the job
+     * and uses the name of the existing schedule.
+     * @param info The scheduled job to reschedule
+     * @return A schedule builder for the job
+     */
+    public JobBuilder.ScheduleBuilder createJobBuilder(final ScheduledJobInfoImpl info) {
+        // the job keeps its own name, using the topic as the name would give
+        // all rescheduled jobs of a topic the same name
+        final JobBuilder builder = this.jobManager.createJob(info.getJobTopic()).name(info.getJobName()).properties(info.getJobProperties());
+        return builder.schedule(info.getName());
+    }
+}

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobSchedulerImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobSchedulerImpl.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobSchedulerImpl.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/ScheduledJobInfoImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/ScheduledJobInfoImpl.java?rev=1529825&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/ScheduledJobInfoImpl.java (added)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/ScheduledJobInfoImpl.java Mon Oct  7 12:07:10 2013
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs;
+
+import java.io.Serializable;
+import java.util.Date;
+import java.util.Map;
+
+import org.apache.sling.event.impl.support.ScheduleInfo;
+import org.apache.sling.event.jobs.Job;
+import org.apache.sling.event.jobs.JobBuilder.ScheduleBuilder;
+import org.apache.sling.event.jobs.ScheduledJobInfo;
+
+/**
+ * Implementation of the scheduled job info: it combines the job to create
+ * (topic, name, properties) with the schedule (name and schedule information)
+ * and delegates unscheduling and rescheduling to the job scheduler.
+ *
+ * NOTE(review): the job scheduler is a non transient field of this serializable
+ * class - whether it can be serialized is not visible here.
+ */
+public class ScheduledJobInfoImpl implements ScheduledJobInfo, Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    /** The name of the schedule. */
+    private final String scheduleName;
+
+    /** The name of the job to create. */
+    private final String jobName;
+
+    /** The topic of the job to create. */
+    private final String jobTopic;
+
+    /** The properties of the job to create. */
+    private final Map<String, Object> jobProperties;
+
+    /** When the job is scheduled. */
+    private final ScheduleInfo scheduleInfo;
+
+    /** The scheduler managing this scheduled job. */
+    private final JobSchedulerImpl jobScheduler;
+
+    /**
+     * Create a new info object.
+     * @param jobScheduler The scheduler used for unscheduling and rescheduling
+     * @param jobTopic The job topic
+     * @param jobName The job name
+     * @param jobProperties The job properties, the map is not copied
+     * @param scheduleName The name of the schedule
+     * @param scheduleInfo The schedule information
+     */
+    public ScheduledJobInfoImpl(final JobSchedulerImpl jobScheduler,
+            final String jobTopic,
+            final String jobName,
+            final Map<String, Object> jobProperties,
+            final String scheduleName,
+            final ScheduleInfo scheduleInfo) {
+        this.jobScheduler = jobScheduler;
+        this.scheduleName = scheduleName;
+        this.jobName = jobName;
+        this.jobTopic = jobTopic;
+        this.jobProperties = jobProperties;
+        this.scheduleInfo = scheduleInfo;
+    }
+
+    /** @return The name of the schedule */
+    @Override
+    public String getName() {
+        return this.scheduleName;
+    }
+
+    /** @return The schedule type of the schedule information */
+    @Override
+    public ScheduleType getScheduleType() {
+        return this.scheduleInfo.getScheduleType();
+    }
+
+    /**
+     * @return The execution date if the job is scheduled for a specific date,
+     *         <code>null</code> for all other schedule types
+     */
+    @Override
+    public Date getNextScheduledExecution() {
+        if ( this.scheduleInfo.getScheduleType() == ScheduleType.DATE ) {
+            return this.scheduleInfo.getAt();
+        }
+        return null;
+    }
+
+    /** @return The day of the week of the schedule information */
+    @Override
+    public int getDayOfWeek() {
+        return this.scheduleInfo.getDayOfWeek();
+    }
+
+    /** @return The hour of the day of the schedule information */
+    @Override
+    public int getHourOfDay() {
+        return this.scheduleInfo.getHourOfDay();
+    }
+
+    /** @return The minute of the hour of the schedule information */
+    @Override
+    public int getMinuteOfHour() {
+        // must be the minute, not the period
+        return this.scheduleInfo.getMinuteOfHour();
+    }
+
+    /** @return The period of the schedule information */
+    @Override
+    public int getPeriod() {
+        return this.scheduleInfo.getPeriod();
+    }
+
+    /** @return The topic of the job */
+    @Override
+    public String getJobTopic() {
+        return this.jobTopic;
+    }
+
+    /** @return The name of the job */
+    @Override
+    public String getJobName() {
+        return this.jobName;
+    }
+
+    /** @return The properties of the job, this is the map passed to the constructor */
+    @Override
+    public Map<String, Object> getJobProperties() {
+        return this.jobProperties;
+    }
+
+    /** Unschedule this job through the job scheduler. */
+    @Override
+    public void unschedule() {
+        this.jobScheduler.unschedule(this);
+    }
+
+    /** @return A schedule builder from the job scheduler to reschedule this job */
+    @Override
+    public ScheduleBuilder reschedule() {
+        return this.jobScheduler.createJobBuilder(this);
+    }
+
+    /**
+     * Get the scheduler job id
+     * @return The id under which this job is registered with the scheduler service,
+     *         it consists of the name of the Job interface and the name of the schedule
+     */
+    public String getSchedulerJobId() {
+        return Job.class.getName() + ":" + this.scheduleName;
+    }
+}

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/ScheduledJobInfoImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/ScheduledJobInfoImpl.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/ScheduledJobInfoImpl.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/Utility.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/Utility.java?rev=1529825&r1=1529824&r2=1529825&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/Utility.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/Utility.java Mon Oct  7 12:07:10 2013
@@ -18,6 +18,7 @@
  */
 package org.apache.sling.event.impl.jobs;
 
+import java.io.Serializable;
 import java.util.Calendar;
 import java.util.Dictionary;
 import java.util.HashMap;
@@ -68,6 +69,25 @@ public abstract class Utility {
         return message;
     }
 
+    /**
+     * Check the topic and the properties of a job.
+     * @return <code>null</code> if the job topic is correct and all properties are serializable,
+     *                           otherwise an error description is returned
+     */
+    public static String checkJob(final Object jobTopic, final Map<String, Object> properties) {
+        final String topicError = checkJobTopic(jobTopic);
+        if ( topicError != null || properties == null ) {
+            return topicError;
+        }
+        for(final Object value : properties.values()) {
+            final boolean serializable = value == null || value instanceof Serializable;
+            if ( !serializable ) {
+                return "Discarding job - properties must be serializable: " + jobTopic + " : " + properties;
+            }
+        }
+        return null;
+    }
+
     /** Event property containing the time for job start and job finished events. */
     public static final String PROPERTY_TIME = "time";
 

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ResourceHelper.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ResourceHelper.java?rev=1529825&r1=1529824&r2=1529825&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ResourceHelper.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ResourceHelper.java Mon Oct  7 12:07:10 2013
@@ -49,10 +49,16 @@ public abstract class ResourceHelper {
 
     public static final String RESOURCE_TYPE_EVENT = "slingevent:Event";
 
+    /** We use the same resource type as for timed events. */
+    public static final String RESOURCE_TYPE_SCHEDULED_JOB = "slingevent:TimedEvent";
+
     public static final String BUNDLE_EVENT_UPDATED = "org/osgi/framework/BundleEvent/UPDATED";
 
     public static final String BUNDLE_EVENT_STARTED = "org/osgi/framework/BundleEvent/STARTED";
 
+    public static final String PROPERTY_SCHEDULER_NAME = "slingevent:schedulerName"; // property holding the name of the schedule of a scheduled job
+    public static final String PROPERTY_SCHEDULER_INFO = "slingevent:schedulerInfo"; // property holding the schedule information of a scheduled job
+
     /** List of ignored properties to write to the repository. */
     @SuppressWarnings("deprecation")
     private static final String[] IGNORE_PROPERTIES = new String[] {
@@ -73,7 +79,9 @@ public abstract class ResourceHelper {
         Job.PROPERTY_JOB_PROGRESS_STEPS,
         Job.PROPERTY_FINISHED_DATE,
         JobImpl.PROPERTY_FINISHED_STATE,
-        Job.PROPERTY_RESULT_MESSAGE
+        Job.PROPERTY_RESULT_MESSAGE,
+        PROPERTY_SCHEDULER_INFO,
+        PROPERTY_SCHEDULER_NAME
     };
 
     /**

Added: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ScheduleInfo.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ScheduleInfo.java?rev=1529825&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ScheduleInfo.java (added)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ScheduleInfo.java Mon Oct  7 12:07:10 2013
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.support;
+
+import java.io.Serializable;
+import java.util.Date;
+
+import org.apache.sling.event.jobs.ScheduledJobInfo.ScheduleType;
+
+// TODO - implement serializing
+/**
+ * Immutable description of a schedule: the schedule type together with the
+ * values relevant for that type. Values which are not used by a schedule type
+ * are set to <code>-1</code> (numbers) or <code>null</code> (date).
+ * Instances are created through the static factory methods.
+ */
+public class ScheduleInfo implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Create a periodic schedule.
+     * @param minutes The period in minutes
+     */
+    public static ScheduleInfo PERIODIC(final int minutes) {
+        return new ScheduleInfo(ScheduleType.PERIODICALLY, minutes, -1, -1, -1, null);
+    }
+
+    /**
+     * Create a schedule for a specific date.
+     * @param at The date
+     */
+    public static ScheduleInfo AT(final Date at) {
+        return new ScheduleInfo(ScheduleType.DATE, -1, -1, -1, -1, at);
+    }
+
+    /**
+     * Create a weekly schedule.
+     * @param day The day of the week
+     * @param hour The hour of the day
+     * @param minute The minute of the hour
+     */
+    public static ScheduleInfo WEEKLY(final int day, final int hour, final int minute) {
+        return new ScheduleInfo(ScheduleType.WEEKLY, -1, day, hour, minute, null);
+    }
+
+    /**
+     * Create a daily schedule (the misspelled method name is kept for existing callers).
+     * @param hour The hour of the day
+     * @param minute The minute of the hour
+     */
+    public static ScheduleInfo DAYLY(final int hour, final int minute) {
+        return new ScheduleInfo(ScheduleType.DAILY, -1, -1, hour, minute, null);
+    }
+
+    /** The type of the schedule. */
+    private final ScheduleType scheduleType;
+
+    /** The period for periodic schedules. */
+    private final int period;
+
+    /** The day of the week for weekly schedules. */
+    private final int dayOfWeek;
+
+    /** The hour of the day for weekly and daily schedules. */
+    private final int hourOfDay;
+
+    /** The minute of the hour for weekly and daily schedules. */
+    private final int minuteOfHour;
+
+    /** The date for schedules with a specific date. */
+    private final Date at;
+
+    private ScheduleInfo(final ScheduleType scheduleType,
+            final int period,
+            final int dayOfWeek,
+            final int hourOfDay,
+            final int minuteOfHour,
+            final Date at) {
+        this.scheduleType = scheduleType;
+        this.period = period;
+        this.dayOfWeek = dayOfWeek;
+        this.hourOfDay = hourOfDay;
+        this.minuteOfHour = minuteOfHour;
+        // java.util.Date is mutable: keep a private copy so that the schedule
+        // can't be changed by the caller afterwards
+        this.at = (at == null ? null : new Date(at.getTime()));
+    }
+
+    /**
+     * @return A copy of the date or <code>null</code> if the schedule has no date
+     */
+    public Date getAt() {
+        return (this.at == null ? null : new Date(this.at.getTime()));
+    }
+
+    /** @return The type of the schedule */
+    public ScheduleType getScheduleType() {
+        return this.scheduleType;
+    }
+
+    /** @return The day of the week or <code>-1</code> if not set */
+    public int getDayOfWeek() {
+        return this.dayOfWeek;
+    }
+
+    /** @return The hour of the day or <code>-1</code> if not set */
+    public int getHourOfDay() {
+        return this.hourOfDay;
+    }
+
+    /** @return The minute of the hour or <code>-1</code> if not set */
+    public int getMinuteOfHour() {
+        return this.minuteOfHour;
+    }
+
+    /** @return The period or <code>-1</code> if not set */
+    public int getPeriod() {
+        return this.period;
+    }
+}

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ScheduleInfo.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ScheduleInfo.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ScheduleInfo.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobBuilder.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobBuilder.java?rev=1529825&r1=1529824&r2=1529825&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobBuilder.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobBuilder.java Mon Oct  7 12:07:10 2013
@@ -53,6 +53,7 @@ public interface JobBuilder {
      * Schedule the job
      * If a job scheduler with the same name already exists, it is updated
      * with the new information.
+     * If no name is provided (empty name or null), the job can't be scheduled.
      * @param name Unique name for the scheduler.
      * @return A schedule builder to schedule the jobs
      */
@@ -64,7 +65,8 @@ public interface JobBuilder {
     public interface ScheduleBuilder {
 
         /**
-         * Schedule the job periodically every N minutes
+         * Schedule the job periodically every N minutes.
+         * If the minutes argument is less than 1, the job can't be scheduled.
          * @param minutes Positive number of minutes
          * @return <code>true</code> if the job could be scheduled, <code>false</code>otherwise.
          */
@@ -77,12 +79,14 @@ public interface JobBuilder {
 
         /**
          * Schedule the job weekly, the time needs to be specified in addition.
-         * @param day Day of the week, Sunday being one, Monday two, up to Saturday being seven.
+         * If a value lower than 1 or higher than 7 is used, the job can't be scheduled.
+         * @param day Day of the week, 1:Sunday, 2:Monday, ... 7:Saturday.
          */
         TimeBuilder weekly(final int day);
 
         /**
          * Schedule the job for a specific date.
+         * If no date or a date in the past is provided, the job can't be scheduled.
          * @param date The date
          * @return <code>true</code> if the job could be scheduled, <code>false</code>otherwise.
          */
@@ -93,6 +97,8 @@ public interface JobBuilder {
 
         /**
          * Schedule the job for the given hour and minute.
+         * If the hour is less than 0 or higher than 23, or the minute is less than 0 or
+         * higher than 59, then the job can't be scheduled.
          * @param hour  Hour of the day ranging from 0 to 23.
          * @param minute Minute of the hour ranging from 0 to 59.
          * @return <code>true</code> if the job could be scheduled, <code>false</code>otherwise.

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/ScheduledJobInfo.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/ScheduledJobInfo.java?rev=1529825&r1=1529824&r2=1529825&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/ScheduledJobInfo.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/ScheduledJobInfo.java Mon Oct  7 12:07:10 2013
@@ -23,35 +23,87 @@ import java.util.Map;
 
 import aQute.bnd.annotation.ProviderType;
 
+/**
+ * Information about a scheduled job
+ * @since 1.3
+ */
 @ProviderType
 public interface ScheduledJobInfo {
 
     enum ScheduleType {
-        DATE,
-        PERIODICALLY,
-        DAILY,
-        WEEKLY
+        DATE,         // scheduled for a date
+        PERIODICALLY, // scheduled periodically (minutes)
+        DAILY,        // scheduled once a day
+        WEEKLY        // scheduled once a week
     }
 
+    /**
+     * Return the unique scheduling name.
+     * @return The unique name
+     */
     String getName();
 
+    /**
+     * Return the scheduling type
+     * @return The scheduling type
+     */
     ScheduleType getScheduleType();
 
+    /**
+     * Return the next scheduled execution date.
+     */
     Date getNextScheduledExecution();
 
+    /**
+     * If the job is scheduled weekly, returns the day of the week
+     * @return The day of the week (from 1 to 7) or -1
+     */
     int getDayOfWeek();
 
+    /**
+     * Return the hour of the day for daily and weekly scheduled jobs
+     * @return The hour of the day (from 0 to 23) or -1
+     */
     int getHourOfDay();
 
+    /**
+     * Return the minute of the hour for daily and weekly scheduled jobs.
+     * @return The minute of the hour (from 0 to 59) or -1
+     */
     int getMinuteOfHour();
 
+    /**
+     * For periodically scheduled jobs, return the period in minutes.
+     * @return The period in minutes or -1
+     */
     int getPeriod();
 
+    /**
+     * Return the job topic.
+     * @return The job topic
+     */
     String getJobTopic();
 
+    /**
+     * Return the optional job name.
+     * @return The job name or <code>null</code>
+     */
     String getJobName();
 
+    /**
+     * Return the optional job properties.
+     * @return The job properties or <code>null</code>
+     */
     Map<String, Object> getJobProperties();
 
+    /**
+     * Unschedule this scheduled job.
+     */
     void unschedule();
+
+    /**
+     * Reschedule this job with new scheduling information.
+     * If rescheduling fails, the job will be unscheduled.
+     */
+    JobBuilder.ScheduleBuilder reschedule();
 }

Modified: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java?rev=1529825&r1=1529824&r2=1529825&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java (original)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java Mon Oct  7 12:07:10 2013
@@ -167,7 +167,7 @@ public abstract class AbstractJobHandlin
         // set load delay to 3 sec
         final org.osgi.service.cm.Configuration c2 = this.configAdmin.getConfiguration("org.apache.sling.event.impl.jobs.jcr.PersistenceHandler", null);
         Dictionary<String, Object> p2 = new Hashtable<String, Object>();
-        p2.put(JobManagerConfiguration.CONFIG_PROPERTY_BACKGROUND_LOAD_DELAY, 3L);
+        p2.put(JobManagerConfiguration.PROPERTY_BACKGROUND_LOAD_DELAY, 3L);
         c2.update(p2);
 
         final StartupHandler handler = new StartupHandler() {