You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2013/07/31 11:18:27 UTC

svn commit: r1508781 - in /sling/trunk/bundles/commons/scheduler: ./ src/main/java/org/apache/sling/commons/scheduler/ src/main/java/org/apache/sling/commons/scheduler/impl/

Author: cziegeler
Date: Wed Jul 31 09:18:26 2013
New Revision: 1508781

URL: http://svn.apache.org/r1508781
Log:
SLING-2979 : Add support for running scheduled task only on the leader

Modified:
    sling/trunk/bundles/commons/scheduler/pom.xml
    sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/ScheduleOptions.java
    sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/Scheduler.java
    sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/InternalScheduleOptions.java
    sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/QuartzJobExecutor.java
    sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/QuartzScheduler.java

Modified: sling/trunk/bundles/commons/scheduler/pom.xml
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/commons/scheduler/pom.xml?rev=1508781&r1=1508780&r2=1508781&view=diff
==============================================================================
--- sling/trunk/bundles/commons/scheduler/pom.xml (original)
+++ sling/trunk/bundles/commons/scheduler/pom.xml Wed Jul 31 09:18:26 2013
@@ -58,7 +58,6 @@
                             org.apache.sling.commons.scheduler.impl
                         </Private-Package>
                         <DynamicImport-Package>
-                            org.apache.sling.discovery;version="[1.0,2)",
                             commonj.work,
                             com.mchange.v2.c3p0,
                             javax.ejb,

