You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2011/11/22 19:44:32 UTC

svn commit: r1205124 - in /camel/trunk/components/camel-quartz/src: main/java/org/apache/camel/routepolicy/quartz/ test/java/org/apache/camel/routepolicy/quartz/

Author: davsclaus
Date: Tue Nov 22 18:44:30 2011
New Revision: 1205124

URL: http://svn.apache.org/viewvc?rev=1205124&view=rev
Log:
CAMEL-4692: Fixed issue with same quartz route policy when used with 2+ routes. Thanks to Bilgin for the patch.

Modified:
    camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/CronScheduledRoutePolicy.java
    camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/ScheduledRouteDetails.java
    camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/ScheduledRoutePolicy.java
    camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/SimpleScheduledRoutePolicy.java
    camel/trunk/components/camel-quartz/src/test/java/org/apache/camel/routepolicy/quartz/CronScheduledRoutePolicyTest.java

Modified: camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/CronScheduledRoutePolicy.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/CronScheduledRoutePolicy.java?rev=1205124&r1=1205123&r2=1205124&view=diff
==============================================================================
--- camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/CronScheduledRoutePolicy.java (original)
+++ camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/CronScheduledRoutePolicy.java Tue Nov 22 18:44:30 2011
@@ -58,46 +58,26 @@ public class CronScheduledRoutePolicy ex
             throw new IllegalArgumentException("Scheduled Route Policy for route {} has no stop/stop/suspend/resume times specified");
         }
 
