You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2015/04/30 14:06:26 UTC

svn commit: r1676967 - in /sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl: jobs/ jobs/scheduling/ support/

Author: cziegeler
Date: Thu Apr 30 12:06:25 2015
New Revision: 1676967

URL: http://svn.apache.org/r1676967
Log:
SLING-4680 : Decouple scheduled jobs from observation

Added:
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/scheduling/
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/scheduling/JobScheduleBuilderImpl.java   (with props)
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/scheduling/JobSchedulerImpl.java   (with props)
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/scheduling/ScheduledJobHandler.java   (with props)
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/scheduling/ScheduledJobInfoImpl.java
      - copied, changed from r1676922, sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/ScheduledJobInfoImpl.java
Removed:
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobSchedulerImpl.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/ScheduledJobInfoImpl.java
Modified:
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobBuilderImpl.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ResourceHelper.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ScheduleInfoImpl.java

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobBuilderImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobBuilderImpl.java?rev=1676967&r1=1676966&r2=1676967&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobBuilderImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobBuilderImpl.java Thu Apr 30 12:06:25 2015
@@ -18,16 +18,13 @@
  */
 package org.apache.sling.event.impl.jobs;
 
-import java.util.ArrayList;
-import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
-import org.apache.sling.event.impl.support.ScheduleInfoImpl;
+import org.apache.sling.event.impl.jobs.scheduling.JobScheduleBuilderImpl;
 import org.apache.sling.event.jobs.Job;
 import org.apache.sling.event.jobs.JobBuilder;
-import org.apache.sling.event.jobs.ScheduledJobInfo;
 
 /**
  * Fluent builder API
@@ -64,81 +61,18 @@ public class JobBuilderImpl implements J
 
     @Override
     public ScheduleBuilder schedule() {
-        return new ScheduleBuilderImpl(UUID.randomUUID().toString());
+        return new JobScheduleBuilderImpl(
+                this.topic,
+                this.properties,
+                UUID.randomUUID().toString(),
+                this.jobManager.getJobScheduler());
     }
 
     public ScheduleBuilder schedule(final String name) {
-        return new ScheduleBuilderImpl(name);
-    }
-
-    public final class ScheduleBuilderImpl implements ScheduleBuilder {
-
-        private final String scheduleName;
-
-        private boolean suspend = false;
-
-        private final List<ScheduleInfoImpl> schedules = new ArrayList<ScheduleInfoImpl>();
-
-        public ScheduleBuilderImpl(final String name) {
-            this.scheduleName = name;
-        }
-
-        @Override
-        public ScheduleBuilder weekly(final int day, final int hour, final int minute) {
-            schedules.add(ScheduleInfoImpl.WEEKLY(day, hour, minute));
-            return this;
-        }
-
-        @Override
-        public ScheduleBuilder daily(final int hour, final int minute) {
-            schedules.add(ScheduleInfoImpl.DAILY(hour, minute));
-            return this;
-        }
-
-        @Override
-        public ScheduleBuilder hourly(final int minute) {
-            schedules.add(ScheduleInfoImpl.HOURLY(minute));
-            return this;
-        }
-
-        @Override
-        public ScheduleBuilder at(final Date date) {
-            schedules.add(ScheduleInfoImpl.AT(date));
-            return this;
-        }
-
-        @Override
-        public ScheduleBuilder monthly(final int day, final int hour, final int minute) {
-            schedules.add(ScheduleInfoImpl.MONTHLY(day, hour, minute));
-            return this;
-        }
-
-        @Override
-        public ScheduleBuilder yearly(final int month, final int day, final int hour, final int minute) {
-            schedules.add(ScheduleInfoImpl.YEARLY(month, day, hour, minute));
-            return this;
-        }
-
-        @Override
-        public ScheduleBuilder cron(final String expression) {
-            schedules.add(ScheduleInfoImpl.CRON(expression));
-            return this;
-        }
-
-        @Override
-        public ScheduledJobInfo add() {
-            return this.add(null);
-        }
-
-        @Override
-        public ScheduledJobInfo add(final List<String> errors) {
-            return jobManager.addScheduledJob(topic, null, properties, scheduleName, suspend, schedules, errors);
-        }
-
-        @Override
-        public ScheduleBuilder suspend() {
-            this.suspend = true;
-            return this;
-        }
+        return new JobScheduleBuilderImpl(
+                this.topic,
+                this.properties,
+                name,
+                this.jobManager.getJobScheduler());
     }
 }

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java?rev=1676967&r1=1676966&r2=1676967&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java Thu Apr 30 12:06:25 2015
@@ -49,11 +49,11 @@ import org.apache.sling.event.impl.jobs.
 import org.apache.sling.event.impl.jobs.notifications.NotificationUtility;
 import org.apache.sling.event.impl.jobs.queues.JobQueueImpl;
 import org.apache.sling.event.impl.jobs.queues.QueueManager;
+import org.apache.sling.event.impl.jobs.scheduling.JobSchedulerImpl;
 import org.apache.sling.event.impl.jobs.stats.StatisticsManager;
 import org.apache.sling.event.impl.jobs.tasks.CleanUpTask;
 import org.apache.sling.event.impl.support.Environment;
 import org.apache.sling.event.impl.support.ResourceHelper;
-import org.apache.sling.event.impl.support.ScheduleInfoImpl;
 import org.apache.sling.event.jobs.Job;
 import org.apache.sling.event.jobs.Job.JobState;
 import org.apache.sling.event.jobs.JobBuilder;
@@ -123,7 +123,7 @@ public class JobManagerImpl
     private CleanUpTask maintenanceTask;
 
     /** Job Scheduler. */
-    private JobSchedulerImpl jobScheduler;
+    private org.apache.sling.event.impl.jobs.scheduling.JobSchedulerImpl jobScheduler;
 
     /**
      * Activate this component.
@@ -131,7 +131,7 @@ public class JobManagerImpl
      */
     @Activate
     protected void activate(final Map<String, Object> props) throws LoginException {
-        this.jobScheduler = new JobSchedulerImpl(this.configuration, this.scheduler, this);
+        this.jobScheduler = new org.apache.sling.event.impl.jobs.scheduling.JobSchedulerImpl(this.configuration, this.scheduler, this);
         this.maintenanceTask = new CleanUpTask(this.configuration);
 
         logger.info("Apache Sling Job Manager started on instance {}", Environment.APPLICATION_ID);
@@ -903,49 +903,6 @@ public class JobManagerImpl
         return this.jobScheduler.getScheduledJobs(topic, limit, templates);
     }
 
-    public ScheduledJobInfo addScheduledJob(final String topic,
-            final String jobName,
-            final Map<String, Object> properties,
-            final String scheduleName,
-            final boolean isSuspended,
-            final List<ScheduleInfoImpl> scheduleInfos,
-            final List<String> errors) {
-        final List<String> msgs = new ArrayList<String>();
-        if ( scheduleName == null || scheduleName.length() == 0 ) {
-            msgs.add("Schedule name not specified");
-        }
-        final String errorMessage = Utility.checkJob(topic, properties);
-        if ( errorMessage != null ) {
-            msgs.add(errorMessage);
-        }
-        if ( scheduleInfos.size() == 0 ) {
-            msgs.add("No schedule defined for " + scheduleName);
-        }
-        for(final ScheduleInfoImpl info : scheduleInfos) {
-            info.check(msgs);
-        }
-        if ( msgs.size() == 0 ) {
-            try {
-                final ScheduledJobInfo info = this.jobScheduler.writeJob(topic, jobName, properties, scheduleName, isSuspended, scheduleInfos);
-                if ( info != null ) {
-                    return info;
-                }
-                msgs.add("Unable to persist scheduled job.");
-            } catch ( final PersistenceException pe) {
-                msgs.add("Unable to persist scheduled job: " + scheduleName);
-                logger.warn("Unable to persist scheduled job", pe);
-            }
-        } else {
-            for(final String msg : msgs) {
-                logger.warn(msg);
-            }
-        }
-        if ( errors != null ) {
-            errors.addAll(msgs);
-        }
-        return null;
-    }
-
     /**
      * Internal method to add a job
      */
@@ -1017,4 +974,8 @@ public class JobManagerImpl
         }
         return null;
     }
+
+    public JobSchedulerImpl getJobScheduler() {
+        return this.jobScheduler;
+    }
 }

Added: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/scheduling/JobScheduleBuilderImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/scheduling/JobScheduleBuilderImpl.java?rev=1676967&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/scheduling/JobScheduleBuilderImpl.java (added)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/scheduling/JobScheduleBuilderImpl.java Thu Apr 30 12:06:25 2015
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.scheduling;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.sling.event.impl.support.ScheduleInfoImpl;
+import org.apache.sling.event.jobs.JobBuilder.ScheduleBuilder;
+import org.apache.sling.event.jobs.ScheduledJobInfo;
+
+/**
+ * The builder implementation for scheduled jobs.
+ */
+public final class JobScheduleBuilderImpl implements ScheduleBuilder {
+
+    private final String topic;
+
+    private final Map<String, Object> properties;
+
+    private final String scheduleName;
+
+    private final JobSchedulerImpl jobScheduler;
+
+    private volatile boolean suspend = false;
+
+    private final List<ScheduleInfoImpl> schedules = new ArrayList<ScheduleInfoImpl>();
+
+    public JobScheduleBuilderImpl(
+            final String topic,
+            final Map<String, Object> properties,
+            final String name,
+            final JobSchedulerImpl jobScheduler) {
+        this.topic = topic;
+        this.properties = properties;
+        this.scheduleName = name;
+        this.jobScheduler = jobScheduler;
+    }
+
+    @Override
+    public ScheduleBuilder weekly(final int day, final int hour, final int minute) {
+        schedules.add(ScheduleInfoImpl.WEEKLY(day, hour, minute));
+        return this;
+    }
+
+    @Override
+    public ScheduleBuilder daily(final int hour, final int minute) {
+        schedules.add(ScheduleInfoImpl.DAILY(hour, minute));
+        return this;
+    }
+
+    @Override
+    public ScheduleBuilder hourly(final int minute) {
+        schedules.add(ScheduleInfoImpl.HOURLY(minute));
+        return this;
+    }
+
+    @Override
+    public ScheduleBuilder at(final Date date) {
+        schedules.add(ScheduleInfoImpl.AT(date));
+        return this;
+    }
+
+    @Override
+    public ScheduleBuilder monthly(final int day, final int hour, final int minute) {
+        schedules.add(ScheduleInfoImpl.MONTHLY(day, hour, minute));
+        return this;
+    }
+
+    @Override
+    public ScheduleBuilder yearly(final int month, final int day, final int hour, final int minute) {
+        schedules.add(ScheduleInfoImpl.YEARLY(month, day, hour, minute));
+        return this;
+    }
+
+    @Override
+    public ScheduleBuilder cron(final String expression) {
+        schedules.add(ScheduleInfoImpl.CRON(expression));
+        return this;
+    }
+
+    @Override
+    public ScheduledJobInfo add() {
+        return this.add(null);
+    }
+
+    @Override
+    public ScheduledJobInfo add(final List<String> errors) {
+        return this.jobScheduler.addScheduledJob(topic,
+                properties,
+                scheduleName,
+                suspend,
+                schedules,
+                errors);
+    }
+
+    @Override
+    public ScheduleBuilder suspend() {
+        this.suspend = true;
+        return this;
+    }
+}

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/scheduling/JobScheduleBuilderImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/scheduling/JobScheduleBuilderImpl.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/scheduling/JobScheduleBuilderImpl.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/scheduling/JobSchedulerImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/scheduling/JobSchedulerImpl.java?rev=1676967&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/scheduling/JobSchedulerImpl.java (added)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/scheduling/JobSchedulerImpl.java Thu Apr 30 12:06:25 2015
@@ -0,0 +1,541 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.scheduling;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.sling.api.SlingConstants;
+import org.apache.sling.api.resource.ModifiableValueMap;
+import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.commons.scheduler.JobContext;
+import org.apache.sling.commons.scheduler.ScheduleOptions;
+import org.apache.sling.commons.scheduler.Scheduler;
+import org.apache.sling.event.impl.jobs.JobManagerImpl;
+import org.apache.sling.event.impl.jobs.Utility;
+import org.apache.sling.event.impl.jobs.config.ConfigurationChangeListener;
+import org.apache.sling.event.impl.jobs.config.JobManagerConfiguration;
+import org.apache.sling.event.impl.jobs.config.TopologyCapabilities;
+import org.apache.sling.event.impl.support.ResourceHelper;
+import org.apache.sling.event.impl.support.ScheduleInfoImpl;
+import org.apache.sling.event.jobs.JobBuilder;
+import org.apache.sling.event.jobs.ScheduleInfo;
+import org.apache.sling.event.jobs.ScheduleInfo.ScheduleType;
+import org.apache.sling.event.jobs.ScheduledJobInfo;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * The scheduler for managing scheduled jobs.
+ *
+ * This is not a component by itself; it is created directly by the job manager.
+ * The job manager also registers itself as an event handler and forwards
+ * the events to this service.
+ */
+public class JobSchedulerImpl
+    implements EventHandler,
+               ConfigurationChangeListener,
+               org.apache.sling.commons.scheduler.Job {
+
+    private static final String PROPERTY_READ_JOB = "properties";
+
+    private static final String PROPERTY_SCHEDULE_INDEX = "index";
+
+    /** Default logger */
+    private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+    /** Is this active? */
+    private final AtomicBoolean active = new AtomicBoolean(false);
+
+    /** Central job handling configuration. */
+    private final JobManagerConfiguration configuration;
+
+    /** Scheduler service. */
+    private final Scheduler scheduler;
+
+    /** Job manager. */
+    private final JobManagerImpl jobManager;
+
+    /** Scheduled job handler. */
+    private final ScheduledJobHandler scheduledJobHandler;
+
+    /** All scheduled jobs, keyed by schedule name. */
+    private final Map<String, ScheduledJobInfoImpl> scheduledJobs = new HashMap<String, ScheduledJobInfoImpl>();
+
+    /**
+     * Create the scheduler
+     * @param configuration Central job manager configuration
+     * @param scheduler The scheduler service
+     * @param jobManager The job manager
+     */
+    public JobSchedulerImpl(final JobManagerConfiguration configuration,
+            final Scheduler scheduler,
+            final JobManagerImpl jobManager) {
+        this.configuration = configuration;
+        this.scheduler = scheduler;
+        this.jobManager = jobManager;
+
+        this.configuration.addListener(this);
+
+        this.scheduledJobHandler = new ScheduledJobHandler(configuration, this);
+    }
+
+    /**
+     * Deactivate this component.
+     */
+    public void deactivate() {
+        this.configuration.removeListener(this);
+
+        this.scheduledJobHandler.deactivate();
+
+        if ( this.active.compareAndSet(true, false) ) {
+            this.stopScheduling();
+        }
+        synchronized ( this.scheduledJobs ) {
+            this.scheduledJobs.clear();
+        }
+    }
+
+    /**
+     * @see org.apache.sling.event.impl.jobs.config.ConfigurationChangeListener#configurationChanged(boolean)
+     */
+    @Override
+    public void configurationChanged(final boolean processingActive) {
+        // scheduling is only active if
+        // - processing is active and
+        // - configuration is still available and active
+        // - and current instance is leader
+        final boolean schedulingActive;
+        if ( processingActive ) {
+            final TopologyCapabilities caps = this.configuration.getTopologyCapabilities();
+            if ( caps != null && caps.isActive() ) {
+                schedulingActive = caps.isLeader();
+            } else {
+                schedulingActive = false;
+            }
+        } else {
+            schedulingActive = false;
+        }
+
+        // switch activation based on current state and new state
+        if ( schedulingActive ) {
+            // activate if inactive
+            if ( this.active.compareAndSet(false, true) ) {
+                this.startScheduling();
+            }
+        } else {
+            // deactivate if active
+            if ( this.active.compareAndSet(true, false) ) {
+                this.stopScheduling();
+            }
+        }
+    }
+
+    /**
+     * Start all scheduled jobs
+     */
+    private void startScheduling() {
+        synchronized ( this.scheduledJobs ) {
+            for(final ScheduledJobInfo info : this.scheduledJobs.values()) {
+                this.startScheduledJob(((ScheduledJobInfoImpl)info));
+            }
+        }
+    }
+
+    /**
+     * Stop all scheduled jobs.
+     */
+    private void stopScheduling() {
+        synchronized ( this.scheduledJobs ) {
+            for(final ScheduledJobInfo info : this.scheduledJobs.values()) {
+                this.stopScheduledJob((ScheduledJobInfoImpl)info);
+            }
+        }
+    }
+
+    /**
+     * Add a scheduled job
+     */
+    public void scheduleJob(final ScheduledJobInfoImpl info) {
+        synchronized ( this.scheduledJobs ) {
+            this.scheduledJobs.put(info.getName(), info);
+            this.startScheduledJob(info);
+        }
+    }
+
+    /**
+     * Remove a scheduled job
+     */
+    public void unscheduleJob(final ScheduledJobInfoImpl info) {
+        synchronized ( this.scheduledJobs ) {
+            if ( this.scheduledJobs.remove(info.getName()) != null ) {
+                this.stopScheduledJob(info);
+            }
+        }
+    }
+
+    /**
+     * Start a scheduled job
+     * @param info The scheduling info
+     */
+    private void startScheduledJob(final ScheduledJobInfoImpl info) {
+        if ( this.active.get() ) {
+            if ( !info.isSuspended() ) {
+                this.configuration.getAuditLogger().debug("SCHEDULED OK name={}, topic={}, properties={} : {}",
+                        new Object[] {info.getName(),
+                                      info.getJobTopic(),
+                                      info.getJobProperties()},
+                                      info.getSchedules());
+                int index = 0;
+                for(final ScheduleInfo si : info.getSchedules()) {
+                    final String name = info.getSchedulerJobId() + "-" + String.valueOf(index);
+                    ScheduleOptions options = null;
+                    switch ( si.getType() ) {
+                        case DAILY:
+                        case WEEKLY:
+                        case HOURLY:
+                        case MONTHLY:
+                        case YEARLY:
+                        case CRON:
+                            options = this.scheduler.EXPR(((ScheduleInfoImpl)si).getCronExpression());
+
+                            break;
+                        case DATE:
+                            options = this.scheduler.AT(((ScheduleInfoImpl)si).getNextScheduledExecution());
+                            break;
+                    }
+                    // Create configuration for scheduled job
+                    final Map<String, Serializable> config = new HashMap<String, Serializable>();
+                    config.put(PROPERTY_READ_JOB, info);
+                    config.put(PROPERTY_SCHEDULE_INDEX, index);
+                    this.scheduler.schedule(this, options.name(name).config(config).canRunConcurrently(false));
+                    index++;
+                }
+            } else {
+                this.configuration.getAuditLogger().debug("SCHEDULED SUSPENDED name={}, topic={}, properties={} : {}",
+                        new Object[] {info.getName(),
+                                      info.getJobTopic(),
+                                      info.getJobProperties(),
+                                      info.getSchedules()});
+            }
+        }
+    }
+
+    /**
+     * Stop a scheduled job
+     * @param info The scheduling info
+     */
+    private void stopScheduledJob(final ScheduledJobInfoImpl info) {
+        if ( this.active.get() ) {
+            this.configuration.getAuditLogger().debug("SCHEDULED STOP name={}, topic={}, properties={} : {}",
+                    new Object[] {info.getName(),
+                                  info.getJobTopic(),
+                                  info.getJobProperties(),
+                                  info.getSchedules()});
+            for(int index = 0; index<info.getSchedules().size(); index++) {
+                final String name = info.getSchedulerJobId() + "-" + String.valueOf(index);
+                this.scheduler.unschedule(name);
+            }
+        }
+    }
+
+    /**
+     * @see org.apache.sling.commons.scheduler.Job#execute(org.apache.sling.commons.scheduler.JobContext)
+     */
+    @Override
+    public void execute(final JobContext context) {
+        final ScheduledJobInfoImpl info = (ScheduledJobInfoImpl) context.getConfiguration().get(PROPERTY_READ_JOB);
+
+        if ( info.isSuspended() ) {
+            return;
+        }
+
+        this.jobManager.addJob(info.getJobTopic(), info.getJobProperties());
+        final int index = (Integer)context.getConfiguration().get(PROPERTY_SCHEDULE_INDEX);
+        final Iterator<ScheduleInfo> iter = info.getSchedules().iterator();
+        ScheduleInfo si = iter.next();
+        for(int i=0; i<index; i++) {
+            si = iter.next();
+        }
+        // if scheduled once (DATE), remove from schedule
+        if ( si.getType() == ScheduleType.DATE ) {
+            if ( index == 0 && info.getSchedules().size() == 1 ) {
+                // remove
+                this.scheduledJobHandler.remove(info);
+            } else {
+                // update schedule list
+                final List<ScheduleInfo> infos = new ArrayList<ScheduleInfo>();
+                for(final ScheduleInfo i : info.getSchedules() ) {
+                    if ( i != si ) { // no need to use equals
+                        infos.add(i);
+                    }
+                }
+                info.update(infos);
+                this.scheduledJobHandler.updateSchedule(info.getName(), infos);
+            }
+        }
+    }
+
+    /**
+     * @see org.osgi.service.event.EventHandler#handleEvent(org.osgi.service.event.Event)
+     */
+    @Override
+    public void handleEvent(final Event event) {
+        if ( ResourceHelper.BUNDLE_EVENT_STARTED.equals(event.getTopic())
+             || ResourceHelper.BUNDLE_EVENT_UPDATED.equals(event.getTopic()) ) {
+            this.scheduledJobHandler.bundleEvent();
+        } else {
+            // resource event
+            final String path = (String)event.getProperty(SlingConstants.PROPERTY_PATH);
+            if ( path != null && path.startsWith(this.configuration.getScheduledJobsPath(true)) ) {
+                if ( SlingConstants.TOPIC_RESOURCE_REMOVED.equals(event.getTopic()) ) {
+                    // removal
+                    this.scheduledJobHandler.handleRemove(path);
+                } else {
+                    // add or update
+                    this.scheduledJobHandler.handleAddUpdate(path);
+                }
+            }
+        }
+    }
+
+    /**
+     * Helper method which just logs the exception in debug mode.
+     * @param e
+     */
+    private void ignoreException(final Exception e) {
+        if ( this.logger.isDebugEnabled() ) {
+            this.logger.debug("Ignored exception " + e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Create a schedule builder for a currently scheduled job
+     */
+    public JobBuilder.ScheduleBuilder createJobBuilder(final ScheduledJobInfoImpl info) {
+        final JobBuilder.ScheduleBuilder sb = new JobScheduleBuilderImpl(info.getJobTopic(),
+                info.getJobProperties(), info.getName(), this);
+        return (info.isSuspended() ? sb.suspend() : sb);
+    }
+
+    private enum Operation {
+        LESS,
+        LESS_OR_EQUALS,
+        EQUALS,
+        GREATER_OR_EQUALS,
+        GREATER
+    }
+
+    /**
+     * Check if the job matches the template
+     */
+    private boolean match(final ScheduledJobInfoImpl job, final Map<String, Object> template) {
+        if ( template != null ) {
+            for(final Map.Entry<String, Object> current : template.entrySet()) {
+                final String key = current.getKey();
+                final char firstChar = key.length() > 0 ? key.charAt(0) : 0;
+                final String propName;
+                final Operation op;
+                if ( firstChar == '=' ) {
+                    propName = key.substring(1);
+                    op  = Operation.EQUALS;
+                } else if ( firstChar == '<' ) {
+                    final char secondChar = key.length() > 1 ? key.charAt(1) : 0;
+                    if ( secondChar == '=' ) {
+                        op = Operation.LESS_OR_EQUALS;
+                        propName = key.substring(2);
+                    } else {
+                        op = Operation.LESS;
+                        propName = key.substring(1);
+                    }
+                } else if ( firstChar == '>' ) {
+                    final char secondChar = key.length() > 1 ? key.charAt(1) : 0;
+                    if ( secondChar == '=' ) {
+                        op = Operation.GREATER_OR_EQUALS;
+                        propName = key.substring(2);
+                    } else {
+                        op = Operation.GREATER;
+                        propName = key.substring(1);
+                    }
+                } else {
+                    propName = key;
+                    op  = Operation.EQUALS;
+                }
+                final Object value = current.getValue();
+
+                if ( op == Operation.EQUALS ) {
+                    if ( !value.equals(job.getJobProperties().get(propName)) ) {
+                        return false;
+                    }
+                } else {
+                    if ( value instanceof Comparable ) {
+                        @SuppressWarnings({ "unchecked", "rawtypes" })
+                        final int result = ((Comparable)value).compareTo(job.getJobProperties().get(propName));
+                        if ( op == Operation.LESS && result > -1 ) {
+                            return false;
+                        } else if ( op == Operation.LESS_OR_EQUALS && result > 0 ) {
+                            return false;
+                        } else if ( op == Operation.GREATER_OR_EQUALS && result < 0 ) {
+                            return false;
+                        } else if ( op == Operation.GREATER && result < 1 ) {
+                            return false;
+                        }
+                    } else {
+                        // if the value is not comparable we simply don't match
+                        return false;
+                    }
+                }
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Get the scheduled jobs matching the given criteria.
+     *
+     * @param topic If not {@code null}, only jobs with exactly this topic are returned
+     * @param limit Maximum number of jobs to return; zero or a negative value means no limit
+     * @param templates Optional property templates; if at least one template is given,
+     *                  a job is only returned if it matches at least one of them
+     * @return The matching jobs, never {@code null}
+     */
+    public Collection<ScheduledJobInfo> getScheduledJobs(final String topic,
+            final long limit,
+            final Map<String, Object>... templates) {
+        final List<ScheduledJobInfo> result = new ArrayList<ScheduledJobInfo>();
+        final boolean filterByTemplate = templates != null && templates.length != 0;
+        synchronized ( this.scheduledJobs ) {
+            for(final ScheduledJobInfoImpl candidate : this.scheduledJobs.values() ) {
+                // topic filter
+                if ( topic != null && !topic.equals(candidate.getJobTopic()) ) {
+                    continue;
+                }
+                // template filter: a single matching template is sufficient
+                if ( filterByTemplate ) {
+                    boolean matched = false;
+                    for(final Map<String, Object> template : templates) {
+                        if ( this.match(candidate, template) ) {
+                            matched = true;
+                            break;
+                        }
+                    }
+                    if ( !matched ) {
+                        continue;
+                    }
+                }
+                result.add(candidate);
+                // stop as soon as the requested number of jobs is collected
+                if ( limit > 0 && result.size() == limit ) {
+                    break;
+                }
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Change the suspended flag for a scheduled job.
+     * The flag is persisted on the job resource (if that resource exists) and
+     * afterwards the job is stopped (suspend) or started (resume). If persisting
+     * fails with a {@link PersistenceException}, the job is neither stopped nor started.
+     *
+     * @param info The schedule info
+     * @param flag The corresponding flag, {@code true} to suspend the job
+     */
+    public void setSuspended(final ScheduledJobInfoImpl info, final boolean flag) {
+        final ResourceResolver resolver = configuration.createResourceResolver();
+        try {
+            final String path = this.configuration.getScheduledJobsPath(true)
+                    + ResourceHelper.filterName(info.getName());
+
+            final Resource eventResource = resolver.getResource(path);
+            if ( eventResource != null ) {
+                final ModifiableValueMap mvm = eventResource.adaptTo(ModifiableValueMap.class);
+                // adaptTo() returns null if the resource can't be modified
+                if ( mvm != null ) {
+                    if ( flag ) {
+                        mvm.put(ResourceHelper.PROPERTY_SCHEDULE_SUSPENDED, Boolean.TRUE);
+                    } else {
+                        mvm.remove(ResourceHelper.PROPERTY_SCHEDULE_SUSPENDED);
+                    }
+                    resolver.commit();
+                } else {
+                    logger.warn("Unable to persist suspended flag for scheduled job {}, resource {} is not modifiable",
+                            info.getName(), path);
+                }
+            }
+            if ( flag ) {
+                this.stopScheduledJob(info);
+            } else {
+                this.startScheduledJob(info);
+            }
+        } catch (final PersistenceException pe) {
+            // we ignore the exception if persisting the flag fails
+            ignoreException(pe);
+        } finally {
+            resolver.close();
+        }
+    }
+
+    /**
+     * Validate and add a scheduled job.
+     * All validation problems are collected; if there is at least one, each is
+     * logged as a warning and nothing is persisted.
+     *
+     * @param topic The job topic
+     * @param properties The job properties
+     * @param scheduleName The schedule name
+     * @param isSuspended Whether it is suspended
+     * @param scheduleInfos The scheduling information
+     * @param errors Optional list receiving the error messages, may be {@code null}
+     * @return A new job info or {@code null} if validation or persisting failed
+     */
+    public ScheduledJobInfo addScheduledJob(final String topic,
+            final Map<String, Object> properties,
+            final String scheduleName,
+            final boolean isSuspended,
+            final List<ScheduleInfoImpl> scheduleInfos,
+            final List<String> errors) {
+        final List<String> problems = new ArrayList<String>();
+
+        // validation phase
+        if ( scheduleName == null || scheduleName.length() == 0 ) {
+            problems.add("Schedule name not specified");
+        }
+        final String jobProblem = Utility.checkJob(topic, properties);
+        if ( jobProblem != null ) {
+            problems.add(jobProblem);
+        }
+        if ( scheduleInfos.isEmpty() ) {
+            problems.add("No schedule defined for " + scheduleName);
+        }
+        for(final ScheduleInfoImpl scheduleInfo : scheduleInfos) {
+            scheduleInfo.check(problems);
+        }
+
+        if ( !problems.isEmpty() ) {
+            for(final String problem : problems) {
+                logger.warn(problem);
+            }
+        } else {
+            // persisting phase
+            try {
+                final ScheduledJobInfo created = this.scheduledJobHandler.addOrUpdateJob(topic, properties, scheduleName, isSuspended, scheduleInfos);
+                if ( created != null ) {
+                    return created;
+                }
+                problems.add("Unable to persist scheduled job.");
+            } catch ( final PersistenceException pe) {
+                problems.add("Unable to persist scheduled job: " + scheduleName);
+                logger.warn("Unable to persist scheduled job", pe);
+            }
+        }
+
+        if ( errors != null ) {
+            errors.addAll(problems);
+        }
+        return null;
+    }
+}

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/scheduling/JobSchedulerImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/scheduling/JobSchedulerImpl.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/scheduling/JobSchedulerImpl.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/scheduling/ScheduledJobHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/scheduling/ScheduledJobHandler.java?rev=1676967&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/scheduling/ScheduledJobHandler.java (added)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/scheduling/ScheduledJobHandler.java Thu Apr 30 12:06:25 2015
@@ -0,0 +1,541 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.scheduling;
+
+import java.util.Calendar;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.sling.api.resource.ModifiableValueMap;
+import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.ResourceUtil;
+import org.apache.sling.api.resource.ValueMap;
+import org.apache.sling.event.impl.jobs.config.JobManagerConfiguration;
+import org.apache.sling.event.impl.support.Environment;
+import org.apache.sling.event.impl.support.ResourceHelper;
+import org.apache.sling.event.impl.support.ScheduleInfoImpl;
+import org.apache.sling.event.jobs.Job;
+import org.apache.sling.event.jobs.ScheduleInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
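+/**
+ * Handler keeping track of the persisted scheduled jobs.
+ * It reads and writes the scheduled jobs from/to the resource tree and
+ * (un)schedules them through the job scheduler. Full scans, bundle events and
+ * add/update/remove notifications are put into a local queue and processed
+ * one after the other by a dedicated daemon thread.
+ */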
+public class ScheduledJobHandler implements Runnable {
+
+    /**
+     * Bookkeeping entry for a single scheduled job as tracked by this handler.
+     */
+    public static final class Holder {
+        /** Last known value of the {@link Job#PROPERTY_JOB_CREATED} property of the job. */
+        public Calendar created;
+        /** The job info; stays <code>null</code> if the job could not be read from its resource. */
+        public ScheduledJobInfoImpl info;
+        /** Time in milliseconds ({@link System#currentTimeMillis()}) when this holder was created. */
+        public long read;
+    }
+
+    /** Logger. */
+    private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+    /** The job manager configuration. */
+    private final JobManagerConfiguration configuration;
+
+    /** The job scheduler. */
+    private final JobSchedulerImpl jobScheduler;
+
+    /** The map of all scheduled jobs, key is the filtered schedule name */
+    private final Map<String, Holder> scheduledJobs = new HashMap<String, Holder>();
+
+    private final AtomicLong lastBundleActivity = new AtomicLong();
+
+    private final AtomicBoolean isRunning = new AtomicBoolean(true);
+
+    /** A local queue for serializing the event processing. */
+    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
+
+    /**
+     * Create the handler, start its daemon worker thread and queue an initial
+     * full scan for scheduled jobs.
+     * @param configuration Current job manager configuration
+     * @param jobScheduler The job scheduler used to schedule and unschedule the jobs
+     */
+    public ScheduledJobHandler(final JobManagerConfiguration configuration,
+            final JobSchedulerImpl jobScheduler) {
+        this.configuration = configuration;
+        this.jobScheduler = jobScheduler;
+        // the worker thread executes the queued tasks, see run()
+        final Thread t = new Thread(this, "Apache Sling Scheduled Job Handler Thread");
+        t.setDaemon(true);
+        t.start();
+
+        // first task: read all scheduled jobs
+        this.addFullScan();
+    }
+
+    /**
+     * Append a task to the local queue; it is executed later by the worker thread.
+     * If the calling thread is interrupted while waiting, the task is dropped
+     * and the interrupt status is restored.
+     */
+    private void addTask(final Runnable task) {
+        try {
+            queue.put(task);
+        } catch (final InterruptedException interrupted) {
+            ignoreException(interrupted);
+            Thread.currentThread().interrupt();
+        }
+    }
+    /**
+     * Add a full scan to the task queue
+     */
+    private void addFullScan() {
+        this.addTask(new Runnable() {
+            @Override
+            public void run() {
+                scan();
+            }
+        });
+    }
+
+    /**
+     * Stop this handler: mark it as not running, drop all pending tasks and
+     * wake up the worker thread so that it can terminate.
+     */
+    public void deactivate() {
+        // the worker thread might block in take(), a NOP task wakes it up
+        final Runnable wakeUp = new Runnable() {
+            @Override
+            public  void run() {
+                // do nothing
+            }
+        };
+        this.isRunning.set(false);
+        this.queue.clear();
+        this.addTask(wakeUp);
+    }
+
+    /**
+     * Worker loop: take the queued tasks one by one and execute them until
+     * the handler is deactivated or the thread is interrupted.
+     */
+    @Override
+    public void run() {
+        while ( this.isRunning.get() ) {
+            Runnable r = null;
+            try {
+                r = this.queue.take();
+            } catch (final InterruptedException e) {
+                this.ignoreException(e);
+                Thread.currentThread().interrupt();
+                this.isRunning.set(false);
+            }
+            if ( this.isRunning.get() && r != null) {
+                try {
+                    r.run();
+                } catch (final RuntimeException re) {
+                    // a failing task must not terminate the worker thread,
+                    // otherwise no further task would ever be processed
+                    this.logger.error("Unexpected exception while processing scheduled job task", re);
+                }
+            }
+        }
+    }
+
+    /**
+     * Perform a full scan: read all child resources of the scheduled jobs path,
+     * add or update the corresponding jobs and unschedule all jobs which do not
+     * have a resource anymore. Nothing is done if no resource resolver is available.
+     */
+    private void scan() {
+        final ResourceResolver resolver = configuration.createResourceResolver();
+        if ( resolver != null ) {
+            try {
+                logger.debug("Scanning for scheduled jobs...");
+                final String path = this.configuration.getScheduledJobsPath(false);
+                final Resource startResource = resolver.getResource(path);
+                if ( startResource != null ) {
+                    // collects all jobs found during this scan
+                    final Map<String, Holder> newScheduledJobs = new HashMap<String, Holder>();
+                    synchronized ( this.scheduledJobs ) {
+                        for(final Resource rsrc : startResource.getChildren()) {
+                            if ( !isRunning.get() ) {
+                                break;
+                            }
+                            // removes the entry from scheduledJobs and stores the
+                            // (new or unchanged) holder in newScheduledJobs
+                            handleAddOrUpdate(newScheduledJobs, rsrc);
+                        }
+                        if ( isRunning.get() ) {
+                            // everything still in scheduledJobs was not found by the scan
+                            for(final Holder h : this.scheduledJobs.values()) {
+                                if ( h.info != null ) {
+                                    this.jobScheduler.unscheduleJob(h.info);
+                                }
+                            }
+                            // replace the old state with the result of the scan
+                            this.scheduledJobs.clear();
+                            this.scheduledJobs.putAll(newScheduledJobs);
+                        }
+                    }
+                }
+                logger.debug("Finished scanning for scheduled jobs...");
+            } finally {
+                resolver.close();
+            }
+        }
+    }
+
+    /**
+     * Read the properties of a scheduled job from the resource.
+     * If the read properties contain a list of read errors, each error is
+     * logged as a warning and the job is treated as not readable.
+     * @param eventResource The resource containing the scheduled job
+     * @return The job properties or <code>null</code>
+     */
+    private Map<String, Object> readScheduledJob(final Resource eventResource) {
+        try {
+            final Map<String, Object> jobProps = ResourceHelper.cloneValueMap(ResourceHelper.getValueMap(eventResource));
+
+            @SuppressWarnings("unchecked")
+            final List<Exception> failures = (List<Exception>) jobProps.remove(ResourceHelper.PROPERTY_MARKER_READ_ERROR_LIST);
+            if ( failures == null ) {
+                return jobProps;
+            }
+            for(final Exception failure : failures) {
+                logger.warn("Unable to read scheduled job from " + eventResource.getPath(), failure);
+            }
+        } catch (final InstantiationException ie) {
+            // something happened with the resource in the meantime
+            this.ignoreException(ie);
+        }
+        return null;
+    }
+
+    /**
+     * Write a scheduled job to the resource tree and (re)schedule it.
+     * A job already registered under the same (filtered) schedule name is
+     * unscheduled first and its info object is reused for the new job.
+     *
+     * @param jobTopic The job topic
+     * @param jobProperties The job properties
+     * @param scheduleName The schedule name
+     * @param suspend Whether the job is suspended
+     * @param scheduleInfos The scheduling information
+     * @return The info object of the scheduled job
+     * @throws PersistenceException If writing the job fails
+     */
+    public ScheduledJobInfoImpl addOrUpdateJob(
+            final String jobTopic,
+            final Map<String, Object> jobProperties,
+            final String scheduleName,
+            final boolean suspend,
+            final List<ScheduleInfoImpl> scheduleInfos)
+    throws PersistenceException {
+        final Map<String, Object> properties = this.writeScheduledJob(jobTopic, jobProperties, scheduleName, suspend, scheduleInfos);
+
+        final String key = ResourceHelper.filterName(scheduleName);
+        synchronized ( this.scheduledJobs ) {
+            final Holder h = this.scheduledJobs.remove(key);
+            if ( h != null && h.info != null ) {
+                this.jobScheduler.unscheduleJob(h.info);
+            }
+            final Holder holder = new Holder();
+            // read the creation date before addOrUpdateScheduledJob() strips it from the properties
+            holder.created = (Calendar) properties.get(Job.PROPERTY_JOB_CREATED);
+            holder.read = System.currentTimeMillis();
+            holder.info = this.addOrUpdateScheduledJob(properties, h == null ? null : h.info);
+
+            // keep track of the job, otherwise a later remove can't unschedule it
+            // and a later add/update notification would schedule it a second time
+            this.scheduledJobs.put(key, holder);
+
+            this.jobScheduler.scheduleJob(holder.info);
+            return holder.info;
+        }
+    }
+
+    /**
+     * Persist a scheduled job below the scheduled jobs path. An already existing
+     * resource for the same schedule name is deleted and created again.
+     *
+     * @param jobTopic The job topic
+     * @param jobProperties The job properties, may be <code>null</code>
+     * @param scheduleName The schedule name
+     * @param suspend Whether the job is suspended
+     * @param scheduleInfos The scheduling information
+     * @return The written properties; the schedule info entry contains the passed
+     *         in list of schedule infos instead of the serialized strings
+     * @throws PersistenceException
+     */
+    private Map<String, Object> writeScheduledJob(final String jobTopic,
+            final Map<String, Object> jobProperties,
+            final String scheduleName,
+            final boolean suspend,
+            final List<ScheduleInfoImpl> scheduleInfos)
+    throws PersistenceException {
+        final ResourceResolver resolver = this.configuration.createResourceResolver();
+        try {
+            // create properties
+            final Map<String, Object> properties = new HashMap<String, Object>();
+
+            // copy all job properties except the ones ResourceHelper tells us to ignore
+            if ( jobProperties != null ) {
+                for(final Map.Entry<String, Object> entry : jobProperties.entrySet() ) {
+                    final String propName = entry.getKey();
+                    if ( !ResourceHelper.ignoreProperty(propName) ) {
+                        properties.put(propName, entry.getValue());
+                    }
+                }
+            }
+
+            properties.put(ResourceHelper.PROPERTY_JOB_TOPIC, jobTopic);
+            properties.put(Job.PROPERTY_JOB_CREATED, Calendar.getInstance());
+            properties.put(Job.PROPERTY_JOB_CREATED_INSTANCE, Environment.APPLICATION_ID);
+
+            // put scheduler name and scheduler info
+            properties.put(ResourceHelper.PROPERTY_SCHEDULE_NAME, scheduleName);
+            // the schedule infos are persisted in their serialized string form
+            final String[] infoArray = new String[scheduleInfos.size()];
+            int index = 0;
+            for(final ScheduleInfoImpl info : scheduleInfos) {
+                infoArray[index] = info.getSerializedString();
+                index++;
+            }
+            properties.put(ResourceHelper.PROPERTY_SCHEDULE_INFO, infoArray);
+            // the suspended flag is only written if the job is suspended
+            if ( suspend ) {
+                properties.put(ResourceHelper.PROPERTY_SCHEDULE_SUSPENDED, Boolean.TRUE);
+            }
+
+            // create path and resource
+            properties.put(ResourceResolver.PROPERTY_RESOURCE_TYPE, ResourceHelper.RESOURCE_TYPE_SCHEDULED_JOB);
+
+            final String path = this.configuration.getScheduledJobsPath(true) + ResourceHelper.filterName(scheduleName);
+
+            // update existing resource
+            final Resource existingInfo = resolver.getResource(path);
+            if ( existingInfo != null ) {
+                resolver.delete(existingInfo);
+                logger.debug("Updating scheduled job {} at {}", properties, path);
+            } else {
+                logger.debug("Storing new scheduled job {} at {}", properties, path);
+            }
+            // NOTE(review): no explicit commit here - presumably getOrCreateResource commits, confirm
+            ResourceHelper.getOrCreateResource(resolver,
+                    path,
+                    properties);
+            // put back real schedule infos
+            properties.put(ResourceHelper.PROPERTY_SCHEDULE_INFO, scheduleInfos);
+
+            return properties;
+        } finally {
+            resolver.close();
+        }
+    }
+
+    private ScheduledJobInfoImpl addOrUpdateScheduledJob(
+            final Map<String, Object> properties,
+            final ScheduledJobInfoImpl oldInfo) {
+        properties.remove(ResourceResolver.PROPERTY_RESOURCE_TYPE);
+        properties.remove(Job.PROPERTY_JOB_CREATED);
+        properties.remove(Job.PROPERTY_JOB_CREATED_INSTANCE);
+
+        final String jobTopic = (String) properties.remove(ResourceHelper.PROPERTY_JOB_TOPIC);
+        final String schedulerName = (String) properties.remove(ResourceHelper.PROPERTY_SCHEDULE_NAME);
+
+        final ScheduledJobInfoImpl info;
+        if ( oldInfo == null ) {
+            info = new ScheduledJobInfoImpl(jobScheduler, schedulerName);
+        } else {
+            info = oldInfo;
+        }
+        info.update(jobTopic, properties);
+
+        return info;
+    }
+
+    /**
+     * A bundle event occurred which means we can try loading jobs that previously
+     * failed because of missing classes.
+     * The time of the event is recorded and a task is queued which reads all
+     * jobs again that have no info object and were read before that time.
+     */
+    public void bundleEvent() {
+        this.lastBundleActivity.set(System.currentTimeMillis());
+        this.addTask(new Runnable() {
+            @Override
+            public void run() {
+                // collect the candidates first, the resources are read without holding the lock
+                final Map<String, Holder> updateJobs = new HashMap<String, ScheduledJobHandler.Holder>();
+                synchronized ( scheduledJobs ) {
+                    for(final Map.Entry<String, Holder> entry : scheduledJobs.entrySet()) {
+                        // info is null if reading the job failed
+                        if ( entry.getValue().info == null && entry.getValue().read < lastBundleActivity.get() ) {
+                            updateJobs.put(entry.getKey(), entry.getValue());
+                        }
+                    }
+                }
+                if ( !updateJobs.isEmpty() && isRunning.get() ) {
+                    ResourceResolver resolver = configuration.createResourceResolver();
+                    if ( resolver != null ) {
+                        try {
+                            for(final Map.Entry<String, Holder> entry : updateJobs.entrySet()) {
+                                // the map key is the filtered schedule name which is used as the resource name
+                                final String path = configuration.getScheduledJobsPath(true) + entry.getKey();
+                                final Resource rsrc = resolver.getResource(path);
+                                if ( !isRunning.get() ) {
+                                    break;
+                                }
+                                if ( rsrc != null ) {
+                                    synchronized ( scheduledJobs ) {
+                                        handleAddOrUpdate(scheduledJobs, rsrc);
+                                    }
+                                }
+                            }
+                        } finally {
+                            resolver.close();
+                        }
+                    }
+                }
+            }
+        });
+    }
+
+    /**
+     * Handle observation event for removing a scheduled job.
+     * A task is queued which removes the job from the internal map and
+     * unschedules it.
+     * @param path The path to the job
+     */
+    public void handleRemove(final String path) {
+        final Runnable removal = new Runnable() {
+            @Override
+            public void run() {
+                if ( !isRunning.get() ) {
+                    return;
+                }
+                final String scheduleKey = ResourceHelper.filterName(ResourceUtil.getName(path));
+                if ( scheduleKey == null ) {
+                    return;
+                }
+                synchronized ( scheduledJobs ) {
+                    final Holder removed = scheduledJobs.remove(scheduleKey);
+                    if ( removed != null && removed.info != null ) {
+                        jobScheduler.unscheduleJob(removed.info);
+                    }
+                }
+            }
+        };
+        this.addTask(removal);
+    }
+
+    /**
+     * Handle observation event for adding or updating a scheduled job.
+     * A task is queued which reads the resource at the path and adds or
+     * updates the corresponding job.
+     * @param path The path to the job
+     */
+    public void handleAddUpdate(final String path) {
+        final Runnable addOrUpdate = new Runnable() {
+            @Override
+            public void run() {
+                if ( !isRunning.get() ) {
+                    return;
+                }
+                final ResourceResolver resolver = configuration.createResourceResolver();
+                if ( resolver == null ) {
+                    return;
+                }
+                try {
+                    final Resource jobResource = resolver.getResource(path);
+                    if ( jobResource != null ) {
+                        synchronized ( scheduledJobs ) {
+                            handleAddOrUpdate(scheduledJobs, jobResource);
+                        }
+                    }
+                } finally {
+                    resolver.close();
+                }
+            }
+        };
+        this.addTask(addOrUpdate);
+    }
+
+    /**
+     * Handle add or update of a resource.
+     * The job is removed from the internal map and either the unchanged holder
+     * or a newly read holder is stored in the passed in map. A job is read
+     * (again) if it is unknown, if it has no info object, if it was read before
+     * the last bundle activity or if the resource has a newer creation date.
+     * The caller is expected to hold the lock on the internal map.
+     * @param newScheduledJobs The map to store the jobs
+     * @param rsrc The resource containing the job
+     */
+    private void handleAddOrUpdate(final Map<String, Holder> newScheduledJobs, final Resource rsrc) {
+        final String id = ResourceHelper.filterName(rsrc.getName());
+        final Holder scheduled = this.scheduledJobs.remove(id);
+        boolean read = false;
+        if ( scheduled != null ) {
+            // check if loading failed and we can retry
+            // NOTE(review): bundleEvent() combines these two conditions with AND - confirm OR is intended here
+            if ( scheduled.info == null || scheduled.read < this.lastBundleActivity.get() ) {
+                read = true;
+            }
+            // check if this is an update
+            if ( scheduled.info != null ) {
+                final ValueMap vm = ResourceUtil.getValueMap(rsrc);
+                final Calendar changed = (Calendar) vm.get(Job.PROPERTY_JOB_CREATED);
+                // a holder without a creation date is always considered older than the resource
+                if ( changed != null
+                     && (scheduled.created == null || scheduled.created.compareTo(changed) < 0) ) {
+                    read = true;
+                }
+            }
+            if ( !read ) {
+                // nothing changes
+                newScheduledJobs.put(id, scheduled);
+            }
+        } else {
+            read = true;
+        }
+        if ( read ) {
+            // read
+            final Holder holder = new Holder();
+            holder.read = System.currentTimeMillis();
+
+            final Map<String, Object> properties = this.readScheduledJob(rsrc);
+            if ( properties != null ) {
+                // get the creation date before addOrUpdateScheduledJob() removes it from the map
+                holder.created = (Calendar) properties.get(Job.PROPERTY_JOB_CREATED);
+                holder.info = this.addOrUpdateScheduledJob(properties, scheduled != null ? scheduled.info : null);
+            }
+            // the holder is stored even if reading failed, this allows a retry later on
+            newScheduledJobs.put(id, holder);
+
+            // reading failed for a job which was scheduled before
+            if ( holder.info == null && scheduled != null && scheduled.info != null ) {
+                this.jobScheduler.unscheduleJob(scheduled.info);
+            }
+            if ( holder.info != null ) {
+                this.jobScheduler.scheduleJob(holder.info);
+            }
+        }
+    }
+
+    /**
+     * Remove a scheduled job.
+     * The resource of the job is deleted (if it exists) and the job is removed
+     * from the internal map and unscheduled. Failures while deleting the
+     * resource are ignored; the job is unscheduled in any case.
+     * @param info The schedule info
+     */
+    public void remove(final ScheduledJobInfoImpl info) {
+        final String scheduleKey = ResourceHelper.filterName(info.getName());
+
+        final ResourceResolver resolver = configuration.createResourceResolver();
+        // as in scan(): without a resolver the persistence part is skipped
+        if ( resolver != null ) {
+            try {
+                final String path = configuration.getScheduledJobsPath(true) + scheduleKey;
+
+                final Resource eventResource = resolver.getResource(path);
+                if ( eventResource != null ) {
+                    resolver.delete(eventResource);
+                    resolver.commit();
+                }
+            } catch (final PersistenceException pe) {
+                // we ignore the exception if removing fails
+                ignoreException(pe);
+            } finally {
+                resolver.close();
+            }
+        }
+
+        synchronized ( this.scheduledJobs ) {
+            final Holder h = scheduledJobs.remove(scheduleKey);
+            if ( h != null && h.info != null ) {
+                jobScheduler.unscheduleJob(h.info);
+            }
+        }
+    }
+
+    /**
+     * Update the scheduling information of an already persisted scheduled job.
+     * The creation date of the job is set to the current time and the
+     * serialized schedule infos are written to the resource. Nothing is done if
+     * no resource resolver is available, if the resource does not exist or if it
+     * can't be modified. A failing commit is logged as a warning.
+     *
+     * @param scheduleName The schedule name
+     * @param scheduleInfo The new scheduling information; all elements must be
+     *                     instances of {@link ScheduleInfoImpl}
+     */
+    public void updateSchedule(final String scheduleName, final Collection<ScheduleInfo> scheduleInfo) {
+
+        final ResourceResolver resolver = configuration.createResourceResolver();
+        // as in scan(): nothing can be done without a resolver
+        if ( resolver == null ) {
+            logger.debug("Unable to update scheduled job {}, no resource resolver available", scheduleName);
+            return;
+        }
+        try {
+            final String scheduleKey = ResourceHelper.filterName(scheduleName);
+            final String path = configuration.getScheduledJobsPath(true) + scheduleKey;
+
+            final Resource rsrc = resolver.getResource(path);
+            // This is an update, if we can't find the resource we ignore it
+            if ( rsrc != null ) {
+                // adaptTo() returns null if the resource can't be modified; check this
+                // before touching the holder to keep holder and resource in sync
+                final ModifiableValueMap mvm = rsrc.adaptTo(ModifiableValueMap.class);
+                if ( mvm == null ) {
+                    logger.warn("Unable to update scheduled job {}, resource {} is not modifiable", scheduleName, path);
+                    return;
+                }
+                final Calendar now = Calendar.getInstance();
+
+                // update holder first, handleAddOrUpdate() only reads the job again
+                // if the resource has a newer creation date than the holder
+                synchronized ( scheduledJobs ) {
+                    final Holder h = scheduledJobs.get(scheduleKey);
+                    if ( h != null ) {
+                        h.created = now;
+                    }
+                }
+
+                mvm.put(Job.PROPERTY_JOB_CREATED, now);
+                final String[] infoArray = new String[scheduleInfo.size()];
+                int index = 0;
+                for(final ScheduleInfo si : scheduleInfo) {
+                    infoArray[index] = ((ScheduleInfoImpl)si).getSerializedString();
+                    index++;
+                }
+                mvm.put(ResourceHelper.PROPERTY_SCHEDULE_INFO, infoArray);
+
+                try {
+                    resolver.commit();
+                } catch ( final PersistenceException pe) {
+                    logger.warn("Unable to update scheduled job " + scheduleName, pe);
+                }
+            }
+        } finally {
+            resolver.close();
+        }
+    }
+
+    /**
+     * Helper method which just logs the exception in debug mode.
+     * @param e The exception
+     */
+    private void ignoreException(final Exception e) {
+        if ( this.logger.isDebugEnabled() ) {
+            this.logger.debug("Ignored exception " + e.getMessage(), e);
+        }
+    }
+}

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/scheduling/ScheduledJobHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/scheduling/ScheduledJobHandler.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/scheduling/ScheduledJobHandler.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Copied: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/scheduling/ScheduledJobInfoImpl.java (from r1676922, sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/ScheduledJobInfoImpl.java)
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/scheduling/ScheduledJobInfoImpl.java?p2=sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/scheduling/ScheduledJobInfoImpl.java&p1=sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/ScheduledJobInfoImpl.java&r1=1676922&r2=1676967&rev=1676967&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/ScheduledJobInfoImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/scheduling/ScheduledJobInfoImpl.java Thu Apr 30 12:06:25 2015
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.event.impl.jobs;
+package org.apache.sling.event.impl.jobs.scheduling;
 
 import java.io.Serializable;
 import java.util.Collection;
@@ -26,42 +26,72 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.sling.event.impl.support.ResourceHelper;
 import org.apache.sling.event.impl.support.ScheduleInfoImpl;
 import org.apache.sling.event.jobs.Job;
 import org.apache.sling.event.jobs.JobBuilder.ScheduleBuilder;
 import org.apache.sling.event.jobs.ScheduleInfo;
 import org.apache.sling.event.jobs.ScheduledJobInfo;
 
+/**
+ * The job schedule information.
+ * It holds all required information like
+ * - the name of the schedule
+ * - the job topic
+ * - the job properties
+ * - scheduling information
+ */
 public class ScheduledJobInfoImpl implements ScheduledJobInfo, Serializable {
 
     private static final long serialVersionUID = 1L;
 
     private final String scheduleName;
 
-    private final String jobTopic;
+    private final JobSchedulerImpl jobScheduler;
 
-    private final Map<String, Object> jobProperties;
+    private final AtomicBoolean isSuspended = new AtomicBoolean(false);
 
-    private final JobSchedulerImpl jobScheduler;
+    private volatile List<ScheduleInfo> scheduleInfos;
 
-    private List<ScheduleInfo> scheduleInfos;
+    private volatile String jobTopic;
 
-    private AtomicBoolean isSuspended;
+    private volatile Map<String, Object> jobProperties;
 
+    /**
+     * Create a new info object
+     * @param jobScheduler The job scheduler
+     * @param scheduleName The unique name
+     */
     public ScheduledJobInfoImpl(final JobSchedulerImpl jobScheduler,
-            final String jobTopic,
-            final Map<String, Object> jobProperties,
             final String scheduleName) {
         this.jobScheduler = jobScheduler;
         this.scheduleName = scheduleName;
+    }
+
+    /**
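+     * Update/set the job-related information, including the schedule and the suspended state
+     * @param jobTopic      The job topic
+     * @param jobProperties The job properties; the schedule info and suspended entries are read and removed from this map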
+     */
+    public void update(final String jobTopic,
+            final Map<String, Object> jobProperties) {
+        final boolean isSuspended = jobProperties.remove(ResourceHelper.PROPERTY_SCHEDULE_SUSPENDED) != null;
+        @SuppressWarnings("unchecked")
+        final List<ScheduleInfo> scheduleInfos = (List<ScheduleInfo>) jobProperties.remove(ResourceHelper.PROPERTY_SCHEDULE_INFO);
+
         this.jobTopic = jobTopic;
         this.jobProperties = jobProperties;
+        this.scheduleInfos = Collections.unmodifiableList(scheduleInfos);
+
+        this.isSuspended.set(isSuspended);
     }
 
-    public void update(final boolean isSuspended,
-            final List<ScheduleInfo> scheduleInfos) {
-        this.scheduleInfos = Collections.unmodifiableList(scheduleInfos);
-        this.isSuspended = new AtomicBoolean(isSuspended);
+    /**
+     * Update the scheduling information
+     * @param scheduleInfos The new schedule
+     */
+    public void update(final List<ScheduleInfo> scheduleInfos) {
+        this.scheduleInfos =  Collections.unmodifiableList(scheduleInfos);
     }
 
     /**
@@ -115,7 +145,7 @@ public class ScheduledJobInfoImpl implem
      */
     @Override
     public void unschedule() {
-        this.jobScheduler.unschedule(this);
+        this.jobScheduler.unscheduleJob(this);
     }
 
     /**

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ResourceHelper.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ResourceHelper.java?rev=1676967&r1=1676966&r2=1676967&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ResourceHelper.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ResourceHelper.java Thu Apr 30 12:06:25 2015
@@ -146,7 +146,10 @@ public abstract class ResourceHelper {
      * @return The filtered node name.
      */
     public static String filterName(final String resourceName) {
-        final StringBuilder sb  = new StringBuilder(resourceName.length());
+        if ( resourceName == null ) {
+            return null;
+        }
+        final StringBuilder sb = new StringBuilder(resourceName.length());
         char lastAdded = 0;
 
         for(int i=0; i < resourceName.length(); i++) {

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ScheduleInfoImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ScheduleInfoImpl.java?rev=1676967&r1=1676966&r2=1676967&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ScheduleInfoImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ScheduleInfoImpl.java Thu Apr 30 12:06:25 2015
@@ -409,4 +409,13 @@ public class ScheduleInfoImpl implements
         }
         return null;
     }
+
+    @Override
+    public String toString() {
+        return "ScheduleInfo [scheduleType=" + scheduleType
+                + ", dayOfWeek=" + dayOfWeek + ", hourOfDay=" + hourOfDay
+                + ", minuteOfHour=" + minuteOfHour + ", at=" + at
+                + ", monthOfYear=" + monthOfYear + ", expression=" + expression
+                + "]";
+    }
 }