Modified: sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/ScheduleOptions.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/ScheduleOptions.java?rev=1508781&r1=1508780&r2=1508781&view=diff
==============================================================================
--- sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/ScheduleOptions.java (original)
+++ sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/ScheduleOptions.java Wed Jul 31 09:18:26 2013
@@ -52,8 +52,29 @@ public interface ScheduleOptions {
      * Flag indicating whether the job should only be run on the leader.
      * This defaults to false.
      * If no topology information is available (= no Apache Sling Discovery Implementation active)
-     * this flag is ignored and the job is run on all instances.
+     * this flag is ignored and the job is run on all instances!
+     * If {@link #onSingleInstanceOnly(boolean)} or {@link #onInstancesOnly(String[])} has been called before,
+     * that option is reset and overwritten by the value of this method.
      * @param flag Whether this job should only be run on the leader
      */
     ScheduleOptions onLeaderOnly(final boolean flag);
+
+    /**
+     * Flag indicating whether the job should only be run on a single instance in a cluster
+     * This defaults to false.
+     * If no topology information is available (= no Apache Sling Discovery Implementation active)
+     * this flag is ignored and the job is run on all instances!
+     * If {@link #onLeaderOnly(boolean)} or {@link #onInstancesOnly(String[])} has been called before,
+     * that option is reset and overwritten by the value of this method.
+     * @param flag Whether this job should only be run on a single instance.
+     */
+    ScheduleOptions onSingleInstanceOnly(final boolean flag);
+
+    /**
+     * List of Sling IDs this job should be run on.
+     * If {@link #onLeaderOnly(boolean)} or {@link #onSingleInstanceOnly(boolean)} has been called before,
+     * that option is reset and overwritten by the value of this method.
+     * @param flag Whether this job should only be run on a single instance.
+     */
+    ScheduleOptions onInstancesOnly(String[] slingIds);
 }

Modified: sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/Scheduler.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/Scheduler.java?rev=1508781&r1=1508780&r2=1508781&view=diff
==============================================================================
--- sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/Scheduler.java (original)
+++ sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/Scheduler.java Wed Jul 31 09:18:26 2013
@@ -61,13 +61,31 @@ public interface Scheduler {
     /** Name of the configuration property to define the job name. */
     String PROPERTY_SCHEDULER_NAME = "scheduler.name";
 
-    /** Name of the configuration property to define if the job should only be run on the leader.
-     * Default is to start the job on all instances. This property needs to be of type Boolean.
+    /**
+     * Name of the configuration property to define the instances this job should run on.
+     * By default a job is run on all instances. This property can be configured with:
+     * - a list of Sling IDs : in that case the job is only run on instances in this set.
+     * - constant {@link #VALUE_RUN_ON_LEADER} : the job is only run on the leader
+     * - constant {@link #VALUE_RUN_ON_SINGLE} : the job is only run on a single instance in a cluster. This is
+     *                     basically the same as {@link #VALUE_RUN_ON_LEADER} but it's not further specified which
+     *                     single instance is used.
+     * Default is to start the job on all instances. This property needs to be of type String
+     * or String[].
      * If no topology information is available (= no Apache Sling Discovery Implementation active)
-     * this flag is ignored and the job is run on all instances.
+     * the values {@link #VALUE_RUN_ON_LEADER} and {@link #VALUE_RUN_ON_SINGLE} are ignored, and the job is run on all instances.
+     * @since 2.3.0
+     */
+    String PROPERTY_SCHEDULER_RUN_ON = "scheduler.runOn";
+
+    /** Value for {@link #PROPERTY_SCHEDULER_RUN_ON} to run the job on the leader only.
+     * @since 2.3.0
+     */
+    String VALUE_RUN_ON_LEADER = "LEADER";
+
+    /** Value for {@link #PROPERTY_SCHEDULER_RUN_ON} to run the job on a single instance only.
      * @since 2.3.0
      */
-    String PROPERTY_SCHEDULER_LEADER_ONLY = "scheduler.leaderonly";
+    String VALUE_RUN_ON_SINGLE = "SINGLE";
 
     /**
      * Schedule a job based on the options.

Modified: sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/InternalScheduleOptions.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/InternalScheduleOptions.java?rev=1508781&r1=1508780&r2=1508781&view=diff
==============================================================================
--- sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/InternalScheduleOptions.java (original)
+++ sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/InternalScheduleOptions.java Wed Jul 31 09:18:26 2013
@@ -20,6 +20,7 @@ import java.io.Serializable;
 import java.util.Map;
 
 import org.apache.sling.commons.scheduler.ScheduleOptions;
+import org.apache.sling.commons.scheduler.Scheduler;
 import org.quartz.Trigger;
 import org.quartz.TriggerBuilder;
 
@@ -33,14 +34,14 @@ public class InternalScheduleOptions imp
 
     public boolean canRunConcurrently = false;
 
-    public boolean onLeaderOnly = false;
-
     public Map<String, Serializable> configuration;
 
     public final TriggerBuilder<? extends Trigger> trigger;
 
     public final IllegalArgumentException argumentException;
 
+    public String[] runOn;
+
     public InternalScheduleOptions(final TriggerBuilder<? extends Trigger> trigger) {
         this.trigger = trigger;
         this.argumentException = null;
@@ -79,7 +80,31 @@ public class InternalScheduleOptions imp
      * @see org.apache.sling.commons.scheduler.ScheduleOptions#onLeaderOnly(boolean)
      */
     public ScheduleOptions onLeaderOnly(final boolean flag) {
-        this.onLeaderOnly = flag;
+        if ( flag ) {
+            this.runOn = new String[] {Scheduler.VALUE_RUN_ON_LEADER};
+        } else {
+            this.runOn = null;
+        }
+        return this;
+    }
+
+    /**
+     * @see org.apache.sling.commons.scheduler.ScheduleOptions#onSingleInstanceOnly(boolean)
+     */
+    public ScheduleOptions onSingleInstanceOnly(final boolean flag) {
+        if ( flag ) {
+            this.runOn = new String[] {Scheduler.VALUE_RUN_ON_SINGLE};
+        } else {
+            this.runOn = null;
+        }
+        return this;
+    }
+
+    /**
+     * @see org.apache.sling.commons.scheduler.ScheduleOptions#onInstancesOnly(java.lang.String[])
+     */
+    public ScheduleOptions onInstancesOnly(final String[] slingIds) {
+        this.runOn = slingIds;
         return this;
     }
 }

Modified: sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/QuartzJobExecutor.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/QuartzJobExecutor.java?rev=1508781&r1=1508780&r2=1508781&view=diff
==============================================================================
--- sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/QuartzJobExecutor.java (original)
+++ sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/QuartzJobExecutor.java Wed Jul 31 09:18:26 2013
@@ -44,9 +44,9 @@ public class QuartzJobExecutor implement
 
         final JobDataMap data = context.getJobDetail().getJobDataMap();
 
-        // check leader
-        final boolean onLeaderOnly = data.getBooleanValue(QuartzScheduler.DATA_MAP_ON_LEADER_ONLY);
-        if (onLeaderOnly && !IS_LEADER.get()) {
+        // check leader/single
+        final String[] runOn = (String[])data.get(QuartzScheduler.DATA_MAP_RUN_ON);
+        if (runOn != null && !IS_LEADER.get()) {
             return;
         }
 

Modified: sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/QuartzScheduler.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/QuartzScheduler.java?rev=1508781&r1=1508780&r2=1508781&view=diff
==============================================================================
--- sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/QuartzScheduler.java (original)
+++ sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/QuartzScheduler.java Wed Jul 31 09:18:26 2013
@@ -37,6 +37,7 @@ import org.apache.sling.commons.schedule
 import org.apache.sling.commons.scheduler.Scheduler;
 import org.apache.sling.commons.threads.ThreadPool;
 import org.apache.sling.commons.threads.ThreadPoolManager;
+import org.apache.sling.discovery.DiscoveryService;
 import org.osgi.framework.Constants;
 import org.osgi.framework.ServiceReference;
 import org.osgi.framework.ServiceRegistration;
@@ -87,8 +88,8 @@ public class QuartzScheduler implements 
     /** Map key for the logger. */
     static final String DATA_MAP_LOGGER = "QuartzJobScheduler.Logger";
 
-    /** Map key for the isLeader information (Boolean). */
-    static final String DATA_MAP_ON_LEADER_ONLY = "QuartzJobScheduler.OnLeaderOnly";
+    /** Map key for the runOn information (String[]). */
+    static final String DATA_MAP_RUN_ON = "QuartzJobScheduler.runOn";
 
     /** The quartz scheduler. */
     private volatile org.quartz.Scheduler scheduler;
@@ -110,6 +111,9 @@ public class QuartzScheduler implements 
     @Property
     private static final String PROPERTY_POOL_NAME = "poolName";
 
+    @Reference
+    private DiscoveryService discoveryService;
+
     /**
      * Activate this component.
      * Start the scheduler.
@@ -247,7 +251,9 @@ public class QuartzScheduler implements 
         if ( options.configuration != null ) {
             jobDataMap.put(DATA_MAP_CONFIGURATION, options.configuration);
         }
-        jobDataMap.put(DATA_MAP_ON_LEADER_ONLY, options.onLeaderOnly);
+        if ( options.runOn != null) {
+            jobDataMap.put(DATA_MAP_RUN_ON, options.runOn);
+        }
 
         return jobDataMap;
     }
@@ -426,13 +432,21 @@ public class QuartzScheduler implements 
                 try {
                     final String name = getServiceIdentifier(ref);
                     final Boolean concurrent = (Boolean)ref.getProperty(Scheduler.PROPERTY_SCHEDULER_CONCURRENT);
-                    final Boolean onLeaderOnly = (Boolean)ref.getProperty(Scheduler.PROPERTY_SCHEDULER_LEADER_ONLY);
+                    final Object runOn = ref.getProperty(Scheduler.PROPERTY_SCHEDULER_RUN_ON);
+                    String[] runOnOpts = null;
+                    if ( runOn instanceof String ) {
+                        runOnOpts = new String[] {runOn.toString()};
+                    } else if ( runOn instanceof String[] ) {
+                        runOnOpts = (String[])runOn;
+                    } else {
+                        this.logger.warn("Property {} ignored for scheduler {}", Scheduler.PROPERTY_SCHEDULER_RUN_ON, ref);
+                    }
                     final String expression = (String)ref.getProperty(Scheduler.PROPERTY_SCHEDULER_EXPRESSION);
                     if ( expression != null ) {
                         this.scheduleJob(job, this.EXPR(expression)
                                 .name(name)
                                 .canRunConcurrently((concurrent != null ? concurrent : true))
-                                .onLeaderOnly(onLeaderOnly != null ? onLeaderOnly : false));
+                                .onInstancesOnly(runOnOpts));
                     } else {
                         final Long period = (Long)ref.getProperty(Scheduler.PROPERTY_SCHEDULER_PERIOD);
                         if ( period != null ) {
@@ -446,7 +460,7 @@ public class QuartzScheduler implements 
                                 this.scheduleJob(job, this.PERIODIC(period, immediate)
                                         .name(name)
                                         .canRunConcurrently((concurrent != null ? concurrent : true))
-                                        .onLeaderOnly(onLeaderOnly != null ? onLeaderOnly : false));
+                                        .onInstancesOnly(runOnOpts));
                             }
                         } else {
                             this.logger.debug("Ignoring servce {} : no scheduling property found.", ref);
@@ -792,6 +806,30 @@ public class QuartzScheduler implements 
         } else {
             name = job.getClass().getName() + ':' + UUID.randomUUID();
         }
+
+        // check run on
+        if ( opts.runOn != null ) {
+            boolean schedule = false;
+            if ( opts.runOn.length == 1 && Scheduler.VALUE_RUN_ON_LEADER.equals(opts.runOn[0])) {
+                schedule = true;
+            } else if ( opts.runOn.length == 1 && Scheduler.VALUE_RUN_ON_SINGLE.equals(opts.runOn[0])) {
+                schedule = true;
+            } else { // sling IDs
+                final String myId = this.discoveryService.getTopology().getLocalInstance().getSlingId();
+                for(final String id : opts.runOn ) {
+                    if ( myId.equals(id) ) {
+                        schedule = true;
+                        break;
+                    }
+                }
+                opts.runOn = null;
+            }
+            if ( !schedule ) {
+                this.logger.warn("Not scheduling job {} with name {} - not in required Sling ID set", job, name);
+                return;
+            }
+        }
+
         final Trigger trigger = opts.trigger.withIdentity(name).build();
 
         // create the data map