You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2013/07/22 17:32:35 UTC
[3/3] git commit: CAMEL-6461: Fixed camel-quartz with stateful jobs
may change endpoint uri during redeployments. Use trigger to match instead of
endpoint uri is safer. Thanks to Zemian Deng for the patch.
CAMEL-6461: Fixed camel-quartz with stateful jobs may change endpoint uri during redeployments. Use trigger to match instead of endpoint uri is safer. Thanks to Zemian Deng for the patch.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ff5cf8ba
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ff5cf8ba
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ff5cf8ba
Branch: refs/heads/camel-2.10.x
Commit: ff5cf8baf06e1bd22e43e6b1c9193a4067eba662
Parents: a9db81c
Author: Claus Ibsen <da...@apache.org>
Authored: Mon Jul 22 17:31:07 2013 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Jul 22 17:31:58 2013 +0200
----------------------------------------------------------------------
.../apache/camel/component/quartz/CamelJob.java | 26 ++++++++++++--------
.../camel/component/quartz/QuartzComponent.java | 20 +++++++++------
2 files changed, 29 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/ff5cf8ba/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/CamelJob.java
----------------------------------------------------------------------
diff --git a/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/CamelJob.java b/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/CamelJob.java
index fcf59cd..acbcdf0 100644
--- a/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/CamelJob.java
+++ b/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/CamelJob.java
@@ -25,18 +25,17 @@ import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.SchedulerContext;
import org.quartz.SchedulerException;
+import org.quartz.Trigger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.camel.util.URISupport.normalizeUri;
-
/**
* @version
*/
public class CamelJob implements Job, Serializable {
private static final transient Logger LOG = LoggerFactory.getLogger(CamelJob.class);
- private static final long serialVersionUID = 26L;
+ private static final long serialVersionUID = 27L;
public void execute(JobExecutionContext context) throws JobExecutionException {
String camelContextName = (String) context.getJobDetail().getJobDataMap().get(QuartzConstants.QUARTZ_CAMEL_CONTEXT_NAME);
@@ -54,28 +53,35 @@ public class CamelJob implements Job, Serializable {
throw new JobExecutionException("No CamelContext could be found with name: " + camelContextName);
}
- QuartzEndpoint endpoint = lookupQuartzEndpoint(camelContext, endpointUri);
+ Trigger trigger = context.getTrigger();
+ QuartzEndpoint endpoint = lookupQuartzEndpoint(camelContext, endpointUri, trigger);
if (endpoint == null) {
- throw new JobExecutionException("No QuartzEndpoint could be found with uri: " + endpointUri);
+ throw new JobExecutionException("No QuartzEndpoint could be found with endpointUri: " + endpointUri);
}
endpoint.onJobExecute(context);
}
- private QuartzEndpoint lookupQuartzEndpoint(CamelContext camelContext, String endpointUri) throws JobExecutionException {
- try {
- String targetUri = normalizeUri(endpointUri);
+ private QuartzEndpoint lookupQuartzEndpoint(CamelContext camelContext, String endpointUri, Trigger trigger) throws JobExecutionException {
+ String targetTriggerName = trigger.getName();
+ String targetTriggerGroup = trigger.getGroup();
+ LOG.debug("Looking up existing QuartzEndpoint with trigger {}.{}", targetTriggerName, targetTriggerGroup);
+ try {
// check all active routes for the quartz endpoint this task matches
// as we prefer to use the existing endpoint from the routes
for (Route route : camelContext.getRoutes()) {
if (route.getEndpoint() instanceof QuartzEndpoint) {
- if (normalizeUri(route.getEndpoint().getEndpointUri()).equals(targetUri)) {
+ QuartzEndpoint quartzEndpoint = (QuartzEndpoint) route.getEndpoint();
+ String triggerName = quartzEndpoint.getTrigger().getName();
+ String triggerGroup = quartzEndpoint.getTrigger().getGroup();
+ LOG.trace("Checking route trigger {}.{}", triggerName, triggerGroup);
+ if (triggerName.equals(targetTriggerName) && triggerGroup.equals(targetTriggerGroup)) {
return (QuartzEndpoint) route.getEndpoint();
}
}
}
} catch (Exception e) {
- throw new JobExecutionException("Error lookup up existing QuartzEndpoint with uri: " + endpointUri, e);
+ throw new JobExecutionException("Error lookup up existing QuartzEndpoint with trigger: " + trigger, e);
}
// fallback and lookup existing from registry (eg maybe a @Consume POJO with a quartz endpoint, and thus not from a route)
http://git-wip-us.apache.org/repos/asf/camel/blob/ff5cf8ba/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzComponent.java b/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzComponent.java
index 901d91b..5dcc34a 100644
--- a/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzComponent.java
+++ b/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzComponent.java
@@ -49,7 +49,7 @@ import org.slf4j.LoggerFactory;
* For a brief tutorial on setting cron expression see
* <a href="http://www.opensymphony.com/quartz/wikidocs/CronTriggers%20Tutorial.html">Quartz cron tutorial</a>.
*
- * @version
+ * @version
*/
public class QuartzComponent extends DefaultComponent implements StartupListener {
private static final transient Logger LOG = LoggerFactory.getLogger(QuartzComponent.class);
@@ -234,19 +234,25 @@ public class QuartzComponent extends DefaultComponent implements StartupListener
LOG.debug("Trigger: {}/{} already exists and will be updated by Quartz.", trigger.getGroup(), trigger.getName());
// fast forward start time to now, as we do not want any misfire to kick in
trigger.setStartTime(new Date());
- // replace job, and relate trigger to previous job name, which is needed to reschedule job
+
+ // To ensure trigger uses the same job (the job name might change!) we will remove old trigger then re-add.
+ scheduler.unscheduleJob(trigger.getName(), trigger.getGroup());
scheduler.addJob(job, true);
- trigger.setJobName(existingTrigger.getJobName());
- scheduler.rescheduleJob(trigger.getName(), trigger.getGroup(), trigger);
+ trigger.setJobName(job.getName());
+ trigger.setJobGroup(job.getGroup());
+ scheduler.scheduleJob(trigger);
} else {
if (!isClustered()) {
LOG.debug("Trigger: {}/{} already exists and will be resumed by Quartz.", trigger.getGroup(), trigger.getName());
// fast forward start time to now, as we do not want any misfire to kick in
trigger.setStartTime(new Date());
- // replace job, and relate trigger to previous job name, which is needed to reschedule job
+
+ // To ensure trigger uses the same job (the job name might change!) we will remove old trigger then re-add.
+ scheduler.unscheduleJob(trigger.getName(), trigger.getGroup());
scheduler.addJob(job, true);
- trigger.setJobName(existingTrigger.getJobName());
- scheduler.rescheduleJob(trigger.getName(), trigger.getGroup(), trigger);
+ trigger.setJobName(job.getName());
+ trigger.setJobGroup(job.getGroup());
+ scheduler.scheduleJob(trigger);
} else {
LOG.debug("Trigger: {}/{} already exists and is already scheduled by clustered JobStore.", trigger.getGroup(), trigger.getName());
}