You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2013/08/01 19:00:15 UTC
svn commit: r1509329 - in /sling/trunk/bundles/commons/scheduler: pom.xml
src/main/java/org/apache/sling/commons/scheduler/impl/QuartzScheduler.java
src/main/java/org/apache/sling/commons/scheduler/impl/WhiteboardHandler.java
Author: cziegeler
Date: Thu Aug 1 17:00:15 2013
New Revision: 1509329
URL: http://svn.apache.org/r1509329
Log:
SLING-2990 : race condition in scheduler could cause tasks to be lost
Added:
sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/WhiteboardHandler.java (with props)
Modified:
sling/trunk/bundles/commons/scheduler/pom.xml
sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/QuartzScheduler.java
Modified: sling/trunk/bundles/commons/scheduler/pom.xml
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/commons/scheduler/pom.xml?rev=1509329&r1=1509328&r2=1509329&view=diff
==============================================================================
--- sling/trunk/bundles/commons/scheduler/pom.xml (original)
+++ sling/trunk/bundles/commons/scheduler/pom.xml Thu Aug 1 17:00:15 2013
@@ -135,6 +135,12 @@
<scope>provided</scope>
</dependency>
<dependency>
+ <groupId>org.apache.sling</groupId>
+ <artifactId>org.apache.sling.settings</artifactId>
+ <version>1.0.0</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
<version>2.2.0</version>
Modified: sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/QuartzScheduler.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/QuartzScheduler.java?rev=1509329&r1=1509328&r2=1509329&view=diff
==============================================================================
--- sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/QuartzScheduler.java (original)
+++ sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/QuartzScheduler.java Thu Aug 1 17:00:15 2013
@@ -17,31 +17,26 @@
package org.apache.sling.commons.scheduler.impl;
import java.io.Serializable;
-import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
-import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.UUID;
+import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.apache.felix.scr.annotations.ReferencePolicy;
-import org.apache.felix.scr.annotations.References;
import org.apache.felix.scr.annotations.Service;
import org.apache.sling.commons.scheduler.Job;
import org.apache.sling.commons.scheduler.ScheduleOptions;
import org.apache.sling.commons.scheduler.Scheduler;
import org.apache.sling.commons.threads.ThreadPool;
import org.apache.sling.commons.threads.ThreadPoolManager;
-import org.apache.sling.discovery.DiscoveryService;
-import org.osgi.framework.Constants;
-import org.osgi.framework.ServiceReference;
+import org.apache.sling.settings.SlingSettingsService;
+import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceRegistration;
-import org.osgi.service.component.ComponentContext;
import org.quartz.CronScheduleBuilder;
import org.quartz.JobBuilder;
import org.quartz.JobDataMap;
@@ -61,12 +56,8 @@ import org.slf4j.LoggerFactory;
* The quartz based implementation of the scheduler.
*
*/
-@Component(immediate=true, metatype=true,label="%scheduler.name",description="%scheduler.description")
+@Component(metatype=true,label="%scheduler.name",description="%scheduler.description")
@Service(value=Scheduler.class)
-@References({
- @Reference(name="job", referenceInterface=Job.class, cardinality=ReferenceCardinality.OPTIONAL_MULTIPLE, policy=ReferencePolicy.DYNAMIC),
- @Reference(name="task", referenceInterface=Runnable.class, cardinality=ReferenceCardinality.OPTIONAL_MULTIPLE, policy=ReferencePolicy.DYNAMIC)
-})
public class QuartzScheduler implements Scheduler {
/** Default logger. */
@@ -94,12 +85,6 @@ public class QuartzScheduler implements
/** The quartz scheduler. */
private volatile org.quartz.Scheduler scheduler;
- /** List of registrations while this service is not activated yet. */
- private final List<Registration> registeredJobs = new ArrayList<Registration>();
-
- /** The component context. */
- private volatile ComponentContext context;
-
@Reference
private ThreadPoolManager threadPoolManager;
@@ -112,55 +97,39 @@ public class QuartzScheduler implements
private static final String PROPERTY_POOL_NAME = "poolName";
@Reference
- private DiscoveryService discoveryService;
+ private SlingSettingsService settings;
/**
* Activate this component.
* Start the scheduler.
- * @param ctx The component context.
* @throws Exception
*/
- protected void activate(final ComponentContext ctx) throws Exception {
- final Object poolNameObj = ctx.getProperties().get(PROPERTY_POOL_NAME);
+ @Activate
+ protected void activate(final BundleContext ctx, final Map<String, Object> props) throws Exception {
+ final Object poolNameObj = props.get(PROPERTY_POOL_NAME);
final String poolName;
if ( poolNameObj != null && poolNameObj.toString().trim().length() > 0 ) {
poolName = poolNameObj.toString().trim();
} else {
poolName = null;
}
- this.context = ctx;
+
// start scheduler
this.scheduler = this.init(poolName);
-
- final Registration[] regs;
- synchronized ( this.registeredJobs ) {
- regs = this.registeredJobs.toArray(new Registration[this.registeredJobs.size()]);
- this.registeredJobs.clear();
- }
- for( final Registration reg : regs ) {
- try {
- this.register(reg.componentName, reg.reference);
- } catch (Exception e) {
- // we don't want that one malicious service brings down the scheduler, so we just log
- // the exception and continue
- this.logger.error("Exception during registering " + reg.componentName + " service " + reg.reference, e);
- }
- }
- this.plugin = WebConsolePrinter.initPlugin(ctx.getBundleContext(), this);
+ this.plugin = WebConsolePrinter.initPlugin(ctx, this);
}
/**
* Deactivate this component.
* Stop the scheduler.
- * @param ctx The component context.
*/
- protected void deactivate(final ComponentContext ctx) {
+ @Deactivate
+ protected void deactivate() {
WebConsolePrinter.destroyPlugin(this.plugin);
this.plugin = null;
final org.quartz.Scheduler s = this.scheduler;
this.scheduler = null;
this.dispose(s);
- this.context = null;
}
/**
@@ -398,193 +367,11 @@ public class QuartzScheduler implements
}
}
- /**
- * Create unique identifier
- * @param type
- * @param ref
- * @throws Exception
- */
- private String getServiceIdentifier(final ServiceReference ref) {
- String name = (String)ref.getProperty(Scheduler.PROPERTY_SCHEDULER_NAME);
- if ( name == null ) {
- name = (String)ref.getProperty(Constants.SERVICE_PID);
- if ( name == null ) {
- name = "Registered Service";
- }
- }
- // now append service id to create a unique identifier
- name = name + "." + ref.getProperty(Constants.SERVICE_ID);
- return name;
- }
-
- /**
- * Register a job or task
- * @param type The type (job or task)
- * @param ref The service reference
- */
- private void register(final String type, final ServiceReference ref) {
- // we called from bind, it might be that deactivate has been
- // called in the meantime
- final ComponentContext ctx = this.context;
- if ( ctx != null ) {
- final Object job = ctx.locateService(type, ref);
- if ( job != null ) {
- try {
- final String name = getServiceIdentifier(ref);
- final Boolean concurrent = (Boolean)ref.getProperty(Scheduler.PROPERTY_SCHEDULER_CONCURRENT);
- final Object runOn = ref.getProperty(Scheduler.PROPERTY_SCHEDULER_RUN_ON);
- String[] runOnOpts = null;
- if ( runOn instanceof String ) {
- runOnOpts = new String[] {runOn.toString()};
- } else if ( runOn instanceof String[] ) {
- runOnOpts = (String[])runOn;
- } else {
- this.logger.warn("Property {} ignored for scheduler {}", Scheduler.PROPERTY_SCHEDULER_RUN_ON, ref);
- }
- final String expression = (String)ref.getProperty(Scheduler.PROPERTY_SCHEDULER_EXPRESSION);
- if ( expression != null ) {
- this.scheduleJob(job, this.EXPR(expression)
- .name(name)
- .canRunConcurrently((concurrent != null ? concurrent : true))
- .onInstancesOnly(runOnOpts));
- } else {
- final Long period = (Long)ref.getProperty(Scheduler.PROPERTY_SCHEDULER_PERIOD);
- if ( period != null ) {
- if ( period < 1 ) {
- this.logger.debug("Ignoring service {} : scheduler period is less than 1.", ref);
- } else {
- boolean immediate = false;
- if ( ref.getProperty(Scheduler.PROPERTY_SCHEDULER_IMMEDIATE) != null ) {
- immediate = (Boolean)ref.getProperty(Scheduler.PROPERTY_SCHEDULER_IMMEDIATE);
- }
- this.scheduleJob(job, this.PERIODIC(period, immediate)
- .name(name)
- .canRunConcurrently((concurrent != null ? concurrent : true))
- .onInstancesOnly(runOnOpts));
- }
- } else {
- this.logger.debug("Ignoring servce {} : no scheduling property found.", ref);
- }
- }
- } catch (final IllegalStateException e) {
- // this can happen if deactivate has been called or the scheduling expression is invalid
- this.logger.warn("Ignoring servce " + ref + " : exception occurred during registering.", e);
- } catch (final SchedulerException e) {
- // this can happen if deactivate has been called or the scheduling expression is invalid
- this.logger.warn("Ignoring servce " + ref + " : exception occurred during registering.", e);
- }
- }
- }
- }
-
- /**
- * Unregister a service.
- * @param ref The service reference.
- */
- private void unregister(final ServiceReference ref) {
- try {
- final String name = getServiceIdentifier(ref);
- this.removeJob(name);
- } catch (NoSuchElementException nsee) {
- // we ignore this
- }
- }
-
- /**
- * Bind a new job.
- * @param ref
- * @throws Exception
- */
- protected void bindJob(final ServiceReference ref) {
- if ( this.scheduler != null ) {
- this.register(Registration.JOB, ref);
- } else {
- synchronized ( this.registeredJobs ) {
- this.registeredJobs.add(new Registration(ref, Registration.JOB));
- }
- }
- }
-
- /**
- * Unbind a job.
- * @param ref
- */
- protected void unbindJob(final ServiceReference ref) {
- if ( this.scheduler != null ) {
- this.unregister(ref);
- } else {
- synchronized ( this.registeredJobs ) {
- this.registeredJobs.remove(new Registration(ref, Registration.JOB));
- }
- }
- }
-
- /**
- * Bind a new task.
- * @param ref
- * @throws Exception
- */
- protected void bindTask(final ServiceReference ref) {
- if ( this.scheduler != null ) {
- this.register(Registration.TASK, ref);
- } else {
- synchronized ( this.registeredJobs ) {
- this.registeredJobs.add(new Registration(ref, Registration.TASK));
- }
- }
- }
-
- /**
- * Unbind a task.
- * @param ref
- */
- protected void unbindTask(final ServiceReference ref) {
- if ( this.scheduler != null ) {
- this.unregister(ref);
- } else {
- synchronized ( this.registeredJobs ) {
- this.registeredJobs.remove(new Registration(ref, Registration.TASK));
- }
- }
- }
-
+ /** Used by the web console plugin. */
org.quartz.Scheduler getScheduler() {
return this.scheduler;
}
- /**
- * Helper class holding a registration if this service is not active yet.
- */
- private static final class Registration {
- public static final String JOB = "job";
- public static final String TASK = "task";
-
- public final ServiceReference reference;
- public final String componentName;
-
- public Registration(final ServiceReference r, final String name) {
- this.reference = r;
- this.componentName = name;
- }
-
- @Override
- public boolean equals(Object obj) {
- if ( !(obj instanceof Registration) ) {
- return false;
- }
- if ( obj == this ) {
- return true;
- }
- return this.reference.equals(((Registration)obj).reference);
- }
-
- @Override
- public int hashCode() {
- return this.reference.hashCode();
- }
- }
-
-
public static final class QuartzThreadPool implements org.quartz.spi.ThreadPool {
/** Our executor thread pool */
@@ -815,7 +602,7 @@ public class QuartzScheduler implements
} else if ( opts.runOn.length == 1 && Scheduler.VALUE_RUN_ON_SINGLE.equals(opts.runOn[0])) {
schedule = true;
} else { // sling IDs
- final String myId = this.discoveryService.getTopology().getLocalInstance().getSlingId();
+ final String myId = this.settings.getSlingId();
for(final String id : opts.runOn ) {
if ( myId.equals(id) ) {
schedule = true;
Added: sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/WhiteboardHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/WhiteboardHandler.java?rev=1509329&view=auto
==============================================================================
--- sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/WhiteboardHandler.java (added)
+++ sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/WhiteboardHandler.java Thu Aug 1 17:00:15 2013
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sling.commons.scheduler.impl;
+
+import java.util.Date;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.sling.commons.scheduler.Job;
+import org.apache.sling.commons.scheduler.Scheduler;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.Constants;
+import org.osgi.framework.InvalidSyntaxException;
+import org.osgi.framework.ServiceReference;
+import org.osgi.util.tracker.ServiceTracker;
+import org.osgi.util.tracker.ServiceTrackerCustomizer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The quartz based implementation of the scheduler.
+ *
+ */
+@Component(immediate=true)
+public class WhiteboardHandler {
+
+ /** Default logger. */
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+ @Reference
+ private Scheduler scheduler;
+
+ private ServiceTracker serviceTracker;
+
+ /**
+ * Activate this component.
+ * @throws InvalidSyntaxException
+ */
+ @Activate
+ protected void activate(final BundleContext btx) throws InvalidSyntaxException {
+ this.serviceTracker = new ServiceTracker(btx,
+ btx.createFilter("(|(" + Constants.OBJECTCLASS + "=" + Runnable.class.getName() + ")" +
+ "(" + Constants.OBJECTCLASS + "=" + Job.class.getName() + "))"),
+ new ServiceTrackerCustomizer() {
+
+ public synchronized void removedService(final ServiceReference reference, final Object service) {
+ btx.ungetService(reference);
+ unregister(reference, service);
+ }
+
+ public synchronized void modifiedService(final ServiceReference reference, final Object service) {
+ unregister(reference, service);
+ register(reference, service);
+ }
+
+ public synchronized Object addingService(final ServiceReference reference) {
+ final Object obj = btx.getService(reference);
+ if ( obj != null ) {
+ register(reference, obj);
+ }
+ return obj;
+ }
+ });
+ this.serviceTracker.open();
+ }
+
+ /**
+ * Deactivate this component.
+ * Stop the scheduler.
+ * @param ctx The component context.
+ */
+ @Deactivate
+ protected void deactivate() {
+ if ( this.serviceTracker != null ) {
+ this.serviceTracker.close();
+ this.serviceTracker = null;
+ }
+ }
+
+
+ /**
+ * Create unique identifier
+ * @param type
+ * @param ref
+ * @throws Exception
+ */
+ private String getServiceIdentifier(final ServiceReference ref) {
+ String name = (String)ref.getProperty(Scheduler.PROPERTY_SCHEDULER_NAME);
+ if ( name == null ) {
+ name = (String)ref.getProperty(Constants.SERVICE_PID);
+ if ( name == null ) {
+ name = "Registered Service";
+ }
+ }
+ // now append service id to create a unique identifier
+ name = name + "." + ref.getProperty(Constants.SERVICE_ID);
+ return name;
+ }
+
+ /**
+ * Register a job or task
+ * @param type The type (job or task)
+ * @param ref The service reference
+ */
+ private void register(final ServiceReference ref, final Object job) {
+ final String name = getServiceIdentifier(ref);
+ final Boolean concurrent = (Boolean)ref.getProperty(Scheduler.PROPERTY_SCHEDULER_CONCURRENT);
+ final Object runOn = ref.getProperty(Scheduler.PROPERTY_SCHEDULER_RUN_ON);
+ String[] runOnOpts = null;
+ if ( runOn instanceof String ) {
+ runOnOpts = new String[] {runOn.toString()};
+ } else if ( runOn instanceof String[] ) {
+ runOnOpts = (String[])runOn;
+ } else {
+ this.logger.warn("Property {} ignored for scheduler {}", Scheduler.PROPERTY_SCHEDULER_RUN_ON, ref);
+ }
+ final String expression = (String)ref.getProperty(Scheduler.PROPERTY_SCHEDULER_EXPRESSION);
+ if ( expression != null ) {
+ this.scheduler.schedule(job, this.scheduler.EXPR(expression)
+ .name(name)
+ .canRunConcurrently((concurrent != null ? concurrent : true))
+ .onInstancesOnly(runOnOpts));
+ } else {
+ final Long period = (Long)ref.getProperty(Scheduler.PROPERTY_SCHEDULER_PERIOD);
+ if ( period != null ) {
+ if ( period < 1 ) {
+ this.logger.debug("Ignoring service {} : scheduler period is less than 1.", ref);
+ } else {
+ boolean immediate = false;
+ if ( ref.getProperty(Scheduler.PROPERTY_SCHEDULER_IMMEDIATE) != null ) {
+ immediate = (Boolean)ref.getProperty(Scheduler.PROPERTY_SCHEDULER_IMMEDIATE);
+ }
+ final Date date = new Date();
+ if ( !immediate ) {
+ date.setTime(System.currentTimeMillis() + period * 1000);
+ }
+ this.scheduler.schedule(job, this.scheduler.AT(date, -1, period)
+ .name(name)
+ .canRunConcurrently((concurrent != null ? concurrent : true))
+ .onInstancesOnly(runOnOpts));
+ }
+ } else {
+ this.logger.debug("Ignoring servce {} : no scheduling property found.", ref);
+ }
+ }
+ }
+
+ /**
+ * Unregister a service.
+ * @param ref The service reference.
+ */
+ private void unregister(final ServiceReference reference, final Object service) {
+ final String name = getServiceIdentifier(reference);
+ this.scheduler.unschedule(name);
+ }
+}
Propchange: sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/WhiteboardHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/WhiteboardHandler.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url