-        if (scheduledRouteDetails == null) {
-            scheduledRouteDetails = new ScheduledRouteDetails();
-            scheduledRouteDetails.setRoute(route);
-
-            if (getRouteStartTime() != null) {
-                scheduleRoute(Action.START);
-            }
-            if (getRouteStopTime() != null) {
-                scheduleRoute(Action.STOP);
-            }
-
-            if (getRouteSuspendTime() != null) {
-                scheduleRoute(Action.SUSPEND);
-            }
-            if (getRouteResumeTime() != null) {
-                scheduleRoute(Action.RESUME);
-            }
+        registerRouteToScheduledRouteDetails(route);
+        if (getRouteStartTime() != null) {
+            scheduleRoute(Action.START, route);
         }
-    }
-
-    @Override
-    protected void doStop() throws Exception {
-        if (scheduledRouteDetails.getStartJobDetail() != null) {
-            deleteRouteJob(Action.START);
-        }
-        if (scheduledRouteDetails.getStopJobDetail() != null) {
-            deleteRouteJob(Action.STOP);
+        if (getRouteStopTime() != null) {
+            scheduleRoute(Action.STOP, route);
         }
-        if (scheduledRouteDetails.getSuspendJobDetail() != null) {
-            deleteRouteJob(Action.SUSPEND);
+
+        if (getRouteSuspendTime() != null) {
+            scheduleRoute(Action.SUSPEND, route);
         }
-        if (scheduledRouteDetails.getResumeJobDetail() != null) {
-            deleteRouteJob(Action.RESUME);
+        if (getRouteResumeTime() != null) {
+            scheduleRoute(Action.RESUME, route);
         }
     }
 
     @Override
     protected Trigger createTrigger(Action action, Route route) throws Exception {
         CronTrigger trigger = null;
-        
+
         if (action == Action.START) {
             trigger = new CronTrigger(TRIGGER_START + route.getId(), TRIGGER_GROUP + route.getId(), getRouteStartTime());
         } else if (action == Action.STOP) {

Modified: camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/ScheduledRouteDetails.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/ScheduledRouteDetails.java?rev=1205124&r1=1205123&r2=1205124&view=diff
==============================================================================
--- camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/ScheduledRouteDetails.java (original)
+++ camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/ScheduledRouteDetails.java Tue Nov 22 18:44:30 2011
@@ -29,7 +29,6 @@ public class ScheduledRouteDetails {
     private Trigger stopTrigger;
     private Trigger suspendTrigger;
     private Trigger resumeTrigger;
-    private Route route;
 
     public JobDetail getStartJobDetail() {
         return startJobDetail;
@@ -79,14 +78,6 @@ public class ScheduledRouteDetails {
         this.suspendTrigger = suspendTrigger;
     }
 
-    public Route getRoute() {
-        return route;
-    }
-    
-    public void setRoute(Route route) {
-        this.route = route;
-    }
-
     public void setResumeJobDetail(JobDetail resumeJobDetail) {
         this.resumeJobDetail = resumeJobDetail;
     }

Modified: camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/ScheduledRoutePolicy.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/ScheduledRoutePolicy.java?rev=1205124&r1=1205123&r2=1205124&view=diff
==============================================================================
--- camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/ScheduledRoutePolicy.java (original)
+++ camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/ScheduledRoutePolicy.java Tue Nov 22 18:44:30 2011
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.routepolicy.quartz;
 
+import java.util.LinkedHashMap;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.Route;
@@ -30,10 +32,10 @@ import org.slf4j.LoggerFactory;
 
 public abstract class ScheduledRoutePolicy extends RoutePolicySupport implements ScheduledRoutePolicyConstants {
     private static final transient Logger LOG = LoggerFactory.getLogger(ScheduledRoutePolicy.class);
-    protected ScheduledRouteDetails scheduledRouteDetails;
+    protected Map<String, ScheduledRouteDetails> scheduledRouteDetailsMap = new LinkedHashMap<String, ScheduledRouteDetails>();
     private Scheduler scheduler;
     private int routeStopGracePeriod;
-    private TimeUnit timeUnit; 
+    private TimeUnit timeUnit;
 
     protected abstract Trigger createTrigger(Action action, Route route) throws Exception;
 
@@ -66,12 +68,10 @@ public abstract class ScheduledRoutePoli
         }       
     }
 
-    public void scheduleRoute(Action action) throws Exception {
-        Route route = scheduledRouteDetails.getRoute();
-        
+    public void scheduleRoute(Action action, Route route) throws Exception {
         JobDetail jobDetail = createJobDetail(action, route);
         Trigger trigger = createTrigger(action, route);
-        updateScheduledRouteDetails(action, jobDetail, trigger);
+        updateScheduledRouteDetails(action, jobDetail, trigger, route);
         
         loadCallbackDataIntoSchedulerContext(jobDetail, action, route);
         getScheduler().scheduleJob(jobDetail, trigger);
@@ -81,27 +81,45 @@ public abstract class ScheduledRoutePoli
         }
     }
 
-    public void pauseRouteTrigger(Action action) throws SchedulerException {
-        String triggerName = retrieveTriggerName(action);
-        String triggerGroup = retrieveTriggerGroup(action);
+    public void pauseRouteTrigger(Action action, String routeId) throws SchedulerException {
+        String triggerName = retrieveTriggerName(action, routeId);
+        String triggerGroup = retrieveTriggerGroup(action, routeId);
         
         getScheduler().pauseTrigger(triggerName, triggerGroup);
 
         LOG.debug("Scheduled trigger: {}.{} is paused", triggerGroup, triggerName);
     }
     
-    public void resumeRouteTrigger(Action action) throws SchedulerException {
-        String triggerName = retrieveTriggerName(action);
-        String triggerGroup = retrieveTriggerGroup(action);
+    public void resumeRouteTrigger(Action action, String routeId) throws SchedulerException {
+        String triggerName = retrieveTriggerName(action, routeId);
+        String triggerGroup = retrieveTriggerGroup(action, routeId);
         
         getScheduler().resumeTrigger(triggerName, triggerGroup);
 
         LOG.debug("Scheduled trigger: {}.{} is resumed", triggerGroup, triggerName);
     }
 
-    public void deleteRouteJob(Action action) throws SchedulerException {
-        String jobDetailName = retrieveJobDetailName(action);
-        String jobDetailGroup = retrieveJobDetailGroup(action);
+    @Override
+    protected void doStop() throws Exception {
+        for (ScheduledRouteDetails scheduledRouteDetails : scheduledRouteDetailsMap.values()) {
+            if (scheduledRouteDetails.getStartJobDetail() != null) {
+                deleteRouteJob(Action.START, scheduledRouteDetails);
+            }
+            if (scheduledRouteDetails.getStopJobDetail() != null) {
+                deleteRouteJob(Action.STOP, scheduledRouteDetails);
+            }
+            if (scheduledRouteDetails.getSuspendJobDetail() != null) {
+                deleteRouteJob(Action.SUSPEND, scheduledRouteDetails);
+            }
+            if (scheduledRouteDetails.getResumeJobDetail() != null) {
+                deleteRouteJob(Action.RESUME, scheduledRouteDetails);
+            }
+        }
+    }
+
+    public void deleteRouteJob(Action action, ScheduledRouteDetails scheduledRouteDetails) throws SchedulerException {
+        String jobDetailName = retrieveJobDetailName(action, scheduledRouteDetails);
+        String jobDetailGroup = retrieveJobDetailGroup(action, scheduledRouteDetails);
         
         if (!getScheduler().isShutdown()) {
             getScheduler().deleteJob(jobDetailName, jobDetailGroup);
@@ -126,7 +144,8 @@ public abstract class ScheduledRoutePoli
         return jobDetail;
     }
         
-    protected void updateScheduledRouteDetails(Action action, JobDetail jobDetail, Trigger trigger) throws Exception {
+    protected void updateScheduledRouteDetails(Action action, JobDetail jobDetail, Trigger trigger, Route route) throws Exception {
+        ScheduledRouteDetails scheduledRouteDetails = getScheduledRouteDetails(route.getId());
         if (action == Action.START) {
             scheduledRouteDetails.setStartJobDetail(jobDetail);
             scheduledRouteDetails.setStartTrigger(trigger);
@@ -141,12 +160,13 @@ public abstract class ScheduledRoutePoli
             scheduledRouteDetails.setResumeTrigger(trigger);
         }
     }
-    
+
     protected void loadCallbackDataIntoSchedulerContext(JobDetail jobDetail, Action action, Route route) throws SchedulerException {
         getScheduler().getContext().put(jobDetail.getName(), new ScheduledJobState(action, route));
     }    
         
-    public String retrieveTriggerName(Action action) {
+    public String retrieveTriggerName(Action action, String routeId) {
+        ScheduledRouteDetails scheduledRouteDetails = getScheduledRouteDetails(routeId);
         String triggerName = null;
 
         if (action == Action.START) {
@@ -162,7 +182,8 @@ public abstract class ScheduledRoutePoli
         return triggerName;
     }
 
-    public String retrieveTriggerGroup(Action action) {
+    public String retrieveTriggerGroup(Action action, String routeId) {
+        ScheduledRouteDetails scheduledRouteDetails = getScheduledRouteDetails(routeId);
         String triggerGroup = null;
 
         if (action == Action.START) {
@@ -178,7 +199,7 @@ public abstract class ScheduledRoutePoli
         return triggerGroup;
     }
     
-    public String retrieveJobDetailName(Action action) {
+    public String retrieveJobDetailName(Action action, ScheduledRouteDetails scheduledRouteDetails) {
         String jobDetailName = null;
 
         if (action == Action.START) {
@@ -194,7 +215,7 @@ public abstract class ScheduledRoutePoli
         return jobDetailName;
     }
 
-    public String retrieveJobDetailGroup(Action action) {
+    public String retrieveJobDetailGroup(Action action, ScheduledRouteDetails scheduledRouteDetails) {
         String jobDetailGroup = null;
 
         if (action == Action.START) {
@@ -210,12 +231,13 @@ public abstract class ScheduledRoutePoli
         return jobDetailGroup;
     } 
     
-    public ScheduledRouteDetails getScheduledRouteDetails() {
-        return scheduledRouteDetails;
+    protected void registerRouteToScheduledRouteDetails(Route route) {
+        ScheduledRouteDetails scheduledRouteDetails = new ScheduledRouteDetails();
+        scheduledRouteDetailsMap.put(route.getId(), scheduledRouteDetails);
     }
 
-    public void setScheduledRouteDetails(ScheduledRouteDetails scheduledRouteDetails) {
-        this.scheduledRouteDetails = scheduledRouteDetails;
+    protected ScheduledRouteDetails getScheduledRouteDetails(String routeId) {
+        return scheduledRouteDetailsMap.get(routeId);
     }
 
     public void setScheduler(Scheduler scheduler) {

Modified: camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/SimpleScheduledRoutePolicy.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/SimpleScheduledRoutePolicy.java?rev=1205124&r1=1205123&r2=1205124&view=diff
==============================================================================
--- camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/SimpleScheduledRoutePolicy.java (original)
+++ camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/SimpleScheduledRoutePolicy.java Tue Nov 22 18:44:30 2011
@@ -24,8 +24,6 @@ import org.apache.camel.component.quartz
 import org.apache.camel.util.ObjectHelper;
 import org.quartz.SimpleTrigger;
 import org.quartz.Trigger;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class SimpleScheduledRoutePolicy extends ScheduledRoutePolicy {
     private Date routeStartDate;
@@ -52,7 +50,7 @@ public class SimpleScheduledRoutePolicy 
     protected void doOnInit(Route route) throws Exception {
         QuartzComponent quartz = route.getRouteContext().getCamelContext().getComponent("quartz", QuartzComponent.class);
         setScheduler(quartz.getScheduler());
-            
+
         // Important: do not start scheduler as QuartzComponent does that automatic
         // when CamelContext has been fully initialized and started
 
@@ -69,39 +67,19 @@ public class SimpleScheduledRoutePolicy 
             throw new IllegalArgumentException("Scheduled Route Policy for route {} has no stop/stop/suspend/resume times specified");
         }
 
-        if (scheduledRouteDetails == null) {
-            scheduledRouteDetails = new ScheduledRouteDetails();
-            scheduledRouteDetails.setRoute(route);
-
-            if (getRouteStartDate() != null) {
-                scheduleRoute(Action.START);
-            }
-            if (getRouteStopDate() != null) {
-                scheduleRoute(Action.STOP);
-            }
-
-            if (getRouteSuspendDate() != null) {
-                scheduleRoute(Action.SUSPEND);
-            }
-            if (getRouteResumeDate() != null) {
-                scheduleRoute(Action.RESUME);
-            }
-        }
-    }
-
-    @Override
-    protected void doStop() throws Exception {
-        if (scheduledRouteDetails.getStartJobDetail() != null) {
-            deleteRouteJob(Action.START);
+        registerRouteToScheduledRouteDetails(route);
+        if (getRouteStartDate() != null) {
+            scheduleRoute(Action.START, route);
         }
-        if (scheduledRouteDetails.getStopJobDetail() != null) {
-            deleteRouteJob(Action.STOP);
+        if (getRouteStopDate() != null) {
+            scheduleRoute(Action.STOP, route);
         }
-        if (scheduledRouteDetails.getSuspendJobDetail() != null) {
-            deleteRouteJob(Action.SUSPEND);
+
+        if (getRouteSuspendDate() != null) {
+            scheduleRoute(Action.SUSPEND, route);
         }
-        if (scheduledRouteDetails.getResumeJobDetail() != null) {
-            deleteRouteJob(Action.RESUME);
+        if (getRouteResumeDate() != null) {
+            scheduleRoute(Action.RESUME, route);
         }
     }
 
@@ -110,16 +88,16 @@ public class SimpleScheduledRoutePolicy 
         SimpleTrigger trigger = null;
         
         if (action == Action.START) {
-            trigger = new SimpleTrigger(TRIGGER_START + route.getId(), TRIGGER_GROUP + route.getId(), 
+            trigger = new SimpleTrigger(TRIGGER_START + route.getId(), TRIGGER_GROUP + route.getId(),
                 getRouteStartDate(), null, getRouteStartRepeatCount(), getRouteStartRepeatInterval());
         } else if (action == Action.STOP) {
-            trigger = new SimpleTrigger(TRIGGER_STOP + route.getId(), TRIGGER_GROUP + route.getId(), 
+            trigger = new SimpleTrigger(TRIGGER_STOP + route.getId(), TRIGGER_GROUP + route.getId(),
                 getRouteStopDate(), null, getRouteStopRepeatCount(), getRouteStopRepeatInterval());
         } else if (action == Action.SUSPEND) {
-            trigger = new SimpleTrigger(TRIGGER_SUSPEND + route.getId(), TRIGGER_GROUP + route.getId(), 
+            trigger = new SimpleTrigger(TRIGGER_SUSPEND + route.getId(), TRIGGER_GROUP + route.getId(),
                     getRouteSuspendDate(), null, getRouteSuspendRepeatCount(), getRouteSuspendRepeatInterval());
         } else if (action == Action.RESUME) {
-            trigger = new SimpleTrigger(TRIGGER_RESUME + route.getId(), TRIGGER_GROUP + route.getId(), 
+            trigger = new SimpleTrigger(TRIGGER_RESUME + route.getId(), TRIGGER_GROUP + route.getId(),
                     getRouteResumeDate(), null, getRouteResumeRepeatCount(), getRouteResumeRepeatInterval());
         }
         

Modified: camel/trunk/components/camel-quartz/src/test/java/org/apache/camel/routepolicy/quartz/CronScheduledRoutePolicyTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quartz/src/test/java/org/apache/camel/routepolicy/quartz/CronScheduledRoutePolicyTest.java?rev=1205124&r1=1205123&r2=1205124&view=diff
==============================================================================
--- camel/trunk/components/camel-quartz/src/test/java/org/apache/camel/routepolicy/quartz/CronScheduledRoutePolicyTest.java (original)
+++ camel/trunk/components/camel-quartz/src/test/java/org/apache/camel/routepolicy/quartz/CronScheduledRoutePolicyTest.java Tue Nov 22 18:44:30 2011
@@ -41,6 +41,88 @@ public class CronScheduledRoutePolicyTes
     }
 
     @Test
+    public void testScheduledStartRoutePolicyWithTwoRoutes() throws Exception {
+        MockEndpoint success1 = (MockEndpoint) context.getEndpoint("mock:success1");
+        MockEndpoint success2 = (MockEndpoint) context.getEndpoint("mock:success2");
+        success1.expectedMessageCount(1);
+        success2.expectedMessageCount(1);
+
+        context.getComponent("quartz", QuartzComponent.class).setPropertiesFile("org/apache/camel/routepolicy/quartz/myquartz.properties");
+
+        context.addRoutes(new RouteBuilder() {
+            public void configure() {
+                CronScheduledRoutePolicy policy = new CronScheduledRoutePolicy();
+                policy.setRouteStartTime("*/3 * * * * ?");
+
+                from("direct:start1")
+                    .routeId("test1")
+                    .routePolicy(policy)
+                    .to("mock:success1");
+
+                from("direct:start2")
+                    .routeId("test2")
+                    .routePolicy(policy)
+                    .to("mock:success2");
+            }
+        });
+        context.start();
+        context.stopRoute("test1", 0, TimeUnit.MILLISECONDS);
+        context.stopRoute("test2", 0, TimeUnit.MILLISECONDS);
+
+        Thread.sleep(4000);
+        assertTrue(context.getRouteStatus("test1") == ServiceStatus.Started);
+        assertTrue(context.getRouteStatus("test2") == ServiceStatus.Started);
+        template.sendBody("direct:start1", "Ready or not, Here, I come");
+        template.sendBody("direct:start2", "Ready or not, Here, I come");
+
+        success1.assertIsSatisfied();
+        success2.assertIsSatisfied();
+
+        context.getComponent("quartz", QuartzComponent.class).stop();
+    }
+
+
+    @Test
+    public void testScheduledStopRoutePolicyWithTwoRoutes() throws Exception {
+        boolean consumerStopped = false;
+
+        context.getComponent("quartz", QuartzComponent.class).setPropertiesFile("org/apache/camel/routepolicy/quartz/myquartz.properties");
+        context.addRoutes(new RouteBuilder() {
+            public void configure() {
+                CronScheduledRoutePolicy policy = new CronScheduledRoutePolicy();
+                policy.setRouteStopTime("*/3 * * * * ?");
+                policy.setRouteStopGracePeriod(0);
+                policy.setTimeUnit(TimeUnit.MILLISECONDS);
+
+                from("direct:start1")
+                    .routeId("test1")
+                    .routePolicy(policy)
+                    .to("mock:unreachable");
+
+                from("direct:start2")
+                    .routeId("test2")
+                    .routePolicy(policy)
+                    .to("mock:unreachable");
+            }
+        });
+        context.start();
+
+        Thread.sleep(4000);
+
+        assertTrue(context.getRouteStatus("test1") == ServiceStatus.Stopped);
+        assertTrue(context.getRouteStatus("test2") == ServiceStatus.Stopped);
+
+        try {
+            template.sendBody("direct:start1", "Ready or not, Here, I come");
+            template.sendBody("direct:start2", "Ready or not, Here, I come");
+        } catch (CamelExecutionException e) {
+            consumerStopped = true;
+        }
+        assertTrue(consumerStopped);
+        context.getComponent("quartz", QuartzComponent.class).stop();
+    }
+
+    @Test
     public void testScheduledStartRoutePolicy() throws Exception {
         MockEndpoint success = (MockEndpoint) context.getEndpoint("mock:success");
         success.expectedMessageCount(1);