You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2017/03/27 13:23:07 UTC
[1/4] camel git commit: CAMEL-7809 : Quartz PollConsumerScheduler in a cluster tries to create duplicate triggers, fails
Repository: camel
Updated Branches:
refs/heads/camel-2.18.x c67728c6d -> 2301422d0
refs/heads/master 51d27c595 -> f28c95c86
CAMEL-7809 : Quartz PollConsumerScheduler in a cluster tries to create duplicate triggers, fails
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/6e875c9e
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/6e875c9e
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/6e875c9e
Branch: refs/heads/camel-2.18.x
Commit: 6e875c9e191432232899d31733841b5d37c4e04c
Parents: c67728c
Author: [a556724] etienne dethoor <et...@atos.net>
Authored: Mon Mar 27 13:55:06 2017 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Mar 27 14:39:56 2017 +0200
----------------------------------------------------------------------
.../QuartzScheduledPollConsumerScheduler.java | 76 +++++++++++++-------
1 file changed, 50 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/6e875c9e/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java
----------------------------------------------------------------------
diff --git a/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java b/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java
index 51c17dc..436683a 100644
--- a/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java
+++ b/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java
@@ -35,13 +35,15 @@ import org.quartz.JobDataMap;
import org.quartz.JobDetail;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
+import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
+import org.quartz.TriggerKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * A quartz based {@link ScheduledPollConsumerScheduler} which uses a {@link CronTrigger} to define when the
- * poll should be triggered.
+ * A quartz based {@link ScheduledPollConsumerScheduler} which uses a
+ * {@link CronTrigger} to define when the poll should be triggered.
*/
public class QuartzScheduledPollConsumerScheduler extends ServiceSupport implements ScheduledPollConsumerScheduler, NonManagedService {
@@ -161,36 +163,50 @@ public class QuartzScheduledPollConsumerScheduler extends ServiceSupport impleme
setQuartzScheduler(quartz.getScheduler());
}
- JobDataMap map = new JobDataMap();
- // do not store task as its not serializable, if we have route id
- if (routeId != null) {
- map.put("routeId", routeId);
- } else {
- map.put("task", runnable);
- }
- map.put(QuartzConstants.QUARTZ_TRIGGER_TYPE, "cron");
- map.put(QuartzConstants.QUARTZ_TRIGGER_CRON_EXPRESSION, getCron());
- map.put(QuartzConstants.QUARTZ_TRIGGER_CRON_TIMEZONE, getTimeZone().getID());
-
- job = JobBuilder.newJob(QuartzScheduledPollConsumerJob.class)
- .usingJobData(map)
- .build();
-
- // store additional information on job such as camel context etc
- QuartzHelper.updateJobDataMap(getCamelContext(), job, null);
-
String id = triggerId;
if (id == null) {
id = "trigger-" + getCamelContext().getUuidGenerator().generateUuid();
}
+ TriggerKey triggerKey = new TriggerKey(triggerId, triggerGroup);
+ Trigger existingTrigger = quartzScheduler.getTrigger(triggerKey);
+
+ // Is an trigger already exist for this triggerId ?
+ if (existingTrigger == null) {
+ JobDataMap map = new JobDataMap();
+ // do not store task as its not serializable, if we have route id
+ if (routeId != null) {
+ map.put("routeId", routeId);
+ } else {
+ map.put("task", runnable);
+ }
+ map.put(QuartzConstants.QUARTZ_TRIGGER_TYPE, "cron");
+ map.put(QuartzConstants.QUARTZ_TRIGGER_CRON_EXPRESSION, getCron());
+ map.put(QuartzConstants.QUARTZ_TRIGGER_CRON_TIMEZONE, getTimeZone().getID());
+
+ job = JobBuilder.newJob(QuartzScheduledPollConsumerJob.class).usingJobData(map).build();
- trigger = TriggerBuilder.newTrigger()
- .withIdentity(id, triggerGroup)
- .withSchedule(CronScheduleBuilder.cronSchedule(getCron()).inTimeZone(getTimeZone()))
- .build();
+ // store additional information on job such as camel context etc
+ QuartzHelper.updateJobDataMap(getCamelContext(), job, null);
+
+ trigger = TriggerBuilder.newTrigger().withIdentity(id, triggerGroup).withSchedule(CronScheduleBuilder.cronSchedule(getCron()).inTimeZone(getTimeZone())).build();
+
+ LOG.debug("Scheduling job: {} with trigger: {}", job, trigger.getKey());
+ quartzScheduler.scheduleJob(job, trigger);
+ } else {
+ checkTriggerIsNonConflicting(existingTrigger);
+
+ LOG.debug("Trigger with key {} is already present in scheduler. Only updating it.", triggerKey);
+ job = quartzScheduler.getJobDetail(existingTrigger.getJobKey());
+ JobDataMap jobData = job.getJobDataMap();
+ jobData.put(QuartzConstants.QUARTZ_TRIGGER_CRON_EXPRESSION, getCron());
+ jobData.put(QuartzConstants.QUARTZ_TRIGGER_CRON_TIMEZONE, getTimeZone().getID());
+
+ QuartzHelper.updateJobDataMap(getCamelContext(), job, null);
+ LOG.debug("Updated jobData map to {}", jobData);
+
+ quartzScheduler.rescheduleJob(triggerKey, existingTrigger);
+ }
- LOG.debug("Scheduling job: {} with trigger: {}", job, trigger.getKey());
- quartzScheduler.scheduleJob(job, trigger);
}
@Override
@@ -205,4 +221,12 @@ public class QuartzScheduledPollConsumerScheduler extends ServiceSupport impleme
protected void doShutdown() throws Exception {
}
+ private void checkTriggerIsNonConflicting(Trigger trigger) {
+ JobDataMap jobDataMap = trigger.getJobDataMap();
+ String routeIdFromTrigger = jobDataMap.getString("routeId");
+ if (routeIdFromTrigger != null && !routeIdFromTrigger.equals(routeId)) {
+ throw new IllegalArgumentException("Trigger key " + trigger.getKey() + " is already used by route" + routeIdFromTrigger + ". Can't re-use it for route " + routeId);
+ }
+ }
+
}
[3/4] camel git commit: CAMEL-7809 : Quartz PollConsumerScheduler in a cluster tries to create duplicate triggers, fails
Posted by da...@apache.org.
CAMEL-7809 : Quartz PollConsumerScheduler in a cluster tries to create duplicate triggers, fails
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/40966458
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/40966458
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/40966458
Branch: refs/heads/master
Commit: 409664582f532d8b9799e9525ae0e7a34918485f
Parents: 51d27c5
Author: [a556724] etienne dethoor <et...@atos.net>
Authored: Mon Mar 27 13:55:06 2017 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Mar 27 15:12:07 2017 +0200
----------------------------------------------------------------------
.../QuartzScheduledPollConsumerScheduler.java | 76 +++++++++++++-------
1 file changed, 50 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/40966458/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java
----------------------------------------------------------------------
diff --git a/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java b/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java
index 51c17dc..436683a 100644
--- a/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java
+++ b/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java
@@ -35,13 +35,15 @@ import org.quartz.JobDataMap;
import org.quartz.JobDetail;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
+import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
+import org.quartz.TriggerKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * A quartz based {@link ScheduledPollConsumerScheduler} which uses a {@link CronTrigger} to define when the
- * poll should be triggered.
+ * A quartz based {@link ScheduledPollConsumerScheduler} which uses a
+ * {@link CronTrigger} to define when the poll should be triggered.
*/
public class QuartzScheduledPollConsumerScheduler extends ServiceSupport implements ScheduledPollConsumerScheduler, NonManagedService {
@@ -161,36 +163,50 @@ public class QuartzScheduledPollConsumerScheduler extends ServiceSupport impleme
setQuartzScheduler(quartz.getScheduler());
}
- JobDataMap map = new JobDataMap();
- // do not store task as its not serializable, if we have route id
- if (routeId != null) {
- map.put("routeId", routeId);
- } else {
- map.put("task", runnable);
- }
- map.put(QuartzConstants.QUARTZ_TRIGGER_TYPE, "cron");
- map.put(QuartzConstants.QUARTZ_TRIGGER_CRON_EXPRESSION, getCron());
- map.put(QuartzConstants.QUARTZ_TRIGGER_CRON_TIMEZONE, getTimeZone().getID());
-
- job = JobBuilder.newJob(QuartzScheduledPollConsumerJob.class)
- .usingJobData(map)
- .build();
-
- // store additional information on job such as camel context etc
- QuartzHelper.updateJobDataMap(getCamelContext(), job, null);
-
String id = triggerId;
if (id == null) {
id = "trigger-" + getCamelContext().getUuidGenerator().generateUuid();
}
+ TriggerKey triggerKey = new TriggerKey(triggerId, triggerGroup);
+ Trigger existingTrigger = quartzScheduler.getTrigger(triggerKey);
+
+ // Is an trigger already exist for this triggerId ?
+ if (existingTrigger == null) {
+ JobDataMap map = new JobDataMap();
+ // do not store task as its not serializable, if we have route id
+ if (routeId != null) {
+ map.put("routeId", routeId);
+ } else {
+ map.put("task", runnable);
+ }
+ map.put(QuartzConstants.QUARTZ_TRIGGER_TYPE, "cron");
+ map.put(QuartzConstants.QUARTZ_TRIGGER_CRON_EXPRESSION, getCron());
+ map.put(QuartzConstants.QUARTZ_TRIGGER_CRON_TIMEZONE, getTimeZone().getID());
+
+ job = JobBuilder.newJob(QuartzScheduledPollConsumerJob.class).usingJobData(map).build();
- trigger = TriggerBuilder.newTrigger()
- .withIdentity(id, triggerGroup)
- .withSchedule(CronScheduleBuilder.cronSchedule(getCron()).inTimeZone(getTimeZone()))
- .build();
+ // store additional information on job such as camel context etc
+ QuartzHelper.updateJobDataMap(getCamelContext(), job, null);
+
+ trigger = TriggerBuilder.newTrigger().withIdentity(id, triggerGroup).withSchedule(CronScheduleBuilder.cronSchedule(getCron()).inTimeZone(getTimeZone())).build();
+
+ LOG.debug("Scheduling job: {} with trigger: {}", job, trigger.getKey());
+ quartzScheduler.scheduleJob(job, trigger);
+ } else {
+ checkTriggerIsNonConflicting(existingTrigger);
+
+ LOG.debug("Trigger with key {} is already present in scheduler. Only updating it.", triggerKey);
+ job = quartzScheduler.getJobDetail(existingTrigger.getJobKey());
+ JobDataMap jobData = job.getJobDataMap();
+ jobData.put(QuartzConstants.QUARTZ_TRIGGER_CRON_EXPRESSION, getCron());
+ jobData.put(QuartzConstants.QUARTZ_TRIGGER_CRON_TIMEZONE, getTimeZone().getID());
+
+ QuartzHelper.updateJobDataMap(getCamelContext(), job, null);
+ LOG.debug("Updated jobData map to {}", jobData);
+
+ quartzScheduler.rescheduleJob(triggerKey, existingTrigger);
+ }
- LOG.debug("Scheduling job: {} with trigger: {}", job, trigger.getKey());
- quartzScheduler.scheduleJob(job, trigger);
}
@Override
@@ -205,4 +221,12 @@ public class QuartzScheduledPollConsumerScheduler extends ServiceSupport impleme
protected void doShutdown() throws Exception {
}
+ private void checkTriggerIsNonConflicting(Trigger trigger) {
+ JobDataMap jobDataMap = trigger.getJobDataMap();
+ String routeIdFromTrigger = jobDataMap.getString("routeId");
+ if (routeIdFromTrigger != null && !routeIdFromTrigger.equals(routeId)) {
+ throw new IllegalArgumentException("Trigger key " + trigger.getKey() + " is already used by route" + routeIdFromTrigger + ". Can't re-use it for route " + routeId);
+ }
+ }
+
}
[2/4] camel git commit: CAMEL-7809: Fixed test. This closes #1577.
Posted by da...@apache.org.
CAMEL-7809: Fixed test. This closes #1577.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/2301422d
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/2301422d
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/2301422d
Branch: refs/heads/camel-2.18.x
Commit: 2301422d06c6bbb144e0a6913d2877892ad4428d
Parents: 6e875c9
Author: Claus Ibsen <da...@apache.org>
Authored: Mon Mar 27 15:11:23 2017 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Mar 27 15:11:23 2017 +0200
----------------------------------------------------------------------
.../quartz2/QuartzScheduledPollConsumerScheduler.java | 12 ++++++++----
1 file changed, 8 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/2301422d/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java
----------------------------------------------------------------------
diff --git a/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java b/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java
index 436683a..5192a56 100644
--- a/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java
+++ b/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java
@@ -167,8 +167,13 @@ public class QuartzScheduledPollConsumerScheduler extends ServiceSupport impleme
if (id == null) {
id = "trigger-" + getCamelContext().getUuidGenerator().generateUuid();
}
- TriggerKey triggerKey = new TriggerKey(triggerId, triggerGroup);
- Trigger existingTrigger = quartzScheduler.getTrigger(triggerKey);
+
+ Trigger existingTrigger = null;
+ TriggerKey triggerKey = null;
+ if (triggerId != null && triggerGroup != null) {
+ triggerKey = new TriggerKey(triggerId, triggerGroup);
+ existingTrigger = quartzScheduler.getTrigger(triggerKey);
+ }
// Is an trigger already exist for this triggerId ?
if (existingTrigger == null) {
@@ -206,7 +211,6 @@ public class QuartzScheduledPollConsumerScheduler extends ServiceSupport impleme
quartzScheduler.rescheduleJob(triggerKey, existingTrigger);
}
-
}
@Override
@@ -225,7 +229,7 @@ public class QuartzScheduledPollConsumerScheduler extends ServiceSupport impleme
JobDataMap jobDataMap = trigger.getJobDataMap();
String routeIdFromTrigger = jobDataMap.getString("routeId");
if (routeIdFromTrigger != null && !routeIdFromTrigger.equals(routeId)) {
- throw new IllegalArgumentException("Trigger key " + trigger.getKey() + " is already used by route" + routeIdFromTrigger + ". Can't re-use it for route " + routeId);
+ throw new IllegalArgumentException("Trigger key " + trigger.getKey() + " is already used by route: " + routeIdFromTrigger + ". Cannot re-use it for another route: " + routeId);
}
}
[4/4] camel git commit: CAMEL-7809: Fixed test. This closes #1577.
Posted by da...@apache.org.
CAMEL-7809: Fixed test. This closes #1577.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/f28c95c8
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/f28c95c8
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/f28c95c8
Branch: refs/heads/master
Commit: f28c95c86971346d25ec3aaf15a890667ffe27f2
Parents: 4096645
Author: Claus Ibsen <da...@apache.org>
Authored: Mon Mar 27 15:11:23 2017 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Mar 27 15:12:15 2017 +0200
----------------------------------------------------------------------
.../quartz2/QuartzScheduledPollConsumerScheduler.java | 12 ++++++++----
1 file changed, 8 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/f28c95c8/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java
----------------------------------------------------------------------
diff --git a/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java b/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java
index 436683a..5192a56 100644
--- a/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java
+++ b/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java
@@ -167,8 +167,13 @@ public class QuartzScheduledPollConsumerScheduler extends ServiceSupport impleme
if (id == null) {
id = "trigger-" + getCamelContext().getUuidGenerator().generateUuid();
}
- TriggerKey triggerKey = new TriggerKey(triggerId, triggerGroup);
- Trigger existingTrigger = quartzScheduler.getTrigger(triggerKey);
+
+ Trigger existingTrigger = null;
+ TriggerKey triggerKey = null;
+ if (triggerId != null && triggerGroup != null) {
+ triggerKey = new TriggerKey(triggerId, triggerGroup);
+ existingTrigger = quartzScheduler.getTrigger(triggerKey);
+ }
// Is an trigger already exist for this triggerId ?
if (existingTrigger == null) {
@@ -206,7 +211,6 @@ public class QuartzScheduledPollConsumerScheduler extends ServiceSupport impleme
quartzScheduler.rescheduleJob(triggerKey, existingTrigger);
}
-
}
@Override
@@ -225,7 +229,7 @@ public class QuartzScheduledPollConsumerScheduler extends ServiceSupport impleme
JobDataMap jobDataMap = trigger.getJobDataMap();
String routeIdFromTrigger = jobDataMap.getString("routeId");
if (routeIdFromTrigger != null && !routeIdFromTrigger.equals(routeId)) {
- throw new IllegalArgumentException("Trigger key " + trigger.getKey() + " is already used by route" + routeIdFromTrigger + ". Can't re-use it for route " + routeId);
+ throw new IllegalArgumentException("Trigger key " + trigger.getKey() + " is already used by route: " + routeIdFromTrigger + ". Cannot re-use it for another route: " + routeId);
}
}