You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2015/04/30 14:06:26 UTC
svn commit: r1676967 - in
/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl:
jobs/ jobs/scheduling/ support/
Author: cziegeler
Date: Thu Apr 30 12:06:25 2015
New Revision: 1676967
URL: http://svn.apache.org/r1676967
Log:
SLING-4680 : Decouple scheduled jobs from observation
Added:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/scheduling/
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/scheduling/JobScheduleBuilderImpl.java (with props)
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/scheduling/JobSchedulerImpl.java (with props)
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/scheduling/ScheduledJobHandler.java (with props)
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/scheduling/ScheduledJobInfoImpl.java
- copied, changed from r1676922, sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/ScheduledJobInfoImpl.java
Removed:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobSchedulerImpl.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/ScheduledJobInfoImpl.java
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobBuilderImpl.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ResourceHelper.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ScheduleInfoImpl.java
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobBuilderImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobBuilderImpl.java?rev=1676967&r1=1676966&r2=1676967&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobBuilderImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobBuilderImpl.java Thu Apr 30 12:06:25 2015
@@ -18,16 +18,13 @@
*/
package org.apache.sling.event.impl.jobs;
-import java.util.ArrayList;
-import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.UUID;
-import org.apache.sling.event.impl.support.ScheduleInfoImpl;
+import org.apache.sling.event.impl.jobs.scheduling.JobScheduleBuilderImpl;
import org.apache.sling.event.jobs.Job;
import org.apache.sling.event.jobs.JobBuilder;
-import org.apache.sling.event.jobs.ScheduledJobInfo;
/**
* Fluent builder API
@@ -64,81 +61,18 @@ public class JobBuilderImpl implements J
@Override
public ScheduleBuilder schedule() {
- return new ScheduleBuilderImpl(UUID.randomUUID().toString());
+ return new JobScheduleBuilderImpl(
+ this.topic,
+ this.properties,
+ UUID.randomUUID().toString(),
+ this.jobManager.getJobScheduler());
}
public ScheduleBuilder schedule(final String name) {
- return new ScheduleBuilderImpl(name);
- }
-
- public final class ScheduleBuilderImpl implements ScheduleBuilder {
-
- private final String scheduleName;
-
- private boolean suspend = false;
-
- private final List<ScheduleInfoImpl> schedules = new ArrayList<ScheduleInfoImpl>();
-
- public ScheduleBuilderImpl(final String name) {
- this.scheduleName = name;
- }
-
- @Override
- public ScheduleBuilder weekly(final int day, final int hour, final int minute) {
- schedules.add(ScheduleInfoImpl.WEEKLY(day, hour, minute));
- return this;
- }
-
- @Override
- public ScheduleBuilder daily(final int hour, final int minute) {
- schedules.add(ScheduleInfoImpl.DAILY(hour, minute));
- return this;
- }
-
- @Override
- public ScheduleBuilder hourly(final int minute) {
- schedules.add(ScheduleInfoImpl.HOURLY(minute));
- return this;
- }
-
- @Override
- public ScheduleBuilder at(final Date date) {
- schedules.add(ScheduleInfoImpl.AT(date));
- return this;
- }
-
- @Override
- public ScheduleBuilder monthly(final int day, final int hour, final int minute) {
- schedules.add(ScheduleInfoImpl.MONTHLY(day, hour, minute));
- return this;
- }
-
- @Override
- public ScheduleBuilder yearly(final int month, final int day, final int hour, final int minute) {
- schedules.add(ScheduleInfoImpl.YEARLY(month, day, hour, minute));
- return this;
- }
-
- @Override
- public ScheduleBuilder cron(final String expression) {
- schedules.add(ScheduleInfoImpl.CRON(expression));
- return this;
- }
-
- @Override
- public ScheduledJobInfo add() {
- return this.add(null);
- }
-
- @Override
- public ScheduledJobInfo add(final List<String> errors) {
- return jobManager.addScheduledJob(topic, null, properties, scheduleName, suspend, schedules, errors);
- }
-
- @Override
- public ScheduleBuilder suspend() {
- this.suspend = true;
- return this;
- }
+ return new JobScheduleBuilderImpl(
+ this.topic,
+ this.properties,
+ name,
+ this.jobManager.getJobScheduler());
}
}
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java?rev=1676967&r1=1676966&r2=1676967&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java Thu Apr 30 12:06:25 2015
@@ -49,11 +49,11 @@ import org.apache.sling.event.impl.jobs.
import org.apache.sling.event.impl.jobs.notifications.NotificationUtility;
import org.apache.sling.event.impl.jobs.queues.JobQueueImpl;
import org.apache.sling.event.impl.jobs.queues.QueueManager;
+import org.apache.sling.event.impl.jobs.scheduling.JobSchedulerImpl;
import org.apache.sling.event.impl.jobs.stats.StatisticsManager;
import org.apache.sling.event.impl.jobs.tasks.CleanUpTask;
import org.apache.sling.event.impl.support.Environment;
import org.apache.sling.event.impl.support.ResourceHelper;
-import org.apache.sling.event.impl.support.ScheduleInfoImpl;
import org.apache.sling.event.jobs.Job;
import org.apache.sling.event.jobs.Job.JobState;
import org.apache.sling.event.jobs.JobBuilder;
@@ -123,7 +123,7 @@ public class JobManagerImpl
private CleanUpTask maintenanceTask;
/** Job Scheduler. */
- private JobSchedulerImpl jobScheduler;
+ private org.apache.sling.event.impl.jobs.scheduling.JobSchedulerImpl jobScheduler;
/**
* Activate this component.
@@ -131,7 +131,7 @@ public class JobManagerImpl
*/
@Activate
protected void activate(final Map<String, Object> props) throws LoginException {
- this.jobScheduler = new JobSchedulerImpl(this.configuration, this.scheduler, this);
+ this.jobScheduler = new org.apache.sling.event.impl.jobs.scheduling.JobSchedulerImpl(this.configuration, this.scheduler, this);
this.maintenanceTask = new CleanUpTask(this.configuration);
logger.info("Apache Sling Job Manager started on instance {}", Environment.APPLICATION_ID);
@@ -903,49 +903,6 @@ public class JobManagerImpl
return this.jobScheduler.getScheduledJobs(topic, limit, templates);
}
- public ScheduledJobInfo addScheduledJob(final String topic,
- final String jobName,
- final Map<String, Object> properties,
- final String scheduleName,
- final boolean isSuspended,
- final List<ScheduleInfoImpl> scheduleInfos,
- final List<String> errors) {
- final List<String> msgs = new ArrayList<String>();
- if ( scheduleName == null || scheduleName.length() == 0 ) {
- msgs.add("Schedule name not specified");
- }
- final String errorMessage = Utility.checkJob(topic, properties);
- if ( errorMessage != null ) {
- msgs.add(errorMessage);
- }
- if ( scheduleInfos.size() == 0 ) {
- msgs.add("No schedule defined for " + scheduleName);
- }
- for(final ScheduleInfoImpl info : scheduleInfos) {
- info.check(msgs);
- }
- if ( msgs.size() == 0 ) {
- try {
- final ScheduledJobInfo info = this.jobScheduler.writeJob(topic, jobName, properties, scheduleName, isSuspended, scheduleInfos);
- if ( info != null ) {
- return info;
- }
- msgs.add("Unable to persist scheduled job.");
- } catch ( final PersistenceException pe) {
- msgs.add("Unable to persist scheduled job: " + scheduleName);
- logger.warn("Unable to persist scheduled job", pe);
- }
- } else {
- for(final String msg : msgs) {
- logger.warn(msg);
- }
- }
- if ( errors != null ) {
- errors.addAll(msgs);
- }
- return null;
- }
-
/**
* Internal method to add a job
*/
@@ -1017,4 +974,8 @@ public class JobManagerImpl
}
return null;
}
+
+ public JobSchedulerImpl getJobScheduler() {
+ return this.jobScheduler;
+ }
}
Added: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/scheduling/JobScheduleBuilderImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/scheduling/JobScheduleBuilderImpl.java?rev=1676967&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/scheduling/JobScheduleBuilderImpl.java (added)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/scheduling/JobScheduleBuilderImpl.java Thu Apr 30 12:06:25 2015
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.scheduling;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.sling.event.impl.support.ScheduleInfoImpl;
+import org.apache.sling.event.jobs.JobBuilder.ScheduleBuilder;
+import org.apache.sling.event.jobs.ScheduledJobInfo;
+
+/**
+ * The builder implementation for scheduled jobs.
+ */
+public final class JobScheduleBuilderImpl implements ScheduleBuilder {
+
+ private final String topic;
+
+ private final Map<String, Object> properties;
+
+ private final String scheduleName;
+
+ private final JobSchedulerImpl jobScheduler;
+
+ private volatile boolean suspend = false;
+
+ private final List<ScheduleInfoImpl> schedules = new ArrayList<ScheduleInfoImpl>();
+
+ public JobScheduleBuilderImpl(
+ final String topic,
+ final Map<String, Object> properties,
+ final String name,
+ final JobSchedulerImpl jobScheduler) {
+ this.topic = topic;
+ this.properties = properties;
+ this.scheduleName = name;
+ this.jobScheduler = jobScheduler;
+ }
+
+ @Override
+ public ScheduleBuilder weekly(final int day, final int hour, final int minute) {
+ schedules.add(ScheduleInfoImpl.WEEKLY(day, hour, minute));
+ return this;
+ }
+
+ @Override
+ public ScheduleBuilder daily(final int hour, final int minute) {
+ schedules.add(ScheduleInfoImpl.DAILY(hour, minute));
+ return this;
+ }
+
+ @Override
+ public ScheduleBuilder hourly(final int minute) {
+ schedules.add(ScheduleInfoImpl.HOURLY(minute));
+ return this;
+ }
+
+ @Override
+ public ScheduleBuilder at(final Date date) {
+ schedules.add(ScheduleInfoImpl.AT(date));
+ return this;
+ }
+
+ @Override
+ public ScheduleBuilder monthly(final int day, final int hour, final int minute) {
+ schedules.add(ScheduleInfoImpl.MONTHLY(day, hour, minute));
+ return this;
+ }
+
+ @Override
+ public ScheduleBuilder yearly(final int month, final int day, final int hour, final int minute) {
+ schedules.add(ScheduleInfoImpl.YEARLY(month, day, hour, minute));
+ return this;
+ }
+
+ @Override
+ public ScheduleBuilder cron(final String expression) {
+ schedules.add(ScheduleInfoImpl.CRON(expression));
+ return this;
+ }
+
+ @Override
+ public ScheduledJobInfo add() {
+ return this.add(null);
+ }
+
+ @Override
+ public ScheduledJobInfo add(final List<String> errors) {
+ return this.jobScheduler.addScheduledJob(topic,
+ properties,
+ scheduleName,
+ suspend,
+ schedules,
+ errors);
+ }
+
+ @Override
+ public ScheduleBuilder suspend() {
+ this.suspend = true;
+ return this;
+ }
+}
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/scheduling/JobScheduleBuilderImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/scheduling/JobScheduleBuilderImpl.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/scheduling/JobScheduleBuilderImpl.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/scheduling/JobSchedulerImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/scheduling/JobSchedulerImpl.java?rev=1676967&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/scheduling/JobSchedulerImpl.java (added)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/scheduling/JobSchedulerImpl.java Thu Apr 30 12:06:25 2015
@@ -0,0 +1,541 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.scheduling;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.sling.api.SlingConstants;
+import org.apache.sling.api.resource.ModifiableValueMap;
+import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.commons.scheduler.JobContext;
+import org.apache.sling.commons.scheduler.ScheduleOptions;
+import org.apache.sling.commons.scheduler.Scheduler;
+import org.apache.sling.event.impl.jobs.JobManagerImpl;
+import org.apache.sling.event.impl.jobs.Utility;
+import org.apache.sling.event.impl.jobs.config.ConfigurationChangeListener;
+import org.apache.sling.event.impl.jobs.config.JobManagerConfiguration;
+import org.apache.sling.event.impl.jobs.config.TopologyCapabilities;
+import org.apache.sling.event.impl.support.ResourceHelper;
+import org.apache.sling.event.impl.support.ScheduleInfoImpl;
+import org.apache.sling.event.jobs.JobBuilder;
+import org.apache.sling.event.jobs.ScheduleInfo;
+import org.apache.sling.event.jobs.ScheduleInfo.ScheduleType;
+import org.apache.sling.event.jobs.ScheduledJobInfo;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * The scheduler for managing scheduled jobs.
+ *
+ * This is not a component by itself, it's directly created from the job manager.
+ * The job manager is also registering itself as an event handler and forwards
+ * the events to this service.
+ */
+public class JobSchedulerImpl
+ implements EventHandler,
+ ConfigurationChangeListener,
+ org.apache.sling.commons.scheduler.Job {
+
+ private static final String PROPERTY_READ_JOB = "properties";
+
+ private static final String PROPERTY_SCHEDULE_INDEX = "index";
+
+ /** Default logger */
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+ /** Is this active? */
+ private final AtomicBoolean active = new AtomicBoolean(false);
+
+ /** Central job handling configuration. */
+ private final JobManagerConfiguration configuration;
+
+ /** Scheduler service. */
+ private final Scheduler scheduler;
+
+ /** Job manager. */
+ private final JobManagerImpl jobManager;
+
+ /** Scheduled job handler. */
+ private final ScheduledJobHandler scheduledJobHandler;
+
+ /** All scheduled jobs, by scheduler name */
+ private final Map<String, ScheduledJobInfoImpl> scheduledJobs = new HashMap<String, ScheduledJobInfoImpl>();
+
+ /**
+ * Create the scheduler
+ * @param configuration Central job manager configuration
+ * @param scheduler The scheduler service
+ * @param jobManager The job manager
+ */
+ public JobSchedulerImpl(final JobManagerConfiguration configuration,
+ final Scheduler scheduler,
+ final JobManagerImpl jobManager) {
+ this.configuration = configuration;
+ this.scheduler = scheduler;
+ this.jobManager = jobManager;
+
+ this.configuration.addListener(this);
+
+ this.scheduledJobHandler = new ScheduledJobHandler(configuration, this);
+ }
+
+ /**
+ * Deactivate this component.
+ */
+ public void deactivate() {
+ this.configuration.removeListener(this);
+
+ this.scheduledJobHandler.deactivate();
+
+ if ( this.active.compareAndSet(true, false) ) {
+ this.stopScheduling();
+ }
+ synchronized ( this.scheduledJobs ) {
+ this.scheduledJobs.clear();
+ }
+ }
+
+ /**
+ * @see org.apache.sling.event.impl.jobs.config.ConfigurationChangeListener#configurationChanged(boolean)
+ */
+ @Override
+ public void configurationChanged(final boolean processingActive) {
+ // scheduling is only active if
+ // - processing is active and
+ // - configuration is still available and active
+ // - and current instance is leader
+ final boolean schedulingActive;
+ if ( processingActive ) {
+ final TopologyCapabilities caps = this.configuration.getTopologyCapabilities();
+ if ( caps != null && caps.isActive() ) {
+ schedulingActive = caps.isLeader();
+ } else {
+ schedulingActive = false;
+ }
+ } else {
+ schedulingActive = false;
+ }
+
+ // switch activation based on current state and new state
+ if ( schedulingActive ) {
+ // activate if inactive
+ if ( this.active.compareAndSet(false, true) ) {
+ this.startScheduling();
+ }
+ } else {
+ // deactivate if active
+ if ( this.active.compareAndSet(true, false) ) {
+ this.stopScheduling();
+ }
+ }
+ }
+
+ /**
+ * Start all scheduled jobs
+ */
+ private void startScheduling() {
+ synchronized ( this.scheduledJobs ) {
+ for(final ScheduledJobInfo info : this.scheduledJobs.values()) {
+ this.startScheduledJob(((ScheduledJobInfoImpl)info));
+ }
+ }
+ }
+
+ /**
+ * Stop all scheduled jobs.
+ */
+ private void stopScheduling() {
+ synchronized ( this.scheduledJobs ) {
+ for(final ScheduledJobInfo info : this.scheduledJobs.values()) {
+ this.stopScheduledJob((ScheduledJobInfoImpl)info);
+ }
+ }
+ }
+
+ /**
+ * Add a scheduled job
+ */
+ public void scheduleJob(final ScheduledJobInfoImpl info) {
+ synchronized ( this.scheduledJobs ) {
+ this.scheduledJobs.put(info.getName(), info);
+ this.startScheduledJob(info);
+ }
+ }
+
+ /**
+ * Remove a scheduled job
+ */
+ public void unscheduleJob(final ScheduledJobInfoImpl info) {
+ synchronized ( this.scheduledJobs ) {
+ if ( this.scheduledJobs.remove(info.getName()) != null ) {
+ this.stopScheduledJob(info);
+ }
+ }
+ }
+
+ /**
+ * Start a scheduled job
+ * @param info The scheduling info
+ */
+ private void startScheduledJob(final ScheduledJobInfoImpl info) {
+ if ( this.active.get() ) {
+ if ( !info.isSuspended() ) {
+ this.configuration.getAuditLogger().debug("SCHEDULED OK name={}, topic={}, properties={} : {}",
+ new Object[] {info.getName(),
+ info.getJobTopic(),
+ info.getJobProperties()},
+ info.getSchedules());
+ int index = 0;
+ for(final ScheduleInfo si : info.getSchedules()) {
+ final String name = info.getSchedulerJobId() + "-" + String.valueOf(index);
+ ScheduleOptions options = null;
+ switch ( si.getType() ) {
+ case DAILY:
+ case WEEKLY:
+ case HOURLY:
+ case MONTHLY:
+ case YEARLY:
+ case CRON:
+ options = this.scheduler.EXPR(((ScheduleInfoImpl)si).getCronExpression());
+
+ break;
+ case DATE:
+ options = this.scheduler.AT(((ScheduleInfoImpl)si).getNextScheduledExecution());
+ break;
+ }
+ // Create configuration for scheduled job
+ final Map<String, Serializable> config = new HashMap<String, Serializable>();
+ config.put(PROPERTY_READ_JOB, info);
+ config.put(PROPERTY_SCHEDULE_INDEX, index);
+ this.scheduler.schedule(this, options.name(name).config(config).canRunConcurrently(false));
+ index++;
+ }
+ } else {
+ this.configuration.getAuditLogger().debug("SCHEDULED SUSPENDED name={}, topic={}, properties={} : {}",
+ new Object[] {info.getName(),
+ info.getJobTopic(),
+ info.getJobProperties(),
+ info.getSchedules()});
+ }
+ }
+ }
+
+ /**
+ * Stop a scheduled job
+ * @param info The scheduling info
+ */
+ private void stopScheduledJob(final ScheduledJobInfoImpl info) {
+ if ( this.active.get() ) {
+ this.configuration.getAuditLogger().debug("SCHEDULED STOP name={}, topic={}, properties={} : {}",
+ new Object[] {info.getName(),
+ info.getJobTopic(),
+ info.getJobProperties(),
+ info.getSchedules()});
+ for(int index = 0; index<info.getSchedules().size(); index++) {
+ final String name = info.getSchedulerJobId() + "-" + String.valueOf(index);
+ this.scheduler.unschedule(name);
+ }
+ }
+ }
+
+ /**
+ * @see org.apache.sling.commons.scheduler.Job#execute(org.apache.sling.commons.scheduler.JobContext)
+ */
+ @Override
+ public void execute(final JobContext context) {
+ final ScheduledJobInfoImpl info = (ScheduledJobInfoImpl) context.getConfiguration().get(PROPERTY_READ_JOB);
+
+ if ( info.isSuspended() ) {
+ return;
+ }
+
+ this.jobManager.addJob(info.getJobTopic(), info.getJobProperties());
+ final int index = (Integer)context.getConfiguration().get(PROPERTY_SCHEDULE_INDEX);
+ final Iterator<ScheduleInfo> iter = info.getSchedules().iterator();
+ ScheduleInfo si = iter.next();
+ for(int i=0; i<index; i++) {
+ si = iter.next();
+ }
+ // if scheduled once (DATE), remove from schedule
+ if ( si.getType() == ScheduleType.DATE ) {
+ if ( index == 0 && info.getSchedules().size() == 1 ) {
+ // remove
+ this.scheduledJobHandler.remove(info);
+ } else {
+ // update schedule list
+ final List<ScheduleInfo> infos = new ArrayList<ScheduleInfo>();
+ for(final ScheduleInfo i : info.getSchedules() ) {
+ if ( i != si ) { // no need to use equals
+ infos.add(i);
+ }
+ }
+ info.update(infos);
+ this.scheduledJobHandler.updateSchedule(info.getName(), infos);
+ }
+ }
+ }
+
+ /**
+ * @see org.osgi.service.event.EventHandler#handleEvent(org.osgi.service.event.Event)
+ */
+ @Override
+ public void handleEvent(final Event event) {
+ if ( ResourceHelper.BUNDLE_EVENT_STARTED.equals(event.getTopic())
+ || ResourceHelper.BUNDLE_EVENT_UPDATED.equals(event.getTopic()) ) {
+ this.scheduledJobHandler.bundleEvent();
+ } else {
+ // resource event
+ final String path = (String)event.getProperty(SlingConstants.PROPERTY_PATH);
+ if ( path != null && path.startsWith(this.configuration.getScheduledJobsPath(true)) ) {
+ if ( SlingConstants.TOPIC_RESOURCE_REMOVED.equals(event.getTopic()) ) {
+ // removal
+ this.scheduledJobHandler.handleRemove(path);
+ } else {
+ // add or update
+ this.scheduledJobHandler.handleAddUpdate(path);
+ }
+ }
+ }
+ }
+
+ /**
+ * Helper method which just logs the exception in debug mode.
+ * @param e
+ */
+ private void ignoreException(final Exception e) {
+ if ( this.logger.isDebugEnabled() ) {
+ this.logger.debug("Ignored exception " + e.getMessage(), e);
+ }
+ }
+
+ /**
+ * Create a schedule builder for a currently scheduled job
+ */
+ public JobBuilder.ScheduleBuilder createJobBuilder(final ScheduledJobInfoImpl info) {
+ final JobBuilder.ScheduleBuilder sb = new JobScheduleBuilderImpl(info.getJobTopic(),
+ info.getJobProperties(), info.getName(), this);
+ return (info.isSuspended() ? sb.suspend() : sb);
+ }
+
+ private enum Operation {
+ LESS,
+ LESS_OR_EQUALS,
+ EQUALS,
+ GREATER_OR_EQUALS,
+ GREATER
+ }
+
+ /**
+ * Check if the job matches the template
+ */
+ private boolean match(final ScheduledJobInfoImpl job, final Map<String, Object> template) {
+ if ( template != null ) {
+ for(final Map.Entry<String, Object> current : template.entrySet()) {
+ final String key = current.getKey();
+ final char firstChar = key.length() > 0 ? key.charAt(0) : 0;
+ final String propName;
+ final Operation op;
+ if ( firstChar == '=' ) {
+ propName = key.substring(1);
+ op = Operation.EQUALS;
+ } else if ( firstChar == '<' ) {
+ final char secondChar = key.length() > 1 ? key.charAt(1) : 0;
+ if ( secondChar == '=' ) {
+ op = Operation.LESS_OR_EQUALS;
+ propName = key.substring(2);
+ } else {
+ op = Operation.LESS;
+ propName = key.substring(1);
+ }
+ } else if ( firstChar == '>' ) {
+ final char secondChar = key.length() > 1 ? key.charAt(1) : 0;
+ if ( secondChar == '=' ) {
+ op = Operation.GREATER_OR_EQUALS;
+ propName = key.substring(2);
+ } else {
+ op = Operation.GREATER;
+ propName = key.substring(1);
+ }
+ } else {
+ propName = key;
+ op = Operation.EQUALS;
+ }
+ final Object value = current.getValue();
+
+ if ( op == Operation.EQUALS ) {
+ if ( !value.equals(job.getJobProperties().get(propName)) ) {
+ return false;
+ }
+ } else {
+ if ( value instanceof Comparable ) {
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ final int result = ((Comparable)value).compareTo(job.getJobProperties().get(propName));
+ if ( op == Operation.LESS && result > -1 ) {
+ return false;
+ } else if ( op == Operation.LESS_OR_EQUALS && result > 0 ) {
+ return false;
+ } else if ( op == Operation.GREATER_OR_EQUALS && result < 0 ) {
+ return false;
+ } else if ( op == Operation.GREATER && result < 1 ) {
+ return false;
+ }
+ } else {
+ // if the value is not comparable we simply don't match
+ return false;
+ }
+ }
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Get all scheduled jobs
+ */
+ public Collection<ScheduledJobInfo> getScheduledJobs(final String topic,
+ final long limit,
+ final Map<String, Object>... templates) {
+ final List<ScheduledJobInfo> jobs = new ArrayList<ScheduledJobInfo>();
+ long count = 0;
+ synchronized ( this.scheduledJobs ) {
+ for(final ScheduledJobInfoImpl job : this.scheduledJobs.values() ) {
+ boolean add = true;
+ if ( topic != null && !topic.equals(job.getJobTopic()) ) {
+ add = false;
+ }
+ if ( add && templates != null && templates.length != 0 ) {
+ add = false;
+ for (Map<String,Object> template : templates) {
+ add = this.match(job, template);
+ if ( add ) {
+ break;
+ }
+ }
+ }
+ if ( add ) {
+ jobs.add(job);
+ count++;
+ if ( limit > 0 && count == limit ) {
+ break;
+ }
+ }
+ }
+ }
+ return jobs;
+ }
+
+ /**
+ * Change the suspended flag for a scheduled job
+ * @param info The schedule info
+ * @param flag The corresponding flag
+ */
+ public void setSuspended(final ScheduledJobInfoImpl info, final boolean flag) {
+ final ResourceResolver resolver = configuration.createResourceResolver();
+ try {
+ final StringBuilder sb = new StringBuilder(this.configuration.getScheduledJobsPath(true));
+ sb.append(ResourceHelper.filterName(info.getName()));
+ final String path = sb.toString();
+
+ final Resource eventResource = resolver.getResource(path);
+ if ( eventResource != null ) {
+ final ModifiableValueMap mvm = eventResource.adaptTo(ModifiableValueMap.class);
+ if ( flag ) {
+ mvm.put(ResourceHelper.PROPERTY_SCHEDULE_SUSPENDED, Boolean.TRUE);
+ } else {
+ mvm.remove(ResourceHelper.PROPERTY_SCHEDULE_SUSPENDED);
+ }
+ resolver.commit();
+ }
+ if ( flag ) {
+ this.stopScheduledJob(info);
+ } else {
+ this.startScheduledJob(info);
+ }
+ } catch (final PersistenceException pe) {
+ // we ignore the exception if removing fails
+ ignoreException(pe);
+ } finally {
+ resolver.close();
+ }
+ }
+
+ /**
+ * Add a scheduled job
+ * @param topic The job topic
+ * @param properties The job properties
+ * @param scheduleName The schedule name
+ * @param isSuspended Whether it is suspended
+ * @param scheduleInfos The scheduling information
+ * @param errors Optional list to contain potential errors
+ * @return A new job info or {@code null}
+ */
+ public ScheduledJobInfo addScheduledJob(final String topic,
+ final Map<String, Object> properties,
+ final String scheduleName,
+ final boolean isSuspended,
+ final List<ScheduleInfoImpl> scheduleInfos,
+ final List<String> errors) {
+ final List<String> msgs = new ArrayList<String>();
+ if ( scheduleName == null || scheduleName.length() == 0 ) {
+ msgs.add("Schedule name not specified");
+ }
+ final String errorMessage = Utility.checkJob(topic, properties);
+ if ( errorMessage != null ) {
+ msgs.add(errorMessage);
+ }
+ if ( scheduleInfos.size() == 0 ) {
+ msgs.add("No schedule defined for " + scheduleName);
+ }
+ for(final ScheduleInfoImpl info : scheduleInfos) {
+ info.check(msgs);
+ }
+ if ( msgs.size() == 0 ) {
+ try {
+ final ScheduledJobInfo info = this.scheduledJobHandler.addOrUpdateJob(topic, properties, scheduleName, isSuspended, scheduleInfos);
+ if ( info != null ) {
+ return info;
+ }
+ msgs.add("Unable to persist scheduled job.");
+ } catch ( final PersistenceException pe) {
+ msgs.add("Unable to persist scheduled job: " + scheduleName);
+ logger.warn("Unable to persist scheduled job", pe);
+ }
+ } else {
+ for(final String msg : msgs) {
+ logger.warn(msg);
+ }
+ }
+ if ( errors != null ) {
+ errors.addAll(msgs);
+ }
+ return null;
+ }
+}
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/scheduling/JobSchedulerImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/scheduling/JobSchedulerImpl.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/scheduling/JobSchedulerImpl.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/scheduling/ScheduledJobHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/scheduling/ScheduledJobHandler.java?rev=1676967&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/scheduling/ScheduledJobHandler.java (added)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/scheduling/ScheduledJobHandler.java Thu Apr 30 12:06:25 2015
@@ -0,0 +1,541 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.scheduling;
+
+import java.util.Calendar;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.sling.api.resource.ModifiableValueMap;
+import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.ResourceUtil;
+import org.apache.sling.api.resource.ValueMap;
+import org.apache.sling.event.impl.jobs.config.JobManagerConfiguration;
+import org.apache.sling.event.impl.support.Environment;
+import org.apache.sling.event.impl.support.ResourceHelper;
+import org.apache.sling.event.impl.support.ScheduleInfoImpl;
+import org.apache.sling.event.jobs.Job;
+import org.apache.sling.event.jobs.ScheduleInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ */
+public class ScheduledJobHandler implements Runnable {
+
+ public static final class Holder {
+ public Calendar created;
+ public ScheduledJobInfoImpl info;
+ public long read;
+ }
+
+ /** Logger. */
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+ /** The job manager configuration. */
+ private final JobManagerConfiguration configuration;
+
+ /** The job scheduler. */
+ private final JobSchedulerImpl jobScheduler;
+
+ /** The map of all scheduled jobs, key is the filtered schedule name */
+ private final Map<String, Holder> scheduledJobs = new HashMap<String, Holder>();
+
+ private final AtomicLong lastBundleActivity = new AtomicLong();
+
+ private final AtomicBoolean isRunning = new AtomicBoolean(true);
+
+ /** A local queue for serializing the event processing. */
+ private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
+
+ /**
+ * @param configuration Current job manager configuration
+ */
+ public ScheduledJobHandler(final JobManagerConfiguration configuration,
+ final JobSchedulerImpl jobScheduler) {
+ this.configuration = configuration;
+ this.jobScheduler = jobScheduler;
+ final Thread t = new Thread(this, "Apache Sling Scheduled Job Handler Thread");
+ t.setDaemon(true);
+ t.start();
+
+ this.addFullScan();
+ }
+
+ /**
+ * Add a task/runnable to the queue
+ */
+ private void addTask(final Runnable r) {
+ try {
+ this.queue.put(r);
+ } catch (final InterruptedException e) {
+ this.ignoreException(e);
+ Thread.currentThread().interrupt();
+ }
+ }
+ /**
+ * Add a full scan to the task queue
+ */
+ private void addFullScan() {
+ this.addTask(new Runnable() {
+ @Override
+ public void run() {
+ scan();
+ }
+ });
+ }
+
+ public void deactivate() {
+ this.isRunning.set(false);
+ this.queue.clear();
+ // put a NOP runnable to wake up the queue
+ this.addTask(new Runnable() {
+ @Override
+ public void run() {
+ // do nothing
+ }
+ });
+ }
+
+ @Override
+ public void run() {
+ while ( this.isRunning.get() ) {
+ Runnable r = null;
+ try {
+ r = this.queue.take();
+ } catch (final InterruptedException e) {
+ this.ignoreException(e);
+ Thread.currentThread().interrupt();
+ this.isRunning.set(false);
+ }
+ if ( this.isRunning.get() && r != null) {
+ r.run();
+ }
+ }
+ }
+
+ private void scan() {
+ final ResourceResolver resolver = configuration.createResourceResolver();
+ if ( resolver != null ) {
+ try {
+ logger.debug("Scanning for scheduled jobs...");
+ final String path = this.configuration.getScheduledJobsPath(false);
+ final Resource startResource = resolver.getResource(path);
+ if ( startResource != null ) {
+ final Map<String, Holder> newScheduledJobs = new HashMap<String, Holder>();
+ synchronized ( this.scheduledJobs ) {
+ for(final Resource rsrc : startResource.getChildren()) {
+ if ( !isRunning.get() ) {
+ break;
+ }
+ handleAddOrUpdate(newScheduledJobs, rsrc);
+ }
+ if ( isRunning.get() ) {
+ for(final Holder h : this.scheduledJobs.values()) {
+ if ( h.info != null ) {
+ this.jobScheduler.unscheduleJob(h.info);
+ }
+ }
+ this.scheduledJobs.clear();
+ this.scheduledJobs.putAll(newScheduledJobs);
+ }
+ }
+ }
+ logger.debug("Finished scanning for scheduled jobs...");
+ } finally {
+ resolver.close();
+ }
+ }
+ }
+
+ /**
+ * Read a scheduled job from the resource
+ * @return The job or <code>null</code>
+ */
+ private Map<String, Object> readScheduledJob(final Resource eventResource) {
+ try {
+ final ValueMap vm = ResourceHelper.getValueMap(eventResource);
+ final Map<String, Object> properties = ResourceHelper.cloneValueMap(vm);
+
+ @SuppressWarnings("unchecked")
+ final List<Exception> readErrorList = (List<Exception>) properties.remove(ResourceHelper.PROPERTY_MARKER_READ_ERROR_LIST);
+ if ( readErrorList != null ) {
+ for(final Exception e : readErrorList) {
+ logger.warn("Unable to read scheduled job from " + eventResource.getPath(), e);
+ }
+ } else {
+ return properties;
+ }
+ } catch (final InstantiationException ie) {
+ // something happened with the resource in the meantime
+ this.ignoreException(ie);
+ }
+ return null;
+ }
+
+ /**
+ * Write a scheduled job to the resource tree.
+ * @throws PersistenceException
+ */
+ public ScheduledJobInfoImpl addOrUpdateJob(
+ final String jobTopic,
+ final Map<String, Object> jobProperties,
+ final String scheduleName,
+ final boolean suspend,
+ final List<ScheduleInfoImpl> scheduleInfos)
+ throws PersistenceException {
+ final Map<String, Object> properties = this.writeScheduledJob(jobTopic, jobProperties, scheduleName, suspend, scheduleInfos);
+
+ final String key = ResourceHelper.filterName(scheduleName);
+ synchronized ( this.scheduledJobs ) {
+ final Holder h = this.scheduledJobs.remove(key);
+ if ( h != null && h.info != null ) {
+ this.jobScheduler.unscheduleJob(h.info);
+ }
+ final Holder holder = new Holder();
+ holder.created = (Calendar) properties.get(Job.PROPERTY_JOB_CREATED);
+ holder.read = System.currentTimeMillis();
+ holder.info = this.addOrUpdateScheduledJob(properties, h == null ? null : h.info);
+
+ this.jobScheduler.scheduleJob(holder.info);
+ return holder.info;
+ }
+ }
+
+ private Map<String, Object> writeScheduledJob(final String jobTopic,
+ final Map<String, Object> jobProperties,
+ final String scheduleName,
+ final boolean suspend,
+ final List<ScheduleInfoImpl> scheduleInfos)
+ throws PersistenceException {
+ final ResourceResolver resolver = this.configuration.createResourceResolver();
+ try {
+ // create properties
+ final Map<String, Object> properties = new HashMap<String, Object>();
+
+ if ( jobProperties != null ) {
+ for(final Map.Entry<String, Object> entry : jobProperties.entrySet() ) {
+ final String propName = entry.getKey();
+ if ( !ResourceHelper.ignoreProperty(propName) ) {
+ properties.put(propName, entry.getValue());
+ }
+ }
+ }
+
+ properties.put(ResourceHelper.PROPERTY_JOB_TOPIC, jobTopic);
+ properties.put(Job.PROPERTY_JOB_CREATED, Calendar.getInstance());
+ properties.put(Job.PROPERTY_JOB_CREATED_INSTANCE, Environment.APPLICATION_ID);
+
+ // put scheduler name and scheduler info
+ properties.put(ResourceHelper.PROPERTY_SCHEDULE_NAME, scheduleName);
+ final String[] infoArray = new String[scheduleInfos.size()];
+ int index = 0;
+ for(final ScheduleInfoImpl info : scheduleInfos) {
+ infoArray[index] = info.getSerializedString();
+ index++;
+ }
+ properties.put(ResourceHelper.PROPERTY_SCHEDULE_INFO, infoArray);
+ if ( suspend ) {
+ properties.put(ResourceHelper.PROPERTY_SCHEDULE_SUSPENDED, Boolean.TRUE);
+ }
+
+ // create path and resource
+ properties.put(ResourceResolver.PROPERTY_RESOURCE_TYPE, ResourceHelper.RESOURCE_TYPE_SCHEDULED_JOB);
+
+ final String path = this.configuration.getScheduledJobsPath(true) + ResourceHelper.filterName(scheduleName);
+
+ // update existing resource
+ final Resource existingInfo = resolver.getResource(path);
+ if ( existingInfo != null ) {
+ resolver.delete(existingInfo);
+ logger.debug("Updating scheduled job {} at {}", properties, path);
+ } else {
+ logger.debug("Storing new scheduled job {} at {}", properties, path);
+ }
+ ResourceHelper.getOrCreateResource(resolver,
+ path,
+ properties);
+ // put back real schedule infos
+ properties.put(ResourceHelper.PROPERTY_SCHEDULE_INFO, scheduleInfos);
+
+ return properties;
+ } finally {
+ resolver.close();
+ }
+ }
+
+ private ScheduledJobInfoImpl addOrUpdateScheduledJob(
+ final Map<String, Object> properties,
+ final ScheduledJobInfoImpl oldInfo) {
+ properties.remove(ResourceResolver.PROPERTY_RESOURCE_TYPE);
+ properties.remove(Job.PROPERTY_JOB_CREATED);
+ properties.remove(Job.PROPERTY_JOB_CREATED_INSTANCE);
+
+ final String jobTopic = (String) properties.remove(ResourceHelper.PROPERTY_JOB_TOPIC);
+ final String schedulerName = (String) properties.remove(ResourceHelper.PROPERTY_SCHEDULE_NAME);
+
+ final ScheduledJobInfoImpl info;
+ if ( oldInfo == null ) {
+ info = new ScheduledJobInfoImpl(jobScheduler, schedulerName);
+ } else {
+ info = oldInfo;
+ }
+ info.update(jobTopic, properties);
+
+ return info;
+ }
+
+ /**
+ * A bundle event occurred which means we can try loading jobs that previously
+ * failed because of missing classes.
+ */
+ public void bundleEvent() {
+ this.lastBundleActivity.set(System.currentTimeMillis());
+ this.addTask(new Runnable() {
+ @Override
+ public void run() {
+ final Map<String, Holder> updateJobs = new HashMap<String, ScheduledJobHandler.Holder>();
+ synchronized ( scheduledJobs ) {
+ for(final Map.Entry<String, Holder> entry : scheduledJobs.entrySet()) {
+ if ( entry.getValue().info == null && entry.getValue().read < lastBundleActivity.get() ) {
+ updateJobs.put(entry.getKey(), entry.getValue());
+ }
+ }
+ }
+ if ( !updateJobs.isEmpty() && isRunning.get() ) {
+ ResourceResolver resolver = configuration.createResourceResolver();
+ if ( resolver != null ) {
+ try {
+ for(final Map.Entry<String, Holder> entry : updateJobs.entrySet()) {
+ final String path = configuration.getScheduledJobsPath(true) + entry.getKey();
+ final Resource rsrc = resolver.getResource(path);
+ if ( !isRunning.get() ) {
+ break;
+ }
+ if ( rsrc != null ) {
+ synchronized ( scheduledJobs ) {
+ handleAddOrUpdate(scheduledJobs, rsrc);
+ }
+ }
+ }
+ } finally {
+ resolver.close();
+ }
+ }
+ }
+ }
+ });
+ }
+
+ /**
+ * Handle observation event for removing a scheduled job
+ * @param path The path to the job
+ */
+ public void handleRemove(final String path) {
+ this.addTask(new Runnable() {
+ @Override
+ public void run() {
+ if ( isRunning.get() ) {
+ final String scheduleKey = ResourceHelper.filterName(ResourceUtil.getName(path));
+ if ( scheduleKey != null ) {
+ synchronized ( scheduledJobs ) {
+ final Holder h = scheduledJobs.remove(scheduleKey);
+ if ( h != null && h.info != null ) {
+ jobScheduler.unscheduleJob(h.info);
+ }
+ }
+ }
+ }
+ }
+ });
+ }
+
+ /**
+ * Handle observation event for adding or updating a scheduled job
+ * @param path The path to the job
+ */
+ public void handleAddUpdate(final String path) {
+ this.addTask(new Runnable() {
+ @Override
+ public void run() {
+ if ( isRunning.get() ) {
+ final ResourceResolver resolver = configuration.createResourceResolver();
+ if ( resolver != null ) {
+ try {
+ final Resource rsrc = resolver.getResource(path);
+ if ( rsrc != null ) {
+ synchronized ( scheduledJobs ) {
+ handleAddOrUpdate(scheduledJobs, rsrc);
+ }
+ }
+ } finally {
+ resolver.close();
+ }
+ }
+ }
+ }
+ });
+ }
+
+ /**
+ * Handle add or update of a resource
+ * @param newScheduledJobs The map to store the jobs
+ * @param rsrc The resource containing the job
+ */
+ private void handleAddOrUpdate(final Map<String, Holder> newScheduledJobs, final Resource rsrc) {
+ final String id = ResourceHelper.filterName(rsrc.getName());
+ final Holder scheduled = this.scheduledJobs.remove(id);
+ boolean read = false;
+ if ( scheduled != null ) {
+ // check if loading failed and we can retry
+ if ( scheduled.info == null || scheduled.read < this.lastBundleActivity.get() ) {
+ read = true;
+ }
+ // check if this is an update
+ if ( scheduled.info != null ) {
+ final ValueMap vm = ResourceUtil.getValueMap(rsrc);
+ final Calendar changed = (Calendar) vm.get(Job.PROPERTY_JOB_CREATED);
+ if ( changed != null && scheduled.created.compareTo(changed) < 0 ) {
+ read = true;
+ }
+ }
+ if ( !read ) {
+ // nothing changes
+ newScheduledJobs.put(id, scheduled);
+ }
+ } else {
+ read = true;
+ }
+ if ( read ) {
+ // read
+ final Holder holder = new Holder();
+ holder.read = System.currentTimeMillis();
+
+ final Map<String, Object> properties = this.readScheduledJob(rsrc);
+ if ( properties != null ) {
+ holder.created = (Calendar) properties.get(Job.PROPERTY_JOB_CREATED);
+ holder.info = this.addOrUpdateScheduledJob(properties, scheduled != null ? scheduled.info : null);
+ }
+ newScheduledJobs.put(id, holder);
+
+ if ( holder.info == null && scheduled != null && scheduled.info != null ) {
+ this.jobScheduler.unscheduleJob(scheduled.info);
+ }
+ if ( holder.info != null ) {
+ this.jobScheduler.scheduleJob(holder.info);
+ }
+ }
+ }
+
+ /**
+ * Remove a scheduled job
+ * @param info The schedule info
+ */
+ public void remove(final ScheduledJobInfoImpl info) {
+ final String scheduleKey = ResourceHelper.filterName(info.getName());
+
+ final ResourceResolver resolver = configuration.createResourceResolver();
+ try {
+ final StringBuilder sb = new StringBuilder(configuration.getScheduledJobsPath(true));
+ sb.append(scheduleKey);
+ final String path = sb.toString();
+
+ final Resource eventResource = resolver.getResource(path);
+ if ( eventResource != null ) {
+ resolver.delete(eventResource);
+ resolver.commit();
+ }
+ } catch (final PersistenceException pe) {
+ // we ignore the exception if removing fails
+ ignoreException(pe);
+ } finally {
+ resolver.close();
+ }
+
+ synchronized ( this.scheduledJobs ) {
+ final Holder h = scheduledJobs.remove(scheduleKey);
+ if ( h != null && h.info != null ) {
+ jobScheduler.unscheduleJob(h.info);
+ }
+ }
+ }
+
+ public void updateSchedule(final String scheduleName, final Collection<ScheduleInfo> scheduleInfo) {
+
+ final ResourceResolver resolver = configuration.createResourceResolver();
+ try {
+ final String scheduleKey = ResourceHelper.filterName(scheduleName);
+
+ final StringBuilder sb = new StringBuilder(configuration.getScheduledJobsPath(true));
+ sb.append(scheduleKey);
+ final String path = sb.toString();
+
+ final Resource rsrc = resolver.getResource(path);
+ // This is an update, if we can't find the resource we ignore it
+ if ( rsrc != null ) {
+ final Calendar now = Calendar.getInstance();
+
+ // update holder first
+ synchronized ( scheduledJobs ) {
+ final Holder h = scheduledJobs.get(scheduleKey);
+ if ( h != null ) {
+ h.created = now;
+ }
+ }
+
+ final ModifiableValueMap mvm = rsrc.adaptTo(ModifiableValueMap.class);
+ mvm.put(Job.PROPERTY_JOB_CREATED, now);
+ final String[] infoArray = new String[scheduleInfo.size()];
+ int index = 0;
+ for(final ScheduleInfo si : scheduleInfo) {
+ infoArray[index] = ((ScheduleInfoImpl)si).getSerializedString();
+ index++;
+ }
+ mvm.put(ResourceHelper.PROPERTY_SCHEDULE_INFO, infoArray);
+
+ try {
+ resolver.commit();
+ } catch ( final PersistenceException pe) {
+ logger.warn("Unable to update scheduled job " + scheduleName, pe);
+ }
+ }
+ } finally {
+ resolver.close();
+ }
+ }
+
+ /**
+ * Helper method which just logs the exception in debug mode.
+ * @param e The exception
+ */
+ private void ignoreException(final Exception e) {
+ if ( this.logger.isDebugEnabled() ) {
+ this.logger.debug("Ignored exception " + e.getMessage(), e);
+ }
+ }
+}
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/scheduling/ScheduledJobHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/scheduling/ScheduledJobHandler.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/scheduling/ScheduledJobHandler.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Copied: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/scheduling/ScheduledJobInfoImpl.java (from r1676922, sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/ScheduledJobInfoImpl.java)
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/scheduling/ScheduledJobInfoImpl.java?p2=sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/scheduling/ScheduledJobInfoImpl.java&p1=sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/ScheduledJobInfoImpl.java&r1=1676922&r2=1676967&rev=1676967&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/ScheduledJobInfoImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/scheduling/ScheduledJobInfoImpl.java Thu Apr 30 12:06:25 2015
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.event.impl.jobs;
+package org.apache.sling.event.impl.jobs.scheduling;
import java.io.Serializable;
import java.util.Collection;
@@ -26,42 +26,72 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.sling.event.impl.support.ResourceHelper;
import org.apache.sling.event.impl.support.ScheduleInfoImpl;
import org.apache.sling.event.jobs.Job;
import org.apache.sling.event.jobs.JobBuilder.ScheduleBuilder;
import org.apache.sling.event.jobs.ScheduleInfo;
import org.apache.sling.event.jobs.ScheduledJobInfo;
+/**
+ * The job schedule information.
+ * It holds all required information like
+ * - the name of the schedule
+ * - the job topic
+ * - the job properties
+ * - scheduling information
+ */
public class ScheduledJobInfoImpl implements ScheduledJobInfo, Serializable {
private static final long serialVersionUID = 1L;
private final String scheduleName;
- private final String jobTopic;
+ private final JobSchedulerImpl jobScheduler;
- private final Map<String, Object> jobProperties;
+ private final AtomicBoolean isSuspended = new AtomicBoolean(false);
- private final JobSchedulerImpl jobScheduler;
+ private volatile List<ScheduleInfo> scheduleInfos;
- private List<ScheduleInfo> scheduleInfos;
+ private volatile String jobTopic;
- private AtomicBoolean isSuspended;
+ private volatile Map<String, Object> jobProperties;
+ /**
+ * Create a new info object
+ * @param jobScheduler The job scheduler
+ * @param scheduleName The unique name
+ */
public ScheduledJobInfoImpl(final JobSchedulerImpl jobScheduler,
- final String jobTopic,
- final Map<String, Object> jobProperties,
final String scheduleName) {
this.jobScheduler = jobScheduler;
this.scheduleName = scheduleName;
+ }
+
+ /**
+ * Update/set the job related information
+ * @param jobTopic The job topic
+ * @param jobProperties The job properties
+ */
+ public void update(final String jobTopic,
+ final Map<String, Object> jobProperties) {
+ final boolean isSuspended = jobProperties.remove(ResourceHelper.PROPERTY_SCHEDULE_SUSPENDED) != null;
+ @SuppressWarnings("unchecked")
+ final List<ScheduleInfo> scheduleInfos = (List<ScheduleInfo>) jobProperties.remove(ResourceHelper.PROPERTY_SCHEDULE_INFO);
+
this.jobTopic = jobTopic;
this.jobProperties = jobProperties;
+ this.scheduleInfos = Collections.unmodifiableList(scheduleInfos);
+
+ this.isSuspended.set(isSuspended);
}
- public void update(final boolean isSuspended,
- final List<ScheduleInfo> scheduleInfos) {
- this.scheduleInfos = Collections.unmodifiableList(scheduleInfos);
- this.isSuspended = new AtomicBoolean(isSuspended);
+ /**
+ * Update the scheduling information
+ * @param scheduleInfos The new schedule
+ */
+ public void update(final List<ScheduleInfo> scheduleInfos) {
+ this.scheduleInfos = Collections.unmodifiableList(scheduleInfos);
}
/**
@@ -115,7 +145,7 @@ public class ScheduledJobInfoImpl implem
*/
@Override
public void unschedule() {
- this.jobScheduler.unschedule(this);
+ this.jobScheduler.unscheduleJob(this);
}
/**
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ResourceHelper.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ResourceHelper.java?rev=1676967&r1=1676966&r2=1676967&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ResourceHelper.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ResourceHelper.java Thu Apr 30 12:06:25 2015
@@ -146,7 +146,10 @@ public abstract class ResourceHelper {
* @return The filtered node name.
*/
public static String filterName(final String resourceName) {
- final StringBuilder sb = new StringBuilder(resourceName.length());
+ if ( resourceName == null ) {
+ return null;
+ }
+ final StringBuilder sb = new StringBuilder(resourceName.length());
char lastAdded = 0;
for(int i=0; i < resourceName.length(); i++) {
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ScheduleInfoImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ScheduleInfoImpl.java?rev=1676967&r1=1676966&r2=1676967&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ScheduleInfoImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ScheduleInfoImpl.java Thu Apr 30 12:06:25 2015
@@ -409,4 +409,13 @@ public class ScheduleInfoImpl implements
}
return null;
}
+
+ @Override
+ public String toString() {
+ return "ScheduleInfo [scheduleType=" + scheduleType
+ + ", dayOfWeek=" + dayOfWeek + ", hourOfDay=" + hourOfDay
+ + ", minuteOfHour=" + minuteOfHour + ", at=" + at
+ + ", monthOfYear=" + monthOfYear + ", expression=" + expression
+ + "]";
+ }
}