You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2017/05/02 09:31:13 UTC

svn commit: r1793451 - /sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/

Author: cziegeler
Date: Tue May  2 09:31:13 2017
New Revision: 1793451

URL: http://svn.apache.org/viewvc?rev=1793451&view=rev
Log:
SLING-5387 : Provide support for running singleton jobs on non leader cluster nodes also

Modified:
    sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/InternalScheduleOptions.java
    sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/QuartzJobExecutor.java
    sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/QuartzScheduler.java
    sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/SchedulerServiceFactory.java
    sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/WebConsolePrinter.java
    sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/WhiteboardHandler.java

Modified: sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/InternalScheduleOptions.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/InternalScheduleOptions.java?rev=1793451&r1=1793450&r2=1793451&view=diff
==============================================================================
--- sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/InternalScheduleOptions.java (original)
+++ sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/InternalScheduleOptions.java Tue May  2 09:31:13 2017
@@ -30,6 +30,8 @@ import org.quartz.TriggerBuilder;
  */
 public class InternalScheduleOptions implements ScheduleOptions {
 
+    public String providedName;
+
     public String name;
 
     public String threadPoolName;

Modified: sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/QuartzJobExecutor.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/QuartzJobExecutor.java?rev=1793451&r1=1793450&r2=1793451&view=diff
==============================================================================
--- sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/QuartzJobExecutor.java (original)
+++ sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/QuartzJobExecutor.java Tue May  2 09:31:13 2017
@@ -33,6 +33,7 @@ import org.quartz.JobDataMap;
 import org.quartz.JobExecutionContext;
 import org.quartz.JobExecutionException;
 import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * This component is responsible to launch a {@link org.apache.sling.commons.scheduler.Job}
@@ -56,99 +57,142 @@ public class QuartzJobExecutor implement
     /** The available Sling IDs */
     public static final AtomicReference<String[]> SLING_IDS = new AtomicReference<>(null);
 
+    /** Holder for the job entries read from a Quartz {@link JobDataMap}; a field is {@code null} when the map lacks that entry. */
+    public static class JobDesc {
+
+        public final Object job;
+        public final String providedName;
+        public final String name;
+        public final String[] runOn;
+
+        /** Reads the job object, both names and the run-on configuration from {@code data}. */
+        public JobDesc(final JobDataMap data) {
+            this.job = data.get(QuartzScheduler.DATA_MAP_OBJECT);
+            this.name = (String) data.get(QuartzScheduler.DATA_MAP_NAME);
+            this.providedName = (String)data.get(QuartzScheduler.DATA_MAP_PROVIDED_NAME);
+            this.runOn = (String[])data.get(QuartzScheduler.DATA_MAP_RUN_ON);
+        }
+
+        /** @return {@code true} if both the job object and its name are present. */
+        public boolean isKnownJob() {
+            return this.job != null && this.name != null;
+        }
+
+        /** @return the job class name, followed by {@code "-"} and the provided name if one is set. */
+        public String getKey() {
+            return job.getClass().getName() + (providedName == null ? "" : "-" + providedName);
+        }
+
+        @Override
+        public String toString() {
+            final String runOnInfo;
+            if ( this.runOn == null ) {
+                runOnInfo = null;
+            } else if ( isRunOnLeader() ) {
+                runOnInfo = Scheduler.VALUE_RUN_ON_LEADER;
+            } else if ( isRunOnSingle() ) {
+                runOnInfo = Scheduler.VALUE_RUN_ON_SINGLE;
+            } else {
+                runOnInfo = Arrays.toString(runOn);
+            }
+            return "job '" + job + "' with name '" + name + "'" + (runOnInfo == null ? "" : " and config " + runOnInfo);
+        }
+
+        /** @return {@code true} if the run-on configuration is exactly the leader marker. */
+        public boolean isRunOnLeader() {
+            return runOn != null && runOn.length == 1 && Scheduler.VALUE_RUN_ON_LEADER.equals(runOn[0]);
+        }
+
+        /** @return {@code true} if the run-on configuration is exactly the single-instance marker. */
+        public boolean isRunOnSingle() {
+            return runOn != null && runOn.length == 1 && Scheduler.VALUE_RUN_ON_SINGLE.equals(runOn[0]);
+        }
+
+        /** @return {@code true} if the MD5 of {@link #getKey()} modulo the number of known Sling IDs selects this instance's ID. */
+        public boolean shouldRunAsSingle() {
+            final String[] ids = QuartzJobExecutor.SLING_IDS.get();
+            if ( ids == null || ids.length == 0 ) {
+                return false; // nothing to distribute over; an empty array would make BigInteger.mod throw
+            }
+            final int index;
+            try {
+                final MessageDigest m = MessageDigest.getInstance("MD5");
+                m.update(getKey().getBytes("UTF-8"));
+                index = new BigInteger(1, m.digest()).mod(BigInteger.valueOf(ids.length)).intValue();
+            } catch ( final IOException | NoSuchAlgorithmException ex ) {
+                // should never happen (MD5 and UTF-8 are always available); treated as an error case
+                LoggerFactory.getLogger(getClass().getName()).error("Unable to distribute scheduled " + this, ex);
+                return false;
+            }
+            final String myId = SLING_ID;
+            return myId != null && myId.equals(ids[index]);
+        }
+    }
+
     private boolean checkDiscoveryAvailable(final Logger logger,
-            final Object job,
-            final String name,
-            final String[] runOn) {
+            final JobDesc desc) {
         if ( DISCOVERY_AVAILABLE.get() ) {
             if ( DISCOVERY_INFO_AVAILABLE.get() ) {
                 return true;
             } else {
-                logger.debug("No discovery info available. Excluding job {} with name {} and config {}.",
-                        new Object[] {job, name, runOn[0]});
+                logger.debug("No discovery info available. Excluding {}.", desc);
                 return false;
             }
         } else {
-            logger.debug("No discovery available, therefore not executing job {} with name {} and config {}.",
-                    new Object[] {job, name, runOn[0]});
+            logger.debug("No discovery available, therefore not executing {}.", desc);
             return false;
         }
     }
 
     private String checkSlingId(final Logger logger,
-            final Object job,
-            final String name,
-            final String[] runOn) {
+            final JobDesc desc) {
         final String myId = SLING_ID;
         if ( myId == null ) {
-            logger.error("No Sling ID available, therefore not executing job {} with name {} and config {}.",
-                    new Object[] {job, name, Arrays.toString(runOn)});
+            logger.error("No Sling ID available, therefore not executing {}.", desc);
             return null;
         }
         return myId;
     }
 
     private boolean shouldRun(final Logger logger,
-            final Object job,
-            final String name,
-            final String[] runOn) {
-        if ( runOn != null ) {
-            if ( runOn.length == 1 && Scheduler.VALUE_RUN_ON_LEADER.equals(runOn[0]) ) {
+            final JobDesc desc) {
+        if ( desc.runOn != null ) {
+            if ( desc.isRunOnLeader() ) {
                 // leader
-                if ( !checkDiscoveryAvailable(logger, job, name, runOn) ) {
+                if ( !checkDiscoveryAvailable(logger, desc) ) {
                     return false;
                 }
                 if ( !IS_LEADER.get() ) {
-                    logger.debug("Excluding job {} with name {} and config {} - instance is not leader",
-                            new Object[] {job, name, runOn[0]});
+                    logger.debug("Excluding {} - instance is not leader", desc);
                     return false;
                 }
-            } else if ( runOn.length == 1 && Scheduler.VALUE_RUN_ON_SINGLE.equals(runOn[0]) ) {
+            } else if ( desc.isRunOnSingle() ) {
                 // single instance
-                if ( !checkDiscoveryAvailable(logger, job, name, runOn) ) {
+                if ( !checkDiscoveryAvailable(logger, desc) ) {
                     return false;
                 }
-                final String myId = checkSlingId(logger, job, name, runOn);
+                final String myId = checkSlingId(logger, desc);
                 if ( myId == null ) {
                     return false;
                 }
-                final String[] ids = QuartzJobExecutor.SLING_IDS.get();
-                boolean schedule = false;
-                if ( ids != null ) {
-                    int index = 0;
-                    try {
-                        final MessageDigest m = MessageDigest.getInstance("MD5");
-                        m.reset();
-                        m.update(job.getClass().getName().getBytes("UTF-8"));
-                        index = new BigInteger(1, m.digest()).mod(BigInteger.valueOf(ids.length)).intValue();
-                    } catch ( final IOException | NoSuchAlgorithmException ex ) {
-                        // although this should never happen (MD5 and UTF-8 are always available) we consider
-                        // this an error case
-                        logger.error("Unable to distribute scheduled job " + job + " with name " + name, ex);
-                        return false;
-                    }
-                    schedule = myId.equals(ids[index]);
-                }
-                if ( !schedule ) {
-                    logger.debug("Excluding job {} with name {} and config {} - distributed to different Sling instance",
-                            new Object[] {job, name, runOn});
+                if ( !desc.shouldRunAsSingle() ) {
+                    logger.debug("Excluding {} - distributed to different Sling instance", desc);
                     return false;
                 }
             } else { // sling IDs
-                final String myId = checkSlingId(logger, job, name, runOn);
+                final String myId = checkSlingId(logger, desc);
                 if ( myId == null ) {
                     return false;
                 } else {
                     boolean schedule = false;
-                    for(final String id : runOn ) {
+                    for(final String id : desc.runOn ) {
                         if ( myId.equals(id) ) {
                             schedule = true;
                             break;
                         }
                     }
                     if ( !schedule ) {
-                        logger.debug("Excluding job {} with name {} and config {} - different Sling ID",
-                                new Object[] {job, name, Arrays.toString(runOn)});
+                        logger.debug("Excluding job {} - different Sling ID", desc);
                         return false;
                     }
                 }
@@ -164,30 +208,29 @@ public class QuartzJobExecutor implement
     public void execute(final JobExecutionContext context) throws JobExecutionException {
 
         final JobDataMap data = context.getJobDetail().getJobDataMap();
-        final Object job = data.get(QuartzScheduler.DATA_MAP_OBJECT);
+        final JobDesc desc = new JobDesc(data);
         final Logger logger = (Logger)data.get(QuartzScheduler.DATA_MAP_LOGGER);
 
         // check run on information
-        final String name = (String) data.get(QuartzScheduler.DATA_MAP_NAME);
-        if ( !shouldRun(logger, job, name, (String[])data.get(QuartzScheduler.DATA_MAP_RUN_ON)) ) {
+        if ( !shouldRun(logger, desc) ) {
             return;
         }
 
         String origThreadName = Thread.currentThread().getName();
         try {
-            Thread.currentThread().setName(origThreadName + "-" + name);
+            Thread.currentThread().setName(origThreadName + "-" + desc.name);
 
-            logger.debug("Executing job {} with name {}", job, data.get(QuartzScheduler.DATA_MAP_NAME));
-            if (job instanceof org.apache.sling.commons.scheduler.Job) {
+            logger.debug("Executing job {}", desc);
+            if (desc.job instanceof org.apache.sling.commons.scheduler.Job) {
                 @SuppressWarnings("unchecked")
                 final Map<String, Serializable> configuration = (Map<String, Serializable>) data.get(QuartzScheduler.DATA_MAP_CONFIGURATION);
 
-                final JobContext jobCtx = new JobContextImpl(name, configuration);
-                ((org.apache.sling.commons.scheduler.Job) job).execute(jobCtx);
-            } else if (job instanceof Runnable) {
-                ((Runnable) job).run();
+                final JobContext jobCtx = new JobContextImpl(desc.name, configuration);
+                ((org.apache.sling.commons.scheduler.Job) desc.job).execute(jobCtx);
+            } else if (desc.job instanceof Runnable) {
+                ((Runnable) desc.job).run();
             } else {
-                logger.error("Scheduled job {} is neither a job nor a runnable.", job);
+                logger.error("Scheduled job {} is neither a job nor a runnable: {}", desc);
             }
         } catch (final Throwable t) {
             // if this is a quartz exception, rethrow it
@@ -195,7 +238,7 @@ public class QuartzJobExecutor implement
                 throw (JobExecutionException) t;
             }
             // there is nothing we can do here, so we just log
-            logger.error("Exception during job execution of " + job + " : " + t.getMessage(), t);
+            logger.error("Exception during job execution of " + desc + " : " + t.getMessage(), t);
         } finally {
             Thread.currentThread().setName(origThreadName);
         }

Modified: sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/QuartzScheduler.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/QuartzScheduler.java?rev=1793451&r1=1793450&r2=1793451&view=diff
==============================================================================
--- sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/QuartzScheduler.java (original)
+++ sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/QuartzScheduler.java Tue May  2 09:31:13 2017
@@ -73,6 +73,9 @@ public class QuartzScheduler implements
     /** Map key for the job object */
     static final String DATA_MAP_OBJECT = "QuartzJobScheduler.Object";
 
+    /** Map key for the provided job name; must differ from the {@code DATA_MAP_NAME} key, otherwise storing the job name overwrites it */
+    static final String DATA_MAP_PROVIDED_NAME = "QuartzJobScheduler.JobProvidedName";
+
     /** Map key for the job name */
     static final String DATA_MAP_NAME = "QuartzJobScheduler.JobName";
 
@@ -244,6 +247,9 @@ public class QuartzScheduler implements
 
         jobDataMap.put(DATA_MAP_OBJECT, job);
 
+        if ( options.providedName != null ) {
+            jobDataMap.put(DATA_MAP_PROVIDED_NAME, options.providedName);
+        }
         jobDataMap.put(DATA_MAP_NAME, jobName);
         jobDataMap.put(DATA_MAP_LOGGER, this.logger);
         if ( bundleId != null ) {
@@ -586,6 +592,7 @@ public class QuartzScheduler implements
         }
 
         synchronized ( proxy ) {
+            opts.providedName = opts.name;
             final String name;
             if ( opts.name != null ) {
                 // if there is already a job with the name, remove it first

Modified: sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/SchedulerServiceFactory.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/SchedulerServiceFactory.java?rev=1793451&r1=1793450&r2=1793451&view=diff
==============================================================================
--- sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/SchedulerServiceFactory.java (original)
+++ sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/SchedulerServiceFactory.java Tue May  2 09:31:13 2017
@@ -23,6 +23,7 @@ import java.util.NoSuchElementException;
 
 import org.apache.sling.commons.scheduler.ScheduleOptions;
 import org.apache.sling.commons.scheduler.Scheduler;
+import org.osgi.framework.Constants;
 import org.osgi.service.component.ComponentContext;
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
@@ -37,7 +38,10 @@ import org.osgi.service.component.annota
  */
 @Component(
     service = Scheduler.class,
-    scope = ServiceScope.BUNDLE
+    scope = ServiceScope.BUNDLE,
+    property = {
+            Constants.SERVICE_VENDOR + "=The Apache Software Foundation"
+    }
 )
 public class SchedulerServiceFactory implements Scheduler {
 

Modified: sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/WebConsolePrinter.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/WebConsolePrinter.java?rev=1793451&r1=1793450&r2=1793451&view=diff
==============================================================================
--- sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/WebConsolePrinter.java (original)
+++ sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/WebConsolePrinter.java Tue May  2 09:31:13 2017
@@ -81,32 +81,35 @@ public class WebConsolePrinter {
                         final Set<JobKey> keys = s.getJobKeys(GroupMatcher.jobGroupEquals(group));
                         for(final JobKey key : keys) {
                             final JobDetail detail = s.getJobDetail(key);
-                            final String jobName = (String) detail.getJobDataMap().get(QuartzScheduler.DATA_MAP_NAME);
-                            final Object job = detail.getJobDataMap().get(QuartzScheduler.DATA_MAP_OBJECT);
+                            final QuartzJobExecutor.JobDesc desc = new QuartzJobExecutor.JobDesc(detail.getJobDataMap());
                             // only print jobs started through the sling scheduler
-                            if ( jobName != null && job != null ) {
+                            if ( desc.isKnownJob() ) {
                                 pw.print("Job : ");
-                                pw.print(detail.getJobDataMap().get(QuartzScheduler.DATA_MAP_NAME));
+                                pw.print(desc.name);
                                 if ( detail.getDescription() != null && detail.getDescription().length() > 0 ) {
                                     pw.print(" (");
                                     pw.print(detail.getDescription());
                                     pw.print(")");
                                 }
                                 pw.print(", class: ");
-                                pw.print(job.getClass().getName());
+                                pw.print(desc.job.getClass().getName());
                                 pw.print(", concurrent: ");
                                 pw.print(!detail.isConcurrentExectionDisallowed());
-                                final String[] runOn = (String[])detail.getJobDataMap().get(QuartzScheduler.DATA_MAP_RUN_ON);
-                                if ( runOn != null ) {
+                                if ( desc.runOn != null ) {
                                     pw.print(", runOn: ");
-                                    pw.print(Arrays.toString(runOn));
+                                    pw.print(Arrays.toString(desc.runOn));
                                     // check run on information
-                                    if ( runOn.length == 1 &&
-                                         (org.apache.sling.commons.scheduler.Scheduler.VALUE_RUN_ON_LEADER.equals(runOn[0]) || org.apache.sling.commons.scheduler.Scheduler.VALUE_RUN_ON_SINGLE.equals(runOn[0])) ) {
+                                    if ( desc.isRunOnLeader() || desc.isRunOnSingle() ) {
                                         if ( QuartzJobExecutor.DISCOVERY_AVAILABLE.get() ) {
                                             if ( QuartzJobExecutor.DISCOVERY_INFO_AVAILABLE.get() ) {
-                                                if ( !QuartzJobExecutor.IS_LEADER.get() ) {
-                                                    pw.print(" (inactive: not leader)");
+                                                if ( desc.isRunOnLeader() ) {
+                                                    if ( !QuartzJobExecutor.IS_LEADER.get() ) {
+                                                        pw.print(" (inactive: not leader)");
+                                                    }
+                                                } else {
+                                                    if ( !desc.shouldRunAsSingle() ) {
+                                                        pw.print(" (inactive: single distributed elsewhere)");
+                                                    }
                                                 }
                                             } else {
                                                 pw.print(" (inactive: no discovery info)");
@@ -120,7 +123,7 @@ public class WebConsolePrinter {
                                             pw.print(" (inactive: no Sling settings)");
                                         } else {
                                             boolean schedule = false;
-                                            for(final String id : runOn ) {
+                                            for(final String id : desc.runOn ) {
                                                 if ( myId.equals(id) ) {
                                                     schedule = true;
                                                     break;

Modified: sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/WhiteboardHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/WhiteboardHandler.java?rev=1793451&r1=1793450&r2=1793451&view=diff
==============================================================================
--- sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/WhiteboardHandler.java (original)
+++ sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/WhiteboardHandler.java Tue May  2 09:31:13 2017
@@ -246,6 +246,7 @@ public class WhiteboardHandler {
     }
 
     private void scheduleJob(final ServiceReference<?> ref, final Object job, final ScheduleOptions scheduleOptions) {
+        ((InternalScheduleOptions)scheduleOptions).providedName = getStringProperty(ref, Scheduler.PROPERTY_SCHEDULER_NAME);
         final String name = getServiceIdentifier(ref);
         final Boolean concurrent = getBooleanProperty(ref, Scheduler.PROPERTY_SCHEDULER_CONCURRENT);
         final String[] runOnOpts = getRunOpts(ref);