You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2017/03/27 13:23:09 UTC
[3/4] camel git commit: CAMEL-7809 : Quartz PollConsumerScheduler in a cluster tries to create duplicate triggers, fails
CAMEL-7809 : Quartz PollConsumerScheduler in a cluster tries to create duplicate triggers, fails
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/40966458
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/40966458
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/40966458
Branch: refs/heads/master
Commit: 409664582f532d8b9799e9525ae0e7a34918485f
Parents: 51d27c5
Author: [a556724] etienne dethoor <et...@atos.net>
Authored: Mon Mar 27 13:55:06 2017 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Mar 27 15:12:07 2017 +0200
----------------------------------------------------------------------
.../QuartzScheduledPollConsumerScheduler.java | 76 +++++++++++++-------
1 file changed, 50 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/40966458/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java
----------------------------------------------------------------------
diff --git a/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java b/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java
index 51c17dc..436683a 100644
--- a/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java
+++ b/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java
@@ -35,13 +35,15 @@ import org.quartz.JobDataMap;
import org.quartz.JobDetail;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
+import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
+import org.quartz.TriggerKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * A quartz based {@link ScheduledPollConsumerScheduler} which uses a {@link CronTrigger} to define when the
- * poll should be triggered.
+ * A quartz based {@link ScheduledPollConsumerScheduler} which uses a
+ * {@link CronTrigger} to define when the poll should be triggered.
*/
public class QuartzScheduledPollConsumerScheduler extends ServiceSupport implements ScheduledPollConsumerScheduler, NonManagedService {
@@ -161,36 +163,50 @@ public class QuartzScheduledPollConsumerScheduler extends ServiceSupport impleme
setQuartzScheduler(quartz.getScheduler());
}
- JobDataMap map = new JobDataMap();
- // do not store task as its not serializable, if we have route id
- if (routeId != null) {
- map.put("routeId", routeId);
- } else {
- map.put("task", runnable);
- }
- map.put(QuartzConstants.QUARTZ_TRIGGER_TYPE, "cron");
- map.put(QuartzConstants.QUARTZ_TRIGGER_CRON_EXPRESSION, getCron());
- map.put(QuartzConstants.QUARTZ_TRIGGER_CRON_TIMEZONE, getTimeZone().getID());
-
- job = JobBuilder.newJob(QuartzScheduledPollConsumerJob.class)
- .usingJobData(map)
- .build();
-
- // store additional information on job such as camel context etc
- QuartzHelper.updateJobDataMap(getCamelContext(), job, null);
-
String id = triggerId;
if (id == null) {
id = "trigger-" + getCamelContext().getUuidGenerator().generateUuid();
}
+ TriggerKey triggerKey = new TriggerKey(triggerId, triggerGroup);
+ Trigger existingTrigger = quartzScheduler.getTrigger(triggerKey);
+
+ // Does a trigger already exist for this triggerId?
+ if (existingTrigger == null) {
+ JobDataMap map = new JobDataMap();
+ // do not store task as its not serializable, if we have route id
+ if (routeId != null) {
+ map.put("routeId", routeId);
+ } else {
+ map.put("task", runnable);
+ }
+ map.put(QuartzConstants.QUARTZ_TRIGGER_TYPE, "cron");
+ map.put(QuartzConstants.QUARTZ_TRIGGER_CRON_EXPRESSION, getCron());
+ map.put(QuartzConstants.QUARTZ_TRIGGER_CRON_TIMEZONE, getTimeZone().getID());
+
+ job = JobBuilder.newJob(QuartzScheduledPollConsumerJob.class).usingJobData(map).build();
- trigger = TriggerBuilder.newTrigger()
- .withIdentity(id, triggerGroup)
- .withSchedule(CronScheduleBuilder.cronSchedule(getCron()).inTimeZone(getTimeZone()))
- .build();
+ // store additional information on job such as camel context etc
+ QuartzHelper.updateJobDataMap(getCamelContext(), job, null);
+
+ trigger = TriggerBuilder.newTrigger().withIdentity(id, triggerGroup).withSchedule(CronScheduleBuilder.cronSchedule(getCron()).inTimeZone(getTimeZone())).build();
+
+ LOG.debug("Scheduling job: {} with trigger: {}", job, trigger.getKey());
+ quartzScheduler.scheduleJob(job, trigger);
+ } else {
+ checkTriggerIsNonConflicting(existingTrigger);
+
+ LOG.debug("Trigger with key {} is already present in scheduler. Only updating it.", triggerKey);
+ job = quartzScheduler.getJobDetail(existingTrigger.getJobKey());
+ JobDataMap jobData = job.getJobDataMap();
+ jobData.put(QuartzConstants.QUARTZ_TRIGGER_CRON_EXPRESSION, getCron());
+ jobData.put(QuartzConstants.QUARTZ_TRIGGER_CRON_TIMEZONE, getTimeZone().getID());
+
+ QuartzHelper.updateJobDataMap(getCamelContext(), job, null);
+ LOG.debug("Updated jobData map to {}", jobData);
+
+ quartzScheduler.rescheduleJob(triggerKey, existingTrigger);
+ }
- LOG.debug("Scheduling job: {} with trigger: {}", job, trigger.getKey());
- quartzScheduler.scheduleJob(job, trigger);
}
@Override
@@ -205,4 +221,12 @@ public class QuartzScheduledPollConsumerScheduler extends ServiceSupport impleme
protected void doShutdown() throws Exception {
}
+ /**
+  * Guards against re-using a trigger key that already belongs to another route.
+  * <p>
+  * Reads the "routeId" entry from the job data map of the given trigger and, when
+  * one is present and differs from this scheduler's route id, rejects the re-use.
+  * A trigger without a "routeId" entry is accepted.
+  * <p>
+  * NOTE(review): the "routeId" entry appears to be stored on the job's data map
+  * (via JobBuilder.usingJobData) rather than on the trigger's - confirm that
+  * trigger.getJobDataMap() actually exposes it, otherwise this check never fires.
+  *
+  * @param trigger the trigger already registered in the scheduler under the wanted key
+  * @throws IllegalArgumentException if the trigger is owned by a different route
+  */
+ private void checkTriggerIsNonConflicting(Trigger trigger) {
+     JobDataMap jobDataMap = trigger.getJobDataMap();
+     String routeIdFromTrigger = jobDataMap.getString("routeId");
+     if (routeIdFromTrigger != null && !routeIdFromTrigger.equals(routeId)) {
+         // keep a space after "route" so the owning route id is readable in the message
+         throw new IllegalArgumentException("Trigger key " + trigger.getKey() + " is already used by route " + routeIdFromTrigger + ". Can't re-use it for route " + routeId);
+     }
+ }
+
}