You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2017/05/02 09:31:13 UTC
svn commit: r1793451 -
/sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/
Author: cziegeler
Date: Tue May 2 09:31:13 2017
New Revision: 1793451
URL: http://svn.apache.org/viewvc?rev=1793451&view=rev
Log:
SLING-5387 : Provide support for running singleton jobs on non leader cluster nodes also
Modified:
sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/InternalScheduleOptions.java
sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/QuartzJobExecutor.java
sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/QuartzScheduler.java
sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/SchedulerServiceFactory.java
sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/WebConsolePrinter.java
sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/WhiteboardHandler.java
Modified: sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/InternalScheduleOptions.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/InternalScheduleOptions.java?rev=1793451&r1=1793450&r2=1793451&view=diff
==============================================================================
--- sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/InternalScheduleOptions.java (original)
+++ sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/InternalScheduleOptions.java Tue May 2 09:31:13 2017
@@ -30,6 +30,8 @@ import org.quartz.TriggerBuilder;
*/
public class InternalScheduleOptions implements ScheduleOptions {
+ public String providedName;
+
public String name;
public String threadPoolName;
Modified: sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/QuartzJobExecutor.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/QuartzJobExecutor.java?rev=1793451&r1=1793450&r2=1793451&view=diff
==============================================================================
--- sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/QuartzJobExecutor.java (original)
+++ sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/QuartzJobExecutor.java Tue May 2 09:31:13 2017
@@ -33,6 +33,7 @@ import org.quartz.JobDataMap;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* This component is responsible to launch a {@link org.apache.sling.commons.scheduler.Job}
@@ -56,99 +57,142 @@ public class QuartzJobExecutor implement
/** The available Sling IDs */
public static final AtomicReference<String[]> SLING_IDS = new AtomicReference<>(null);
+ public static class JobDesc {
+
+ public final Object job;
+ public final String providedName;
+ public final String name;
+ public final String[] runOn;
+
+ public JobDesc(final JobDataMap data) {
+ this.job = data.get(QuartzScheduler.DATA_MAP_OBJECT);
+ this.name = (String) data.get(QuartzScheduler.DATA_MAP_NAME);
+ this.providedName = (String)data.get(QuartzScheduler.DATA_MAP_PROVIDED_NAME);
+ this.runOn = (String[])data.get(QuartzScheduler.DATA_MAP_RUN_ON);
+ }
+
+ public boolean isKnownJob() {
+ return this.job != null && this.name != null;
+ }
+
+ public String getKey() {
+ String key = job.getClass().getName();
+ if ( providedName != null ) {
+ key = key + "-" + providedName;
+ }
+ return key;
+ }
+
+ @Override
+ public String toString() {
+ final String runOnInfo;
+ if ( this.runOn == null ) {
+ runOnInfo = null;
+ } else if ( isRunOnLeader() ) {
+ runOnInfo = Scheduler.VALUE_RUN_ON_LEADER;
+ } else if ( isRunOnSingle() ) {
+ runOnInfo = Scheduler.VALUE_RUN_ON_SINGLE;
+ } else {
+ runOnInfo = Arrays.toString(runOn);
+ }
+ return "job '" + job + "' with name '" + name + "'" + (runOnInfo == null ? "" : " and config " + runOnInfo);
+ }
+
+ public boolean isRunOnLeader() {
+ return runOn != null && runOn.length == 1 && Scheduler.VALUE_RUN_ON_LEADER.equals(runOn[0]);
+ }
+
+ public boolean isRunOnSingle() {
+ return runOn != null && runOn.length == 1 && Scheduler.VALUE_RUN_ON_SINGLE.equals(runOn[0]);
+ }
+
+ public boolean shouldRunAsSingle() {
+ final String[] ids = QuartzJobExecutor.SLING_IDS.get();
+ boolean schedule = false;
+ if ( ids != null ) {
+ int index = 0;
+ try {
+ final MessageDigest m = MessageDigest.getInstance("MD5");
+ m.reset();
+ m.update(getKey().getBytes("UTF-8"));
+ index = new BigInteger(1, m.digest()).mod(BigInteger.valueOf(ids.length)).intValue();
+ } catch ( final IOException | NoSuchAlgorithmException ex ) {
+ // although this should never happen (MD5 and UTF-8 are always available) we consider
+ // this an error case
+ LoggerFactory.getLogger(getClass().getName()).error("Unable to distribute scheduled " + this, ex);
+ return false;
+ }
+ final String myId = SLING_ID;
+ schedule = myId != null && myId.equals(ids[index]);
+ }
+ return schedule;
+ }
+ }
+
private boolean checkDiscoveryAvailable(final Logger logger,
- final Object job,
- final String name,
- final String[] runOn) {
+ final JobDesc desc) {
if ( DISCOVERY_AVAILABLE.get() ) {
if ( DISCOVERY_INFO_AVAILABLE.get() ) {
return true;
} else {
- logger.debug("No discovery info available. Excluding job {} with name {} and config {}.",
- new Object[] {job, name, runOn[0]});
+ logger.debug("No discovery info available. Excluding {}.", desc);
return false;
}
} else {
- logger.debug("No discovery available, therefore not executing job {} with name {} and config {}.",
- new Object[] {job, name, runOn[0]});
+ logger.debug("No discovery available, therefore not executing {}.", desc);
return false;
}
}
private String checkSlingId(final Logger logger,
- final Object job,
- final String name,
- final String[] runOn) {
+ final JobDesc desc) {
final String myId = SLING_ID;
if ( myId == null ) {
- logger.error("No Sling ID available, therefore not executing job {} with name {} and config {}.",
- new Object[] {job, name, Arrays.toString(runOn)});
+ logger.error("No Sling ID available, therefore not executing {}.", desc);
return null;
}
return myId;
}
private boolean shouldRun(final Logger logger,
- final Object job,
- final String name,
- final String[] runOn) {
- if ( runOn != null ) {
- if ( runOn.length == 1 && Scheduler.VALUE_RUN_ON_LEADER.equals(runOn[0]) ) {
+ final JobDesc desc) {
+ if ( desc.runOn != null ) {
+ if ( desc.isRunOnLeader() ) {
// leader
- if ( !checkDiscoveryAvailable(logger, job, name, runOn) ) {
+ if ( !checkDiscoveryAvailable(logger, desc) ) {
return false;
}
if ( !IS_LEADER.get() ) {
- logger.debug("Excluding job {} with name {} and config {} - instance is not leader",
- new Object[] {job, name, runOn[0]});
+ logger.debug("Excluding {} - instance is not leader", desc);
return false;
}
- } else if ( runOn.length == 1 && Scheduler.VALUE_RUN_ON_SINGLE.equals(runOn[0]) ) {
+ } else if ( desc.isRunOnSingle() ) {
// single instance
- if ( !checkDiscoveryAvailable(logger, job, name, runOn) ) {
+ if ( !checkDiscoveryAvailable(logger, desc) ) {
return false;
}
- final String myId = checkSlingId(logger, job, name, runOn);
+ final String myId = checkSlingId(logger, desc);
if ( myId == null ) {
return false;
}
- final String[] ids = QuartzJobExecutor.SLING_IDS.get();
- boolean schedule = false;
- if ( ids != null ) {
- int index = 0;
- try {
- final MessageDigest m = MessageDigest.getInstance("MD5");
- m.reset();
- m.update(job.getClass().getName().getBytes("UTF-8"));
- index = new BigInteger(1, m.digest()).mod(BigInteger.valueOf(ids.length)).intValue();
- } catch ( final IOException | NoSuchAlgorithmException ex ) {
- // although this should never happen (MD5 and UTF-8 are always available) we consider
- // this an error case
- logger.error("Unable to distribute scheduled job " + job + " with name " + name, ex);
- return false;
- }
- schedule = myId.equals(ids[index]);
- }
- if ( !schedule ) {
- logger.debug("Excluding job {} with name {} and config {} - distributed to different Sling instance",
- new Object[] {job, name, runOn});
+ if ( !desc.shouldRunAsSingle() ) {
+ logger.debug("Excluding {} - distributed to different Sling instance", desc);
return false;
}
} else { // sling IDs
- final String myId = checkSlingId(logger, job, name, runOn);
+ final String myId = checkSlingId(logger, desc);
if ( myId == null ) {
return false;
} else {
boolean schedule = false;
- for(final String id : runOn ) {
+ for(final String id : desc.runOn ) {
if ( myId.equals(id) ) {
schedule = true;
break;
}
}
if ( !schedule ) {
- logger.debug("Excluding job {} with name {} and config {} - different Sling ID",
- new Object[] {job, name, Arrays.toString(runOn)});
+ logger.debug("Excluding job {} - different Sling ID", desc);
return false;
}
}
@@ -164,30 +208,29 @@ public class QuartzJobExecutor implement
public void execute(final JobExecutionContext context) throws JobExecutionException {
final JobDataMap data = context.getJobDetail().getJobDataMap();
- final Object job = data.get(QuartzScheduler.DATA_MAP_OBJECT);
+ final JobDesc desc = new JobDesc(data);
final Logger logger = (Logger)data.get(QuartzScheduler.DATA_MAP_LOGGER);
// check run on information
- final String name = (String) data.get(QuartzScheduler.DATA_MAP_NAME);
- if ( !shouldRun(logger, job, name, (String[])data.get(QuartzScheduler.DATA_MAP_RUN_ON)) ) {
+ if ( !shouldRun(logger, desc) ) {
return;
}
String origThreadName = Thread.currentThread().getName();
try {
- Thread.currentThread().setName(origThreadName + "-" + name);
+ Thread.currentThread().setName(origThreadName + "-" + desc.name);
- logger.debug("Executing job {} with name {}", job, data.get(QuartzScheduler.DATA_MAP_NAME));
- if (job instanceof org.apache.sling.commons.scheduler.Job) {
+ logger.debug("Executing job {}", desc);
+ if (desc.job instanceof org.apache.sling.commons.scheduler.Job) {
@SuppressWarnings("unchecked")
final Map<String, Serializable> configuration = (Map<String, Serializable>) data.get(QuartzScheduler.DATA_MAP_CONFIGURATION);
- final JobContext jobCtx = new JobContextImpl(name, configuration);
- ((org.apache.sling.commons.scheduler.Job) job).execute(jobCtx);
- } else if (job instanceof Runnable) {
- ((Runnable) job).run();
+ final JobContext jobCtx = new JobContextImpl(desc.name, configuration);
+ ((org.apache.sling.commons.scheduler.Job) desc.job).execute(jobCtx);
+ } else if (desc.job instanceof Runnable) {
+ ((Runnable) desc.job).run();
} else {
- logger.error("Scheduled job {} is neither a job nor a runnable.", job);
+ logger.error("Scheduled job {} is neither a job nor a runnable: {}", desc);
}
} catch (final Throwable t) {
// if this is a quartz exception, rethrow it
@@ -195,7 +238,7 @@ public class QuartzJobExecutor implement
throw (JobExecutionException) t;
}
// there is nothing we can do here, so we just log
- logger.error("Exception during job execution of " + job + " : " + t.getMessage(), t);
+ logger.error("Exception during job execution of " + desc + " : " + t.getMessage(), t);
} finally {
Thread.currentThread().setName(origThreadName);
}
Modified: sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/QuartzScheduler.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/QuartzScheduler.java?rev=1793451&r1=1793450&r2=1793451&view=diff
==============================================================================
--- sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/QuartzScheduler.java (original)
+++ sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/QuartzScheduler.java Tue May 2 09:31:13 2017
@@ -73,6 +73,9 @@ public class QuartzScheduler implements
/** Map key for the job object */
static final String DATA_MAP_OBJECT = "QuartzJobScheduler.Object";
+ /** Map key for the provided job name */
+ static final String DATA_MAP_PROVIDED_NAME = "QuartzJobScheduler.JobName";
+
/** Map key for the job name */
static final String DATA_MAP_NAME = "QuartzJobScheduler.JobName";
@@ -244,6 +247,9 @@ public class QuartzScheduler implements
jobDataMap.put(DATA_MAP_OBJECT, job);
+ if ( options.providedName != null ) {
+ jobDataMap.put(DATA_MAP_PROVIDED_NAME, options.providedName);
+ }
jobDataMap.put(DATA_MAP_NAME, jobName);
jobDataMap.put(DATA_MAP_LOGGER, this.logger);
if ( bundleId != null ) {
@@ -586,6 +592,7 @@ public class QuartzScheduler implements
}
synchronized ( proxy ) {
+ opts.providedName = opts.name;
final String name;
if ( opts.name != null ) {
// if there is already a job with the name, remove it first
Modified: sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/SchedulerServiceFactory.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/SchedulerServiceFactory.java?rev=1793451&r1=1793450&r2=1793451&view=diff
==============================================================================
--- sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/SchedulerServiceFactory.java (original)
+++ sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/SchedulerServiceFactory.java Tue May 2 09:31:13 2017
@@ -23,6 +23,7 @@ import java.util.NoSuchElementException;
import org.apache.sling.commons.scheduler.ScheduleOptions;
import org.apache.sling.commons.scheduler.Scheduler;
+import org.osgi.framework.Constants;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
@@ -37,7 +38,10 @@ import org.osgi.service.component.annota
*/
@Component(
service = Scheduler.class,
- scope = ServiceScope.BUNDLE
+ scope = ServiceScope.BUNDLE,
+ property = {
+ Constants.SERVICE_VENDOR + "=The Apache Software Foundation"
+ }
)
public class SchedulerServiceFactory implements Scheduler {
Modified: sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/WebConsolePrinter.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/WebConsolePrinter.java?rev=1793451&r1=1793450&r2=1793451&view=diff
==============================================================================
--- sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/WebConsolePrinter.java (original)
+++ sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/WebConsolePrinter.java Tue May 2 09:31:13 2017
@@ -81,32 +81,35 @@ public class WebConsolePrinter {
final Set<JobKey> keys = s.getJobKeys(GroupMatcher.jobGroupEquals(group));
for(final JobKey key : keys) {
final JobDetail detail = s.getJobDetail(key);
- final String jobName = (String) detail.getJobDataMap().get(QuartzScheduler.DATA_MAP_NAME);
- final Object job = detail.getJobDataMap().get(QuartzScheduler.DATA_MAP_OBJECT);
+ final QuartzJobExecutor.JobDesc desc = new QuartzJobExecutor.JobDesc(detail.getJobDataMap());
// only print jobs started through the sling scheduler
- if ( jobName != null && job != null ) {
+ if ( desc.isKnownJob() ) {
pw.print("Job : ");
- pw.print(detail.getJobDataMap().get(QuartzScheduler.DATA_MAP_NAME));
+ pw.print(desc.name);
if ( detail.getDescription() != null && detail.getDescription().length() > 0 ) {
pw.print(" (");
pw.print(detail.getDescription());
pw.print(")");
}
pw.print(", class: ");
- pw.print(job.getClass().getName());
+ pw.print(desc.job.getClass().getName());
pw.print(", concurrent: ");
pw.print(!detail.isConcurrentExectionDisallowed());
- final String[] runOn = (String[])detail.getJobDataMap().get(QuartzScheduler.DATA_MAP_RUN_ON);
- if ( runOn != null ) {
+ if ( desc.runOn != null ) {
pw.print(", runOn: ");
- pw.print(Arrays.toString(runOn));
+ pw.print(Arrays.toString(desc.runOn));
// check run on information
- if ( runOn.length == 1 &&
- (org.apache.sling.commons.scheduler.Scheduler.VALUE_RUN_ON_LEADER.equals(runOn[0]) || org.apache.sling.commons.scheduler.Scheduler.VALUE_RUN_ON_SINGLE.equals(runOn[0])) ) {
+ if ( desc.isRunOnLeader() || desc.isRunOnSingle() ) {
if ( QuartzJobExecutor.DISCOVERY_AVAILABLE.get() ) {
if ( QuartzJobExecutor.DISCOVERY_INFO_AVAILABLE.get() ) {
- if ( !QuartzJobExecutor.IS_LEADER.get() ) {
- pw.print(" (inactive: not leader)");
+ if ( desc.isRunOnLeader() ) {
+ if ( !QuartzJobExecutor.IS_LEADER.get() ) {
+ pw.print(" (inactive: not leader)");
+ }
+ } else {
+ if ( !desc.shouldRunAsSingle() ) {
+ pw.print(" (inactive: single distributed elsewhere)");
+ }
}
} else {
pw.print(" (inactive: no discovery info)");
@@ -120,7 +123,7 @@ public class WebConsolePrinter {
pw.print(" (inactive: no Sling settings)");
} else {
boolean schedule = false;
- for(final String id : runOn ) {
+ for(final String id : desc.runOn ) {
if ( myId.equals(id) ) {
schedule = true;
break;
Modified: sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/WhiteboardHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/WhiteboardHandler.java?rev=1793451&r1=1793450&r2=1793451&view=diff
==============================================================================
--- sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/WhiteboardHandler.java (original)
+++ sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/WhiteboardHandler.java Tue May 2 09:31:13 2017
@@ -246,6 +246,7 @@ public class WhiteboardHandler {
}
private void scheduleJob(final ServiceReference<?> ref, final Object job, final ScheduleOptions scheduleOptions) {
+ ((InternalScheduleOptions)scheduleOptions).providedName = getStringProperty(ref, Scheduler.PROPERTY_SCHEDULER_NAME);
final String name = getServiceIdentifier(ref);
final Boolean concurrent = getBooleanProperty(ref, Scheduler.PROPERTY_SCHEDULER_CONCURRENT);
final String[] runOnOpts = getRunOpts(ref);