You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2011/11/22 19:44:32 UTC
svn commit: r1205124 - in /camel/trunk/components/camel-quartz/src:
main/java/org/apache/camel/routepolicy/quartz/
test/java/org/apache/camel/routepolicy/quartz/
Author: davsclaus
Date: Tue Nov 22 18:44:30 2011
New Revision: 1205124
URL: http://svn.apache.org/viewvc?rev=1205124&view=rev
Log:
CAMEL-4692: Fixed issue with same quartz route policy when used with 2+ routes. Thanks to Bilgin for the patch.
Modified:
camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/CronScheduledRoutePolicy.java
camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/ScheduledRouteDetails.java
camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/ScheduledRoutePolicy.java
camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/SimpleScheduledRoutePolicy.java
camel/trunk/components/camel-quartz/src/test/java/org/apache/camel/routepolicy/quartz/CronScheduledRoutePolicyTest.java
Modified: camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/CronScheduledRoutePolicy.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/CronScheduledRoutePolicy.java?rev=1205124&r1=1205123&r2=1205124&view=diff
==============================================================================
--- camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/CronScheduledRoutePolicy.java (original)
+++ camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/CronScheduledRoutePolicy.java Tue Nov 22 18:44:30 2011
@@ -58,46 +58,26 @@ public class CronScheduledRoutePolicy ex
throw new IllegalArgumentException("Scheduled Route Policy for route {} has no stop/stop/suspend/resume times specified");
}
- if (scheduledRouteDetails == null) {
- scheduledRouteDetails = new ScheduledRouteDetails();
- scheduledRouteDetails.setRoute(route);
-
- if (getRouteStartTime() != null) {
- scheduleRoute(Action.START);
- }
- if (getRouteStopTime() != null) {
- scheduleRoute(Action.STOP);
- }
-
- if (getRouteSuspendTime() != null) {
- scheduleRoute(Action.SUSPEND);
- }
- if (getRouteResumeTime() != null) {
- scheduleRoute(Action.RESUME);
- }
+ registerRouteToScheduledRouteDetails(route);
+ if (getRouteStartTime() != null) {
+ scheduleRoute(Action.START, route);
}
- }
-
- @Override
- protected void doStop() throws Exception {
- if (scheduledRouteDetails.getStartJobDetail() != null) {
- deleteRouteJob(Action.START);
- }
- if (scheduledRouteDetails.getStopJobDetail() != null) {
- deleteRouteJob(Action.STOP);
+ if (getRouteStopTime() != null) {
+ scheduleRoute(Action.STOP, route);
}
- if (scheduledRouteDetails.getSuspendJobDetail() != null) {
- deleteRouteJob(Action.SUSPEND);
+
+ if (getRouteSuspendTime() != null) {
+ scheduleRoute(Action.SUSPEND, route);
}
- if (scheduledRouteDetails.getResumeJobDetail() != null) {
- deleteRouteJob(Action.RESUME);
+ if (getRouteResumeTime() != null) {
+ scheduleRoute(Action.RESUME, route);
}
}
@Override
protected Trigger createTrigger(Action action, Route route) throws Exception {
CronTrigger trigger = null;
-
+
if (action == Action.START) {
trigger = new CronTrigger(TRIGGER_START + route.getId(), TRIGGER_GROUP + route.getId(), getRouteStartTime());
} else if (action == Action.STOP) {
Modified: camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/ScheduledRouteDetails.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/ScheduledRouteDetails.java?rev=1205124&r1=1205123&r2=1205124&view=diff
==============================================================================
--- camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/ScheduledRouteDetails.java (original)
+++ camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/ScheduledRouteDetails.java Tue Nov 22 18:44:30 2011
@@ -29,7 +29,6 @@ public class ScheduledRouteDetails {
private Trigger stopTrigger;
private Trigger suspendTrigger;
private Trigger resumeTrigger;
- private Route route;
public JobDetail getStartJobDetail() {
return startJobDetail;
@@ -79,14 +78,6 @@ public class ScheduledRouteDetails {
this.suspendTrigger = suspendTrigger;
}
- public Route getRoute() {
- return route;
- }
-
- public void setRoute(Route route) {
- this.route = route;
- }
-
public void setResumeJobDetail(JobDetail resumeJobDetail) {
this.resumeJobDetail = resumeJobDetail;
}
Modified: camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/ScheduledRoutePolicy.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/ScheduledRoutePolicy.java?rev=1205124&r1=1205123&r2=1205124&view=diff
==============================================================================
--- camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/ScheduledRoutePolicy.java (original)
+++ camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/ScheduledRoutePolicy.java Tue Nov 22 18:44:30 2011
@@ -16,6 +16,8 @@
*/
package org.apache.camel.routepolicy.quartz;
+import java.util.LinkedHashMap;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.camel.Route;
@@ -30,10 +32,10 @@ import org.slf4j.LoggerFactory;
public abstract class ScheduledRoutePolicy extends RoutePolicySupport implements ScheduledRoutePolicyConstants {
private static final transient Logger LOG = LoggerFactory.getLogger(ScheduledRoutePolicy.class);
- protected ScheduledRouteDetails scheduledRouteDetails;
+ protected Map<String, ScheduledRouteDetails> scheduledRouteDetailsMap = new LinkedHashMap<String, ScheduledRouteDetails>();
private Scheduler scheduler;
private int routeStopGracePeriod;
- private TimeUnit timeUnit;
+ private TimeUnit timeUnit;
protected abstract Trigger createTrigger(Action action, Route route) throws Exception;
@@ -66,12 +68,10 @@ public abstract class ScheduledRoutePoli
}
}
- public void scheduleRoute(Action action) throws Exception {
- Route route = scheduledRouteDetails.getRoute();
-
+ public void scheduleRoute(Action action, Route route) throws Exception {
JobDetail jobDetail = createJobDetail(action, route);
Trigger trigger = createTrigger(action, route);
- updateScheduledRouteDetails(action, jobDetail, trigger);
+ updateScheduledRouteDetails(action, jobDetail, trigger, route);
loadCallbackDataIntoSchedulerContext(jobDetail, action, route);
getScheduler().scheduleJob(jobDetail, trigger);
@@ -81,27 +81,45 @@ public abstract class ScheduledRoutePoli
}
}
- public void pauseRouteTrigger(Action action) throws SchedulerException {
- String triggerName = retrieveTriggerName(action);
- String triggerGroup = retrieveTriggerGroup(action);
+ public void pauseRouteTrigger(Action action, String routeId) throws SchedulerException {
+ String triggerName = retrieveTriggerName(action, routeId);
+ String triggerGroup = retrieveTriggerGroup(action, routeId);
getScheduler().pauseTrigger(triggerName, triggerGroup);
LOG.debug("Scheduled trigger: {}.{} is paused", triggerGroup, triggerName);
}
- public void resumeRouteTrigger(Action action) throws SchedulerException {
- String triggerName = retrieveTriggerName(action);
- String triggerGroup = retrieveTriggerGroup(action);
+ public void resumeRouteTrigger(Action action, String routeId) throws SchedulerException {
+ String triggerName = retrieveTriggerName(action, routeId);
+ String triggerGroup = retrieveTriggerGroup(action, routeId);
getScheduler().resumeTrigger(triggerName, triggerGroup);
LOG.debug("Scheduled trigger: {}.{} is resumed", triggerGroup, triggerName);
}
- public void deleteRouteJob(Action action) throws SchedulerException {
- String jobDetailName = retrieveJobDetailName(action);
- String jobDetailGroup = retrieveJobDetailGroup(action);
+ @Override
+ protected void doStop() throws Exception {
+ for (ScheduledRouteDetails scheduledRouteDetails : scheduledRouteDetailsMap.values()) {
+ if (scheduledRouteDetails.getStartJobDetail() != null) {
+ deleteRouteJob(Action.START, scheduledRouteDetails);
+ }
+ if (scheduledRouteDetails.getStopJobDetail() != null) {
+ deleteRouteJob(Action.STOP, scheduledRouteDetails);
+ }
+ if (scheduledRouteDetails.getSuspendJobDetail() != null) {
+ deleteRouteJob(Action.SUSPEND, scheduledRouteDetails);
+ }
+ if (scheduledRouteDetails.getResumeJobDetail() != null) {
+ deleteRouteJob(Action.RESUME, scheduledRouteDetails);
+ }
+ }
+ }
+
+ public void deleteRouteJob(Action action, ScheduledRouteDetails scheduledRouteDetails) throws SchedulerException {
+ String jobDetailName = retrieveJobDetailName(action, scheduledRouteDetails);
+ String jobDetailGroup = retrieveJobDetailGroup(action, scheduledRouteDetails);
if (!getScheduler().isShutdown()) {
getScheduler().deleteJob(jobDetailName, jobDetailGroup);
@@ -126,7 +144,8 @@ public abstract class ScheduledRoutePoli
return jobDetail;
}
- protected void updateScheduledRouteDetails(Action action, JobDetail jobDetail, Trigger trigger) throws Exception {
+ protected void updateScheduledRouteDetails(Action action, JobDetail jobDetail, Trigger trigger, Route route) throws Exception {
+ ScheduledRouteDetails scheduledRouteDetails = getScheduledRouteDetails(route.getId());
if (action == Action.START) {
scheduledRouteDetails.setStartJobDetail(jobDetail);
scheduledRouteDetails.setStartTrigger(trigger);
@@ -141,12 +160,13 @@ public abstract class ScheduledRoutePoli
scheduledRouteDetails.setResumeTrigger(trigger);
}
}
-
+
protected void loadCallbackDataIntoSchedulerContext(JobDetail jobDetail, Action action, Route route) throws SchedulerException {
getScheduler().getContext().put(jobDetail.getName(), new ScheduledJobState(action, route));
}
- public String retrieveTriggerName(Action action) {
+ public String retrieveTriggerName(Action action, String routeId) {
+ ScheduledRouteDetails scheduledRouteDetails = getScheduledRouteDetails(routeId);
String triggerName = null;
if (action == Action.START) {
@@ -162,7 +182,8 @@ public abstract class ScheduledRoutePoli
return triggerName;
}
- public String retrieveTriggerGroup(Action action) {
+ public String retrieveTriggerGroup(Action action, String routeId) {
+ ScheduledRouteDetails scheduledRouteDetails = getScheduledRouteDetails(routeId);
String triggerGroup = null;
if (action == Action.START) {
@@ -178,7 +199,7 @@ public abstract class ScheduledRoutePoli
return triggerGroup;
}
- public String retrieveJobDetailName(Action action) {
+ public String retrieveJobDetailName(Action action, ScheduledRouteDetails scheduledRouteDetails) {
String jobDetailName = null;
if (action == Action.START) {
@@ -194,7 +215,7 @@ public abstract class ScheduledRoutePoli
return jobDetailName;
}
- public String retrieveJobDetailGroup(Action action) {
+ public String retrieveJobDetailGroup(Action action, ScheduledRouteDetails scheduledRouteDetails) {
String jobDetailGroup = null;
if (action == Action.START) {
@@ -210,12 +231,13 @@ public abstract class ScheduledRoutePoli
return jobDetailGroup;
}
- public ScheduledRouteDetails getScheduledRouteDetails() {
- return scheduledRouteDetails;
+ protected void registerRouteToScheduledRouteDetails(Route route) {
+ ScheduledRouteDetails scheduledRouteDetails = new ScheduledRouteDetails();
+ scheduledRouteDetailsMap.put(route.getId(), scheduledRouteDetails);
}
- public void setScheduledRouteDetails(ScheduledRouteDetails scheduledRouteDetails) {
- this.scheduledRouteDetails = scheduledRouteDetails;
+ protected ScheduledRouteDetails getScheduledRouteDetails(String routeId) {
+ return scheduledRouteDetailsMap.get(routeId);
}
public void setScheduler(Scheduler scheduler) {
Modified: camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/SimpleScheduledRoutePolicy.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/SimpleScheduledRoutePolicy.java?rev=1205124&r1=1205123&r2=1205124&view=diff
==============================================================================
--- camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/SimpleScheduledRoutePolicy.java (original)
+++ camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/SimpleScheduledRoutePolicy.java Tue Nov 22 18:44:30 2011
@@ -24,8 +24,6 @@ import org.apache.camel.component.quartz
import org.apache.camel.util.ObjectHelper;
import org.quartz.SimpleTrigger;
import org.quartz.Trigger;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
public class SimpleScheduledRoutePolicy extends ScheduledRoutePolicy {
private Date routeStartDate;
@@ -52,7 +50,7 @@ public class SimpleScheduledRoutePolicy
protected void doOnInit(Route route) throws Exception {
QuartzComponent quartz = route.getRouteContext().getCamelContext().getComponent("quartz", QuartzComponent.class);
setScheduler(quartz.getScheduler());
-
+
// Important: do not start scheduler as QuartzComponent does that automatic
// when CamelContext has been fully initialized and started
@@ -69,39 +67,19 @@ public class SimpleScheduledRoutePolicy
throw new IllegalArgumentException("Scheduled Route Policy for route {} has no stop/stop/suspend/resume times specified");
}
- if (scheduledRouteDetails == null) {
- scheduledRouteDetails = new ScheduledRouteDetails();
- scheduledRouteDetails.setRoute(route);
-
- if (getRouteStartDate() != null) {
- scheduleRoute(Action.START);
- }
- if (getRouteStopDate() != null) {
- scheduleRoute(Action.STOP);
- }
-
- if (getRouteSuspendDate() != null) {
- scheduleRoute(Action.SUSPEND);
- }
- if (getRouteResumeDate() != null) {
- scheduleRoute(Action.RESUME);
- }
- }
- }
-
- @Override
- protected void doStop() throws Exception {
- if (scheduledRouteDetails.getStartJobDetail() != null) {
- deleteRouteJob(Action.START);
+ registerRouteToScheduledRouteDetails(route);
+ if (getRouteStartDate() != null) {
+ scheduleRoute(Action.START, route);
}
- if (scheduledRouteDetails.getStopJobDetail() != null) {
- deleteRouteJob(Action.STOP);
+ if (getRouteStopDate() != null) {
+ scheduleRoute(Action.STOP, route);
}
- if (scheduledRouteDetails.getSuspendJobDetail() != null) {
- deleteRouteJob(Action.SUSPEND);
+
+ if (getRouteSuspendDate() != null) {
+ scheduleRoute(Action.SUSPEND, route);
}
- if (scheduledRouteDetails.getResumeJobDetail() != null) {
- deleteRouteJob(Action.RESUME);
+ if (getRouteResumeDate() != null) {
+ scheduleRoute(Action.RESUME, route);
}
}
@@ -110,16 +88,16 @@ public class SimpleScheduledRoutePolicy
SimpleTrigger trigger = null;
if (action == Action.START) {
- trigger = new SimpleTrigger(TRIGGER_START + route.getId(), TRIGGER_GROUP + route.getId(),
+ trigger = new SimpleTrigger(TRIGGER_START + route.getId(), TRIGGER_GROUP + route.getId(),
getRouteStartDate(), null, getRouteStartRepeatCount(), getRouteStartRepeatInterval());
} else if (action == Action.STOP) {
- trigger = new SimpleTrigger(TRIGGER_STOP + route.getId(), TRIGGER_GROUP + route.getId(),
+ trigger = new SimpleTrigger(TRIGGER_STOP + route.getId(), TRIGGER_GROUP + route.getId(),
getRouteStopDate(), null, getRouteStopRepeatCount(), getRouteStopRepeatInterval());
} else if (action == Action.SUSPEND) {
- trigger = new SimpleTrigger(TRIGGER_SUSPEND + route.getId(), TRIGGER_GROUP + route.getId(),
+ trigger = new SimpleTrigger(TRIGGER_SUSPEND + route.getId(), TRIGGER_GROUP + route.getId(),
getRouteSuspendDate(), null, getRouteSuspendRepeatCount(), getRouteSuspendRepeatInterval());
} else if (action == Action.RESUME) {
- trigger = new SimpleTrigger(TRIGGER_RESUME + route.getId(), TRIGGER_GROUP + route.getId(),
+ trigger = new SimpleTrigger(TRIGGER_RESUME + route.getId(), TRIGGER_GROUP + route.getId(),
getRouteResumeDate(), null, getRouteResumeRepeatCount(), getRouteResumeRepeatInterval());
}
Modified: camel/trunk/components/camel-quartz/src/test/java/org/apache/camel/routepolicy/quartz/CronScheduledRoutePolicyTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quartz/src/test/java/org/apache/camel/routepolicy/quartz/CronScheduledRoutePolicyTest.java?rev=1205124&r1=1205123&r2=1205124&view=diff
==============================================================================
--- camel/trunk/components/camel-quartz/src/test/java/org/apache/camel/routepolicy/quartz/CronScheduledRoutePolicyTest.java (original)
+++ camel/trunk/components/camel-quartz/src/test/java/org/apache/camel/routepolicy/quartz/CronScheduledRoutePolicyTest.java Tue Nov 22 18:44:30 2011
@@ -41,6 +41,88 @@ public class CronScheduledRoutePolicyTes
}
@Test
+ public void testScheduledStartRoutePolicyWithTwoRoutes() throws Exception {
+ MockEndpoint success1 = (MockEndpoint) context.getEndpoint("mock:success1");
+ MockEndpoint success2 = (MockEndpoint) context.getEndpoint("mock:success2");
+ success1.expectedMessageCount(1);
+ success2.expectedMessageCount(1);
+
+ context.getComponent("quartz", QuartzComponent.class).setPropertiesFile("org/apache/camel/routepolicy/quartz/myquartz.properties");
+
+ context.addRoutes(new RouteBuilder() {
+ public void configure() {
+ CronScheduledRoutePolicy policy = new CronScheduledRoutePolicy();
+ policy.setRouteStartTime("*/3 * * * * ?");
+
+ from("direct:start1")
+ .routeId("test1")
+ .routePolicy(policy)
+ .to("mock:success1");
+
+ from("direct:start2")
+ .routeId("test2")
+ .routePolicy(policy)
+ .to("mock:success2");
+ }
+ });
+ context.start();
+ context.stopRoute("test1", 0, TimeUnit.MILLISECONDS);
+ context.stopRoute("test2", 0, TimeUnit.MILLISECONDS);
+
+ Thread.sleep(4000);
+ assertTrue(context.getRouteStatus("test1") == ServiceStatus.Started);
+ assertTrue(context.getRouteStatus("test2") == ServiceStatus.Started);
+ template.sendBody("direct:start1", "Ready or not, Here, I come");
+ template.sendBody("direct:start2", "Ready or not, Here, I come");
+
+ success1.assertIsSatisfied();
+ success2.assertIsSatisfied();
+
+ context.getComponent("quartz", QuartzComponent.class).stop();
+ }
+
+
+ @Test
+ public void testScheduledStopRoutePolicyWithTwoRoutes() throws Exception {
+ boolean consumerStopped = false;
+
+ context.getComponent("quartz", QuartzComponent.class).setPropertiesFile("org/apache/camel/routepolicy/quartz/myquartz.properties");
+ context.addRoutes(new RouteBuilder() {
+ public void configure() {
+ CronScheduledRoutePolicy policy = new CronScheduledRoutePolicy();
+ policy.setRouteStopTime("*/3 * * * * ?");
+ policy.setRouteStopGracePeriod(0);
+ policy.setTimeUnit(TimeUnit.MILLISECONDS);
+
+ from("direct:start1")
+ .routeId("test1")
+ .routePolicy(policy)
+ .to("mock:unreachable");
+
+ from("direct:start2")
+ .routeId("test2")
+ .routePolicy(policy)
+ .to("mock:unreachable");
+ }
+ });
+ context.start();
+
+ Thread.sleep(4000);
+
+ assertTrue(context.getRouteStatus("test1") == ServiceStatus.Stopped);
+ assertTrue(context.getRouteStatus("test2") == ServiceStatus.Stopped);
+
+ try {
+ template.sendBody("direct:start1", "Ready or not, Here, I come");
+ template.sendBody("direct:start2", "Ready or not, Here, I come");
+ } catch (CamelExecutionException e) {
+ consumerStopped = true;
+ }
+ assertTrue(consumerStopped);
+ context.getComponent("quartz", QuartzComponent.class).stop();
+ }
+
+ @Test
public void testScheduledStartRoutePolicy() throws Exception {
MockEndpoint success = (MockEndpoint) context.getEndpoint("mock:success");
success.expectedMessageCount(1);