You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2013/07/22 17:32:35 UTC

[3/3] git commit: CAMEL-6461: Fixed a camel-quartz issue where, with stateful jobs, the endpoint URI may change during redeployments. Matching on the trigger instead of the endpoint URI is safer. Thanks to Zemian Deng for the patch.

CAMEL-6461: Fixed a camel-quartz issue where, with stateful jobs, the endpoint URI may change during redeployments. Matching on the trigger instead of the endpoint URI is safer. Thanks to Zemian Deng for the patch.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ff5cf8ba
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ff5cf8ba
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ff5cf8ba

Branch: refs/heads/camel-2.10.x
Commit: ff5cf8baf06e1bd22e43e6b1c9193a4067eba662
Parents: a9db81c
Author: Claus Ibsen <da...@apache.org>
Authored: Mon Jul 22 17:31:07 2013 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Jul 22 17:31:58 2013 +0200

----------------------------------------------------------------------
 .../apache/camel/component/quartz/CamelJob.java | 26 ++++++++++++--------
 .../camel/component/quartz/QuartzComponent.java | 20 +++++++++------
 2 files changed, 29 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/ff5cf8ba/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/CamelJob.java
----------------------------------------------------------------------
diff --git a/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/CamelJob.java b/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/CamelJob.java
index fcf59cd..acbcdf0 100644
--- a/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/CamelJob.java
+++ b/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/CamelJob.java
@@ -25,18 +25,17 @@ import org.quartz.JobExecutionContext;
 import org.quartz.JobExecutionException;
 import org.quartz.SchedulerContext;
 import org.quartz.SchedulerException;
+import org.quartz.Trigger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.camel.util.URISupport.normalizeUri;
-
 /**
  * @version 
  */
 public class CamelJob implements Job, Serializable {
 
     private static final transient Logger LOG = LoggerFactory.getLogger(CamelJob.class);
-    private static final long serialVersionUID = 26L;
+    private static final long serialVersionUID = 27L;
 
     public void execute(JobExecutionContext context) throws JobExecutionException {
         String camelContextName = (String) context.getJobDetail().getJobDataMap().get(QuartzConstants.QUARTZ_CAMEL_CONTEXT_NAME);
@@ -54,28 +53,35 @@ public class CamelJob implements Job, Serializable {
             throw new JobExecutionException("No CamelContext could be found with name: " + camelContextName);
         }
 
-        QuartzEndpoint endpoint = lookupQuartzEndpoint(camelContext, endpointUri);
+        Trigger trigger = context.getTrigger();
+        QuartzEndpoint endpoint = lookupQuartzEndpoint(camelContext, endpointUri, trigger);
         if (endpoint == null) {
-            throw new JobExecutionException("No QuartzEndpoint could be found with uri: " + endpointUri);
+            throw new JobExecutionException("No QuartzEndpoint could be found with endpointUri: " + endpointUri);
         }
         endpoint.onJobExecute(context);
     }
 
-    private QuartzEndpoint lookupQuartzEndpoint(CamelContext camelContext, String endpointUri) throws JobExecutionException {
-        try {
-            String targetUri = normalizeUri(endpointUri);
+    private QuartzEndpoint lookupQuartzEndpoint(CamelContext camelContext, String endpointUri, Trigger trigger) throws JobExecutionException {
+        String targetTriggerName = trigger.getName();
+        String targetTriggerGroup = trigger.getGroup();
 
+        LOG.debug("Looking up existing QuartzEndpoint with trigger {}.{}", targetTriggerName, targetTriggerGroup);
+        try {
             // check all active routes for the quartz endpoint this task matches
             // as we prefer to use the existing endpoint from the routes
             for (Route route : camelContext.getRoutes()) {
                 if (route.getEndpoint() instanceof QuartzEndpoint) {
-                    if (normalizeUri(route.getEndpoint().getEndpointUri()).equals(targetUri)) {
+                    QuartzEndpoint quartzEndpoint = (QuartzEndpoint) route.getEndpoint();
+                    String triggerName = quartzEndpoint.getTrigger().getName();
+                    String triggerGroup = quartzEndpoint.getTrigger().getGroup();
+                    LOG.trace("Checking route trigger {}.{}", triggerName, triggerGroup);
+                    if (triggerName.equals(targetTriggerName) && triggerGroup.equals(targetTriggerGroup)) {
                         return (QuartzEndpoint) route.getEndpoint();
                     }
                 }
             }
         } catch (Exception e) {
-            throw new JobExecutionException("Error lookup up existing QuartzEndpoint with uri: " + endpointUri, e);
+            throw new JobExecutionException("Error lookup up existing QuartzEndpoint with trigger: " + trigger, e);
         }
 
         // fallback and lookup existing from registry (eg maybe a @Consume POJO with a quartz endpoint, and thus not from a route)

http://git-wip-us.apache.org/repos/asf/camel/blob/ff5cf8ba/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzComponent.java b/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzComponent.java
index 901d91b..5dcc34a 100644
--- a/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzComponent.java
+++ b/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzComponent.java
@@ -49,7 +49,7 @@ import org.slf4j.LoggerFactory;
  * For a brief tutorial on setting cron expression see
  * <a href="http://www.opensymphony.com/quartz/wikidocs/CronTriggers%20Tutorial.html">Quartz cron tutorial</a>.
  *
- * @version 
+ * @version
  */
 public class QuartzComponent extends DefaultComponent implements StartupListener {
     private static final transient Logger LOG = LoggerFactory.getLogger(QuartzComponent.class);
@@ -234,19 +234,25 @@ public class QuartzComponent extends DefaultComponent implements StartupListener
             LOG.debug("Trigger: {}/{} already exists and will be updated by Quartz.", trigger.getGroup(), trigger.getName());
             // fast forward start time to now, as we do not want any misfire to kick in
             trigger.setStartTime(new Date());
-            // replace job, and relate trigger to previous job name, which is needed to reschedule job
+
+            // To ensure trigger uses the same job (the job name might change!) we will remove old trigger then re-add.
+            scheduler.unscheduleJob(trigger.getName(), trigger.getGroup());
             scheduler.addJob(job, true);
-            trigger.setJobName(existingTrigger.getJobName());
-            scheduler.rescheduleJob(trigger.getName(), trigger.getGroup(), trigger);
+            trigger.setJobName(job.getName());
+            trigger.setJobGroup(job.getGroup());
+            scheduler.scheduleJob(trigger);
         } else {
             if (!isClustered()) {
                 LOG.debug("Trigger: {}/{} already exists and will be resumed by Quartz.", trigger.getGroup(), trigger.getName());
                 // fast forward start time to now, as we do not want any misfire to kick in
                 trigger.setStartTime(new Date());
-                // replace job, and relate trigger to previous job name, which is needed to reschedule job
+
+                // To ensure trigger uses the same job (the job name might change!) we will remove old trigger then re-add.
+                scheduler.unscheduleJob(trigger.getName(), trigger.getGroup());
                 scheduler.addJob(job, true);
-                trigger.setJobName(existingTrigger.getJobName());
-                scheduler.rescheduleJob(trigger.getName(), trigger.getGroup(), trigger);
+                trigger.setJobName(job.getName());
+                trigger.setJobGroup(job.getGroup());
+                scheduler.scheduleJob(trigger);
             } else {
                 LOG.debug("Trigger: {}/{} already exists and is already scheduled by clustered JobStore.", trigger.getGroup(), trigger.getName());
             }