You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ak...@apache.org on 2010/09/24 20:55:31 UTC
svn commit: r1001028 - in /camel/trunk/components/camel-quartz/src:
main/java/org/apache/camel/routepolicy/
main/java/org/apache/camel/routepolicy/quartz/
test/java/org/apache/camel/routepolicy/
test/java/org/apache/camel/routepolicy/quartz/
Author: akarpe
Date: Fri Sep 24 18:55:30 2010
New Revision: 1001028
URL: http://svn.apache.org/viewvc?rev=1001028&view=rev
Log:
CAMEL-2936 Provided a ScheduledRoutePolicy to make it easy to define when routes should be active, stopped or suspended
Added:
camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/
camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/
camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/CronScheduledRoutePolicy.java (with props)
camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/ScheduledJob.java (with props)
camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/ScheduledRouteDetails.java (with props)
camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/ScheduledRoutePolicy.java (with props)
camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/ScheduledRoutePolicyConstants.java (with props)
camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/SimpleScheduledRoutePolicy.java (with props)
camel/trunk/components/camel-quartz/src/test/java/org/apache/camel/routepolicy/
camel/trunk/components/camel-quartz/src/test/java/org/apache/camel/routepolicy/quartz/
camel/trunk/components/camel-quartz/src/test/java/org/apache/camel/routepolicy/quartz/CronScheduledRoutePolicyTest.java (with props)
camel/trunk/components/camel-quartz/src/test/java/org/apache/camel/routepolicy/quartz/SimpleScheduledRoutePolicyTest.java (with props)
Added: camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/CronScheduledRoutePolicy.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/CronScheduledRoutePolicy.java?rev=1001028&view=auto
==============================================================================
--- camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/CronScheduledRoutePolicy.java (added)
+++ camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/CronScheduledRoutePolicy.java Fri Sep 24 18:55:30 2010
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.routepolicy.quartz;
+
+import java.util.Properties;
+
+import org.apache.camel.Route;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.quartz.CronTrigger;
+import org.quartz.Trigger;
+
+/**
+ * A {@link ScheduledRoutePolicy} whose start, stop, suspend and resume schedules are
+ * given as cron expressions that are handed to Quartz {@link CronTrigger}s.
+ * <p/>
+ * Every action whose time is non-null gets its own job and trigger when
+ * {@link #onInit(Route)} runs; actions whose time is <tt>null</tt> are not scheduled.
+ */
+public class CronScheduledRoutePolicy extends ScheduledRoutePolicy implements ScheduledRoutePolicyConstants {
+    private static final transient Log LOG = LogFactory.getLog(CronScheduledRoutePolicy.class);
+    private String routeStartTime;
+    private String routeStopTime;
+    private String routeSuspendTime;
+    private String routeResumeTime;
+
+    /**
+     * Creates a policy backed by the scheduler the no-argument super constructor sets up.
+     */
+    public CronScheduledRoutePolicy() {
+        super();
+    }
+
+    /**
+     * Creates a policy whose scheduler is configured from a properties resource.
+     *
+     * @param propertiesFile name of the properties resource passed on to the super class
+     */
+    public CronScheduledRoutePolicy(String propertiesFile) {
+        super(propertiesFile);
+    }
+
+    /**
+     * Creates a policy whose scheduler is configured from the given properties.
+     *
+     * @param properties the scheduler properties passed on to the super class
+     */
+    public CronScheduledRoutePolicy(Properties properties) {
+        super(properties);
+    }
+
+    /**
+     * Schedules the configured actions for the route and starts the scheduler.
+     * <p/>
+     * If none of the four times is set a warning is logged and nothing is scheduled.
+     * Jobs are only registered the first time, i.e. while no {@link ScheduledRouteDetails}
+     * exist yet. Any exception is passed to <tt>handleException</tt> instead of being thrown.
+     *
+     * @param route the route this policy is attached to
+     */
+    public void onInit(Route route) {
+        try {
+            if ((getRouteStartTime() == null) && (getRouteStopTime() == null) && (getRouteSuspendTime() == null) && (getRouteResumeTime() == null)) {
+                if (LOG.isWarnEnabled()) {
+                    LOG.warn("Scheduled Route Policy for route " + route.getId() + " is not set since the no start, stop and/or suspend times are specified");
+                }
+                return;
+            }
+
+            if (scheduledRouteDetails == null) {
+                scheduledRouteDetails = new ScheduledRouteDetails();
+                scheduledRouteDetails.setRoute(route);
+
+                if (getRouteStartTime() != null) {
+                    scheduleRoute(Action.START);
+                }
+
+                if (getRouteStopTime() != null) {
+                    scheduleRoute(Action.STOP);
+                }
+
+                if (getRouteSuspendTime() != null) {
+                    scheduleRoute(Action.SUSPEND);
+                }
+                if (getRouteResumeTime() != null) {
+                    scheduleRoute(Action.RESUME);
+                }
+            }
+
+            getScheduler().start();
+        } catch (Exception e) {
+            handleException(e);
+        }
+    }
+
+    /**
+     * Deletes every job that was registered for the route.
+     *
+     * @throws Exception if the scheduler fails to delete a job
+     */
+    @Override
+    protected void doStop() throws Exception {
+        // The details stay null when onInit() returned early because no times were
+        // configured; in that case nothing was registered and there is nothing to delete.
+        if (scheduledRouteDetails == null) {
+            return;
+        }
+
+        if (scheduledRouteDetails.getStartJobDetail() != null) {
+            deleteRouteJob(Action.START);
+        }
+        if (scheduledRouteDetails.getStopJobDetail() != null) {
+            deleteRouteJob(Action.STOP);
+        }
+        if (scheduledRouteDetails.getSuspendJobDetail() != null) {
+            deleteRouteJob(Action.SUSPEND);
+        }
+        if (scheduledRouteDetails.getResumeJobDetail() != null) {
+            deleteRouteJob(Action.RESUME);
+        }
+    }
+
+    /**
+     * Builds the cron trigger for the given action. The trigger name is the action's
+     * <tt>TRIGGER_*</tt> prefix plus the route id and the group is {@link #TRIGGER_GROUP}
+     * plus the route id.
+     *
+     * @param action the action to create the trigger for
+     * @param route  the route whose id is used in the trigger name and group
+     * @return the trigger, or <tt>null</tt> if the action is none of the four known actions
+     * @throws Exception if the trigger cannot be created from the configured time
+     * @see org.apache.camel.routepolicy.quartz.ScheduledRoutePolicy#createTrigger(org.apache.camel.routepolicy.quartz.ScheduledRoutePolicyConstants.Action, org.apache.camel.Route)
+     */
+    @Override
+    protected Trigger createTrigger(Action action, Route route) throws Exception {
+        CronTrigger trigger = null;
+
+        if (action == Action.START) {
+            trigger = new CronTrigger(TRIGGER_START + route.getId(), TRIGGER_GROUP + route.getId(), getRouteStartTime());
+        } else if (action == Action.STOP) {
+            trigger = new CronTrigger(TRIGGER_STOP + route.getId(), TRIGGER_GROUP + route.getId(), getRouteStopTime());
+        } else if (action == Action.SUSPEND) {
+            trigger = new CronTrigger(TRIGGER_SUSPEND + route.getId(), TRIGGER_GROUP + route.getId(), getRouteSuspendTime());
+        } else if (action == Action.RESUME) {
+            trigger = new CronTrigger(TRIGGER_RESUME + route.getId(), TRIGGER_GROUP + route.getId(), getRouteResumeTime());
+        }
+
+        return trigger;
+    }
+
+    /**
+     * @param routeStartTime cron expression for starting the route, or <tt>null</tt> for none
+     */
+    public void setRouteStartTime(String routeStartTime) {
+        this.routeStartTime = routeStartTime;
+    }
+
+    public String getRouteStartTime() {
+        return routeStartTime;
+    }
+
+    /**
+     * @param routeStopTime cron expression for stopping the route, or <tt>null</tt> for none
+     */
+    public void setRouteStopTime(String routeStopTime) {
+        this.routeStopTime = routeStopTime;
+    }
+
+    public String getRouteStopTime() {
+        return routeStopTime;
+    }
+
+    /**
+     * @param routeSuspendTime cron expression for suspending the route, or <tt>null</tt> for none
+     */
+    public void setRouteSuspendTime(String routeSuspendTime) {
+        this.routeSuspendTime = routeSuspendTime;
+    }
+
+    public String getRouteSuspendTime() {
+        return routeSuspendTime;
+    }
+
+    /**
+     * @param routeResumeTime cron expression for resuming the route, or <tt>null</tt> for none
+     */
+    public void setRouteResumeTime(String routeResumeTime) {
+        this.routeResumeTime = routeResumeTime;
+    }
+
+    public String getRouteResumeTime() {
+        return routeResumeTime;
+    }
+
+}
Propchange: camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/CronScheduledRoutePolicy.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/ScheduledJob.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/ScheduledJob.java?rev=1001028&view=auto
==============================================================================
--- camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/ScheduledJob.java (added)
+++ camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/ScheduledJob.java Fri Sep 24 18:55:30 2010
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.routepolicy.quartz;
+
+import java.io.Serializable;
+
+import org.apache.camel.Route;
+import org.quartz.Job;
+import org.quartz.JobExecutionContext;
+import org.quartz.JobExecutionException;
+import org.quartz.SchedulerContext;
+import org.quartz.SchedulerException;
+
+/**
+ * Quartz {@link Job} fired by the triggers that a {@link ScheduledRoutePolicy} registers.
+ * <p/>
+ * On execution it reads the {@link Action} and {@link Route} stored in the scheduler
+ * context under {@link #SCHEDULED_ACTION} and {@link #SCHEDULED_ROUTE} and passes them to
+ * the route's policy through {@link ScheduledRoutePolicy#onJobExecute(Action, Route)}.
+ */
+public class ScheduledJob implements Job, Serializable, ScheduledRoutePolicyConstants {
+    private static final long serialVersionUID = 26L;
+
+    /**
+     * Looks up the stored action and route and lets the route's policy perform the action.
+     *
+     * @param jobExecutionContext the Quartz context of the job being fired
+     * @throws JobExecutionException if the scheduler context cannot be obtained, no route is
+     *                               stored in it, or the policy fails to perform the action;
+     *                               the underlying exception is attached as the cause
+     * @see org.quartz.Job#execute(org.quartz.JobExecutionContext)
+     */
+    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
+        SchedulerContext schedulerContext;
+        try {
+            schedulerContext = jobExecutionContext.getScheduler().getContext();
+        } catch (SchedulerException e) {
+            // keep the scheduler failure as the cause so it is not lost
+            throw new JobExecutionException("Failed to obtain scheduler context for job " + jobExecutionContext.getJobDetail().getName(), e);
+        }
+
+        Action storedAction = (Action) schedulerContext.get(SCHEDULED_ACTION);
+        // held in a local rather than a field: the value only matters for this execution
+        Route storedRoute = (Route) schedulerContext.get(SCHEDULED_ROUTE);
+        if (storedRoute == null) {
+            // fail with a descriptive error instead of a bare NullPointerException below
+            throw new JobExecutionException("No route found in scheduler context under key " + SCHEDULED_ROUTE
+                + " for job " + jobExecutionContext.getJobDetail().getName());
+        }
+
+        ScheduledRoutePolicy policy = (ScheduledRoutePolicy) storedRoute.getRouteContext().getRoutePolicy();
+        try {
+            policy.onJobExecute(storedAction, storedRoute);
+        } catch (Exception e) {
+            // keep the policy failure as the cause so it is not lost
+            throw new JobExecutionException("Failed to execute Scheduled Job for route " + storedRoute.getId() + " with trigger name: " + jobExecutionContext.getTrigger().getFullName(), e);
+        }
+    }
+
+}
Propchange: camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/ScheduledJob.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/ScheduledRouteDetails.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/ScheduledRouteDetails.java?rev=1001028&view=auto
==============================================================================
--- camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/ScheduledRouteDetails.java (added)
+++ camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/ScheduledRouteDetails.java Fri Sep 24 18:55:30 2010
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.routepolicy.quartz;
+
+import org.apache.camel.Route;
+import org.quartz.JobDetail;
+import org.quartz.Trigger;
+
+/**
+ * Holder for the route a policy is attached to together with the Quartz
+ * {@link JobDetail} and {@link Trigger} registered for each of the four actions
+ * (start, stop, suspend, resume). All values are simply stored and returned as given.
+ */
+public class ScheduledRouteDetails {
+    private Route route;
+
+    // start
+    private JobDetail startJobDetail;
+    private Trigger startTrigger;
+
+    // stop
+    private JobDetail stopJobDetail;
+    private Trigger stopTrigger;
+
+    // suspend
+    private JobDetail suspendJobDetail;
+    private Trigger suspendTrigger;
+
+    // resume
+    private JobDetail resumeJobDetail;
+    private Trigger resumeTrigger;
+
+    /** @return the route these details belong to */
+    public Route getRoute() {
+        return this.route;
+    }
+
+    public void setRoute(Route route) {
+        this.route = route;
+    }
+
+    /** @return the job registered for the start action, or <tt>null</tt> if none was set */
+    public JobDetail getStartJobDetail() {
+        return this.startJobDetail;
+    }
+
+    public void setStartJobDetail(JobDetail startJobDetail) {
+        this.startJobDetail = startJobDetail;
+    }
+
+    /** @return the trigger registered for the start action, or <tt>null</tt> if none was set */
+    public Trigger getStartTrigger() {
+        return this.startTrigger;
+    }
+
+    public void setStartTrigger(Trigger startTrigger) {
+        this.startTrigger = startTrigger;
+    }
+
+    /** @return the job registered for the stop action, or <tt>null</tt> if none was set */
+    public JobDetail getStopJobDetail() {
+        return this.stopJobDetail;
+    }
+
+    public void setStopJobDetail(JobDetail stopJobDetail) {
+        this.stopJobDetail = stopJobDetail;
+    }
+
+    /** @return the trigger registered for the stop action, or <tt>null</tt> if none was set */
+    public Trigger getStopTrigger() {
+        return this.stopTrigger;
+    }
+
+    public void setStopTrigger(Trigger stopTrigger) {
+        this.stopTrigger = stopTrigger;
+    }
+
+    /** @return the job registered for the suspend action, or <tt>null</tt> if none was set */
+    public JobDetail getSuspendJobDetail() {
+        return this.suspendJobDetail;
+    }
+
+    public void setSuspendJobDetail(JobDetail suspendJobDetail) {
+        this.suspendJobDetail = suspendJobDetail;
+    }
+
+    /** @return the trigger registered for the suspend action, or <tt>null</tt> if none was set */
+    public Trigger getSuspendTrigger() {
+        return this.suspendTrigger;
+    }
+
+    public void setSuspendTrigger(Trigger suspendTrigger) {
+        this.suspendTrigger = suspendTrigger;
+    }
+
+    /** @return the job registered for the resume action, or <tt>null</tt> if none was set */
+    public JobDetail getResumeJobDetail() {
+        return this.resumeJobDetail;
+    }
+
+    public void setResumeJobDetail(JobDetail resumeJobDetail) {
+        this.resumeJobDetail = resumeJobDetail;
+    }
+
+    /** @return the trigger registered for the resume action, or <tt>null</tt> if none was set */
+    public Trigger getResumeTrigger() {
+        return this.resumeTrigger;
+    }
+
+    public void setResumeTrigger(Trigger resumeTrigger) {
+        this.resumeTrigger = resumeTrigger;
+    }
+
+}
Propchange: camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/ScheduledRouteDetails.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/ScheduledRoutePolicy.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/ScheduledRoutePolicy.java?rev=1001028&view=auto
==============================================================================
--- camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/ScheduledRoutePolicy.java (added)
+++ camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/ScheduledRoutePolicy.java Fri Sep 24 18:55:30 2010
@@ -0,0 +1,292 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.routepolicy.quartz;
+
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.Route;
+import org.apache.camel.ServiceStatus;
+import org.apache.camel.impl.RoutePolicySupport;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.quartz.JobDetail;
+import org.quartz.Scheduler;
+import org.quartz.SchedulerException;
+import org.quartz.Trigger;
+import org.quartz.impl.StdSchedulerFactory;
+
+/**
+ * Base class for route policies that start, stop, suspend and resume a route on a
+ * schedule driven by a Quartz {@link Scheduler}.
+ * <p/>
+ * Subclasses supply the {@link Trigger} for each {@link Action} through
+ * {@link #createTrigger(Action, Route)}. This class creates the matching {@link JobDetail},
+ * registers both with the scheduler and performs the action when the job calls back into
+ * {@link #onJobExecute(Action, Route)}.
+ * <p/>
+ * All constructors default the route stop grace period to 10000 with a time unit of
+ * {@link TimeUnit#MILLISECONDS}.
+ */
+public abstract class ScheduledRoutePolicy extends RoutePolicySupport implements ScheduledRoutePolicyConstants {
+    private static final transient Log LOG = LogFactory.getLog(ScheduledRoutePolicy.class);
+    protected ScheduledRouteDetails scheduledRouteDetails;
+    private Properties properties;
+    private StdSchedulerFactory schedulerFactory;
+    private Scheduler scheduler;
+    private int routeStopGracePeriod;
+    private TimeUnit timeUnit;
+
+    /**
+     * Creates a policy with an empty set of properties, which makes the scheduler factory
+     * use its own default configuration.
+     */
+    public ScheduledRoutePolicy() {
+        this(new Properties());
+    }
+
+    /**
+     * Creates a policy whose scheduler is configured from a properties resource.
+     * Any failure is passed to <tt>handleException</tt> instead of being thrown.
+     *
+     * @param propertiesFile name of the resource to load the scheduler properties from
+     */
+    public ScheduledRoutePolicy(String propertiesFile) {
+        try {
+            properties = new Properties();
+            java.io.InputStream stream = ObjectHelper.loadResourceAsStream(propertiesFile);
+            if (stream == null) {
+                // report the missing resource by name rather than as a NullPointerException
+                throw new IllegalArgumentException("Cannot load scheduler properties from resource " + propertiesFile);
+            }
+            try {
+                properties.load(stream);
+            } finally {
+                // Properties.load() does not close the stream, so release it here
+                stream.close();
+            }
+            schedulerFactory = new StdSchedulerFactory(properties);
+            setScheduler(schedulerFactory.getScheduler());
+        } catch (Exception e) {
+            handleException(e);
+        }
+        setRouteStopGracePeriod(10000);
+        setTimeUnit(TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Creates a policy whose scheduler is configured from the given properties. Empty
+     * properties make the scheduler factory use its own default configuration.
+     * Any failure is passed to <tt>handleException</tt> instead of being thrown.
+     *
+     * @param properties the scheduler properties
+     */
+    public ScheduledRoutePolicy(Properties properties) {
+        // remember the configuration, as the String based constructor does
+        this.properties = properties;
+        try {
+            if (properties.isEmpty()) {
+                schedulerFactory = new StdSchedulerFactory();
+            } else {
+                schedulerFactory = new StdSchedulerFactory(properties);
+            }
+            setScheduler(schedulerFactory.getScheduler());
+        } catch (Exception e) {
+            handleException(e);
+        }
+        setRouteStopGracePeriod(10000);
+        setTimeUnit(TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Creates the trigger that fires the given action for the route.
+     *
+     * @param action the action the trigger is for
+     * @param route  the route the action applies to
+     * @return the trigger to register with the scheduler
+     * @throws Exception if the trigger cannot be created
+     */
+    protected abstract Trigger createTrigger(Action action, Route route) throws Exception;
+
+    /**
+     * Performs the scheduled action on the route, depending on the route's current status:
+     * <ul>
+     * <li>START: starts a stopped route, or starts the consumer of a suspended route</li>
+     * <li>STOP: stops a started or suspended route using the configured grace period</li>
+     * <li>SUSPEND: stops the consumer of a started route, otherwise logs a warning</li>
+     * <li>RESUME: starts the consumer of a started route, otherwise logs a warning</li>
+     * </ul>
+     *
+     * @param action the action to perform
+     * @param route  the route to perform it on
+     * @throws Exception if the route or its consumer cannot be started or stopped
+     */
+    protected void onJobExecute(Action action, Route route) throws Exception {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Scheduled Event notification received. Performing requested operation " + action + " for route " + route.getId());
+        }
+        ServiceStatus routeStatus = route.getRouteContext().getCamelContext().getRouteStatus(route.getId());
+        if (action == Action.START) {
+            if (routeStatus == ServiceStatus.Stopped) {
+                startRoute(route);
+            } else if (routeStatus == ServiceStatus.Suspended) {
+                startConsumer(route.getConsumer());
+            }
+        } else if (action == Action.STOP) {
+            if ((routeStatus == ServiceStatus.Started) || (routeStatus == ServiceStatus.Suspended)) {
+                stopRoute(route, getRouteStopGracePeriod(), getTimeUnit());
+            }
+        } else if (action == Action.SUSPEND) {
+            if (routeStatus == ServiceStatus.Started) {
+                stopConsumer(route.getConsumer());
+            } else {
+                if (LOG.isWarnEnabled()) {
+                    LOG.warn("Route is not in a started state and cannot be suspended. The current route state is " + routeStatus);
+                }
+            }
+        } else if (action == Action.RESUME) {
+            if (routeStatus == ServiceStatus.Started) {
+                startConsumer(route.getConsumer());
+            } else {
+                if (LOG.isWarnEnabled()) {
+                    LOG.warn("Route is not in a started state and cannot be resumed. The current route state is " + routeStatus);
+                }
+            }
+        }
+    }
+
+    /**
+     * Creates the job and trigger for the action, records them in the
+     * {@link ScheduledRouteDetails} and registers them with the scheduler.
+     *
+     * @param action the action to schedule for the route held in the route details
+     * @throws Exception if the job or trigger cannot be created or scheduled
+     */
+    public void scheduleRoute(Action action) throws Exception {
+        Route route = scheduledRouteDetails.getRoute();
+
+        JobDetail jobDetail = createJobDetail(action, route);
+        Trigger trigger = createTrigger(action, route);
+        updateScheduledRouteDetails(action, jobDetail, trigger);
+
+        loadCallbackDataIntoSchedulerContext(action, route);
+        getScheduler().scheduleJob(jobDetail, trigger);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Scheduled Trigger: " + trigger.getFullName() + " is operational");
+        }
+    }
+
+    /**
+     * Pauses the trigger recorded for the given action.
+     *
+     * @param action the action whose trigger is paused
+     * @throws SchedulerException if the scheduler fails to pause the trigger
+     */
+    public void pauseRouteTrigger(Action action) throws SchedulerException {
+        String triggerName = retrieveTriggerName(action);
+        String triggerGroup = retrieveTriggerGroup(action);
+
+        getScheduler().pauseTrigger(triggerName, triggerGroup);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Scheduled Trigger: " + triggerGroup + "." + triggerName + " is paused");
+        }
+    }
+
+    /**
+     * Resumes the trigger recorded for the given action.
+     *
+     * @param action the action whose trigger is resumed
+     * @throws SchedulerException if the scheduler fails to resume the trigger
+     */
+    public void resumeRouteTrigger(Action action) throws SchedulerException {
+        String triggerName = retrieveTriggerName(action);
+        String triggerGroup = retrieveTriggerGroup(action);
+
+        getScheduler().resumeTrigger(triggerName, triggerGroup);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Scheduled Trigger: " + triggerGroup + "." + triggerName + " has been resumed");
+        }
+    }
+
+    /**
+     * Deletes the job recorded for the given action from the scheduler.
+     *
+     * @param action the action whose job is deleted
+     * @throws SchedulerException if the scheduler fails to delete the job
+     */
+    public void deleteRouteJob(Action action) throws SchedulerException {
+        String jobDetailName = retrieveJobDetailName(action);
+        String jobDetailGroup = retrieveJobDetailGroup(action);
+
+        getScheduler().deleteJob(jobDetailName, jobDetailGroup);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Scheduled Job: " + jobDetailGroup + "." + jobDetailName + " has been deleted");
+        }
+    }
+
+    /**
+     * Creates the {@link ScheduledJob} job detail for the action. The job name is the
+     * action's <tt>JOB_*</tt> prefix plus the route id and the group is {@link #JOB_GROUP}
+     * plus the route id.
+     *
+     * @param action the action the job is for
+     * @param route  the route whose id is used in the job name and group
+     * @return the job detail, or <tt>null</tt> if the action is none of the four known actions
+     * @throws Exception declared for subclasses; not thrown by this implementation
+     */
+    protected JobDetail createJobDetail(Action action, Route route) throws Exception {
+        JobDetail jobDetail = null;
+
+        if (action == Action.START) {
+            jobDetail = new JobDetail(JOB_START + route.getId(), JOB_GROUP + route.getId(), ScheduledJob.class);
+        } else if (action == Action.STOP) {
+            jobDetail = new JobDetail(JOB_STOP + route.getId(), JOB_GROUP + route.getId(), ScheduledJob.class);
+        } else if (action == Action.SUSPEND) {
+            jobDetail = new JobDetail(JOB_SUSPEND + route.getId(), JOB_GROUP + route.getId(), ScheduledJob.class);
+        } else if (action == Action.RESUME) {
+            jobDetail = new JobDetail(JOB_RESUME + route.getId(), JOB_GROUP + route.getId(), ScheduledJob.class);
+        }
+
+        return jobDetail;
+    }
+
+    /**
+     * Records the job detail and trigger for the action in the {@link ScheduledRouteDetails}.
+     *
+     * @param action    the action the job and trigger belong to
+     * @param jobDetail the job detail to record
+     * @param trigger   the trigger to record
+     * @throws Exception declared for subclasses; not thrown by this implementation
+     */
+    protected void updateScheduledRouteDetails(Action action, JobDetail jobDetail, Trigger trigger) throws Exception {
+        if (action == Action.START) {
+            scheduledRouteDetails.setStartJobDetail(jobDetail);
+            scheduledRouteDetails.setStartTrigger(trigger);
+        } else if (action == Action.STOP) {
+            scheduledRouteDetails.setStopJobDetail(jobDetail);
+            scheduledRouteDetails.setStopTrigger(trigger);
+        } else if (action == Action.SUSPEND) {
+            scheduledRouteDetails.setSuspendJobDetail(jobDetail);
+            scheduledRouteDetails.setSuspendTrigger(trigger);
+        } else if (action == Action.RESUME) {
+            scheduledRouteDetails.setResumeJobDetail(jobDetail);
+            scheduledRouteDetails.setResumeTrigger(trigger);
+        }
+    }
+
+    /**
+     * Stores the action and route in the scheduler context under {@link #SCHEDULED_ACTION}
+     * and {@link #SCHEDULED_ROUTE}.
+     * <p/>
+     * NOTE(review): both keys are fixed, so every call replaces the values stored by the
+     * previous call. When several actions are scheduled, a fired job will therefore read the
+     * action that was scheduled last - confirm whether per-job storage is needed.
+     *
+     * @param action the action to store
+     * @param route  the route to store
+     * @throws SchedulerException if the scheduler context cannot be obtained
+     */
+    protected void loadCallbackDataIntoSchedulerContext(Action action, Route route) throws SchedulerException {
+        getScheduler().getContext().put(SCHEDULED_ACTION, action);
+        getScheduler().getContext().put(SCHEDULED_ROUTE, route);
+    }
+
+    /**
+     * @param action the action to look up
+     * @return the name of the trigger recorded for the action, or <tt>null</tt> if the action
+     *         is none of the four known actions
+     */
+    public String retrieveTriggerName(Action action) {
+        String triggerName = null;
+
+        if (action == Action.START) {
+            triggerName = scheduledRouteDetails.getStartTrigger().getName();
+        } else if (action == Action.STOP) {
+            triggerName = scheduledRouteDetails.getStopTrigger().getName();
+        } else if (action == Action.SUSPEND) {
+            triggerName = scheduledRouteDetails.getSuspendTrigger().getName();
+        } else if (action == Action.RESUME) {
+            triggerName = scheduledRouteDetails.getResumeTrigger().getName();
+        }
+
+        return triggerName;
+    }
+
+    /**
+     * @param action the action to look up
+     * @return the group of the trigger recorded for the action, or <tt>null</tt> if the action
+     *         is none of the four known actions
+     */
+    public String retrieveTriggerGroup(Action action) {
+        String triggerGroup = null;
+
+        if (action == Action.START) {
+            triggerGroup = scheduledRouteDetails.getStartTrigger().getGroup();
+        } else if (action == Action.STOP) {
+            triggerGroup = scheduledRouteDetails.getStopTrigger().getGroup();
+        } else if (action == Action.SUSPEND) {
+            triggerGroup = scheduledRouteDetails.getSuspendTrigger().getGroup();
+        } else if (action == Action.RESUME) {
+            triggerGroup = scheduledRouteDetails.getResumeTrigger().getGroup();
+        }
+
+        return triggerGroup;
+    }
+
+    /**
+     * @param action the action to look up
+     * @return the name of the job recorded for the action, or <tt>null</tt> if the action
+     *         is none of the four known actions
+     */
+    public String retrieveJobDetailName(Action action) {
+        String jobDetailName = null;
+
+        if (action == Action.START) {
+            jobDetailName = scheduledRouteDetails.getStartJobDetail().getName();
+        } else if (action == Action.STOP) {
+            jobDetailName = scheduledRouteDetails.getStopJobDetail().getName();
+        } else if (action == Action.SUSPEND) {
+            jobDetailName = scheduledRouteDetails.getSuspendJobDetail().getName();
+        } else if (action == Action.RESUME) {
+            jobDetailName = scheduledRouteDetails.getResumeJobDetail().getName();
+        }
+
+        return jobDetailName;
+    }
+
+    /**
+     * @param action the action to look up
+     * @return the group of the job recorded for the action, or <tt>null</tt> if the action
+     *         is none of the four known actions
+     */
+    public String retrieveJobDetailGroup(Action action) {
+        String jobDetailGroup = null;
+
+        if (action == Action.START) {
+            jobDetailGroup = scheduledRouteDetails.getStartJobDetail().getGroup();
+        } else if (action == Action.STOP) {
+            jobDetailGroup = scheduledRouteDetails.getStopJobDetail().getGroup();
+        } else if (action == Action.SUSPEND) {
+            jobDetailGroup = scheduledRouteDetails.getSuspendJobDetail().getGroup();
+        } else if (action == Action.RESUME) {
+            jobDetailGroup = scheduledRouteDetails.getResumeJobDetail().getGroup();
+        }
+
+        return jobDetailGroup;
+    }
+
+    public ScheduledRouteDetails getScheduledRouteDetails() {
+        return scheduledRouteDetails;
+    }
+
+    public void setScheduledRouteDetails(ScheduledRouteDetails scheduledRouteDetails) {
+        this.scheduledRouteDetails = scheduledRouteDetails;
+    }
+
+    public void setScheduler(Scheduler scheduler) {
+        this.scheduler = scheduler;
+    }
+
+    public Scheduler getScheduler() {
+        return scheduler;
+    }
+
+    /**
+     * @param routeStopGracePeriod the timeout handed to <tt>stopRoute</tt> for the STOP
+     *                             action, expressed in {@link #getTimeUnit()}
+     */
+    public void setRouteStopGracePeriod(int routeStopGracePeriod) {
+        this.routeStopGracePeriod = routeStopGracePeriod;
+    }
+
+    public int getRouteStopGracePeriod() {
+        return routeStopGracePeriod;
+    }
+
+    /**
+     * @param timeUnit the unit of {@link #getRouteStopGracePeriod()}
+     */
+    public void setTimeUnit(TimeUnit timeUnit) {
+        this.timeUnit = timeUnit;
+    }
+
+    public TimeUnit getTimeUnit() {
+        return timeUnit;
+    }
+
+}
Propchange: camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/ScheduledRoutePolicy.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/ScheduledRoutePolicyConstants.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/ScheduledRoutePolicyConstants.java?rev=1001028&view=auto
==============================================================================
--- camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/ScheduledRoutePolicyConstants.java (added)
+++ camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/ScheduledRoutePolicyConstants.java Fri Sep 24 18:55:30 2010
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.routepolicy.quartz;
+
+/**
+ * Quartz constants.
+ */
+public interface ScheduledRoutePolicyConstants {
+
+    /**
+     * The actions a scheduled route policy can perform on a route.
+     */
+    enum Action {
+        START,
+        STOP,
+        SUSPEND,
+        RESUME
+    }
+
+    // Keys used to store callback data
+    String SCHEDULED_ROUTE = "ScheduledRoute";
+    String SCHEDULED_TRIGGER = "ScheduledTrigger";
+    String SCHEDULED_ACTION = "ScheduledAction";
+
+    // Job group prefix and per-action job name prefixes; the route id is appended to each
+    String JOB_GROUP = "jobGroup-";
+    String JOB_START = "job-" + Action.START + "-";
+    String JOB_STOP = "job-" + Action.STOP + "-";
+    String JOB_SUSPEND = "job-" + Action.SUSPEND + "-";
+    String JOB_RESUME = "job-" + Action.RESUME + "-";
+
+    // Trigger group prefix and per-action trigger name prefixes; the route id is appended to each
+    String TRIGGER_GROUP = "triggerGroup-";
+    String TRIGGER_START = "trigger-" + Action.START + "-";
+    String TRIGGER_STOP = "trigger-" + Action.STOP + "-";
+    String TRIGGER_SUSPEND = "trigger-" + Action.SUSPEND + "-";
+    String TRIGGER_RESUME = "trigger-" + Action.RESUME + "-";
+
+}
Propchange: camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/ScheduledRoutePolicyConstants.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/SimpleScheduledRoutePolicy.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/SimpleScheduledRoutePolicy.java?rev=1001028&view=auto
==============================================================================
--- camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/SimpleScheduledRoutePolicy.java (added)
+++ camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/SimpleScheduledRoutePolicy.java Fri Sep 24 18:55:30 2010
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.routepolicy.quartz;
+
+import java.util.Date;
+import java.util.Properties;
+
+import org.apache.camel.Route;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.quartz.SimpleTrigger;
+import org.quartz.Trigger;
+
+/**
+ * Route policy that schedules route lifecycle actions at fixed dates.
+ * <p/>
+ * For each of the start, stop, suspend and resume actions a date, a repeat
+ * count and a repeat interval can be configured. During {@link #onInit(Route)}
+ * every action whose date is set is handed to <tt>scheduleRoute</tt>;
+ * {@link #createTrigger(Action, Route)} supplies the matching Quartz
+ * {@link SimpleTrigger} built from those three values.
+ */
+public class SimpleScheduledRoutePolicy extends ScheduledRoutePolicy {
+    // Log under this class' own name (previously borrowed CronScheduledRoutePolicy's).
+    private static final Log LOG = LogFactory.getLog(SimpleScheduledRoutePolicy.class);
+
+    private Date routeStartDate;
+    private int routeStartRepeatCount;
+    private long routeStartRepeatInterval;
+    private Date routeStopDate;
+    private int routeStopRepeatCount;
+    private long routeStopRepeatInterval;
+    private Date routeSuspendDate;
+    private int routeSuspendRepeatCount;
+    private long routeSuspendRepeatInterval;
+    private Date routeResumeDate;
+    private int routeResumeRepeatCount;
+    private long routeResumeRepeatInterval;
+
+    public SimpleScheduledRoutePolicy() {
+        super();
+    }
+
+    public SimpleScheduledRoutePolicy(String propertiesFile) {
+        super(propertiesFile);
+    }
+
+    public SimpleScheduledRoutePolicy(Properties properties) {
+        super(properties);
+    }
+
+    /**
+     * Schedules every configured action for the given route and starts the
+     * scheduler. If no date at all is configured a warning is logged and
+     * nothing is scheduled. Any exception is passed to <tt>handleException</tt>.
+     *
+     * @param route the route this policy is attached to
+     */
+    public void onInit(Route route) {
+        try {
+            if ((getRouteStartDate() == null) && (getRouteStopDate() == null) && (getRouteSuspendDate() == null) && (getRouteResumeDate() == null)) {
+                if (LOG.isWarnEnabled()) {
+                    LOG.warn("Scheduled Route Policy for route " + route.getId() + " is not set since no start, stop, suspend and/or resume times are specified");
+                }
+                return;
+            }
+
+            // Only schedule once, the first time the details are created.
+            if (scheduledRouteDetails == null) {
+                scheduledRouteDetails = new ScheduledRouteDetails();
+                scheduledRouteDetails.setRoute(route);
+
+                if (getRouteStartDate() != null) {
+                    scheduleRoute(Action.START);
+                }
+                if (getRouteStopDate() != null) {
+                    scheduleRoute(Action.STOP);
+                }
+                if (getRouteSuspendDate() != null) {
+                    scheduleRoute(Action.SUSPEND);
+                }
+                if (getRouteResumeDate() != null) {
+                    scheduleRoute(Action.RESUME);
+                }
+            }
+
+            getScheduler().start();
+        } catch (Exception e) {
+            handleException(e);
+        }
+    }
+
+    /**
+     * Deletes the job of every action that has a job detail recorded.
+     */
+    @Override
+    protected void doStop() throws Exception {
+        // onInit() leaves the details unset when no date is configured (or was
+        // never called), in which case there is nothing to delete.
+        if (scheduledRouteDetails == null) {
+            return;
+        }
+
+        if (scheduledRouteDetails.getStartJobDetail() != null) {
+            deleteRouteJob(Action.START);
+        }
+        if (scheduledRouteDetails.getStopJobDetail() != null) {
+            deleteRouteJob(Action.STOP);
+        }
+        if (scheduledRouteDetails.getSuspendJobDetail() != null) {
+            deleteRouteJob(Action.SUSPEND);
+        }
+        if (scheduledRouteDetails.getResumeJobDetail() != null) {
+            deleteRouteJob(Action.RESUME);
+        }
+    }
+
+    /**
+     * Builds the trigger for the given action from the date, repeat count and
+     * repeat interval configured for that action.
+     *
+     * @param action the lifecycle action to build a trigger for
+     * @param route  the route whose id is appended to the trigger and group names
+     * @return the trigger, or <tt>null</tt> if the action is not one of the four known actions
+     * @see org.apache.camel.routepolicy.quartz.ScheduledRoutePolicy#createTrigger(org.apache.camel.routepolicy.quartz.ScheduledRoutePolicyConstants.Action, org.apache.camel.Route)
+     */
+    @Override
+    protected Trigger createTrigger(Action action, Route route) throws Exception {
+        if (action == Action.START) {
+            return newTrigger(TRIGGER_START, route, getRouteStartDate(), getRouteStartRepeatCount(), getRouteStartRepeatInterval());
+        } else if (action == Action.STOP) {
+            return newTrigger(TRIGGER_STOP, route, getRouteStopDate(), getRouteStopRepeatCount(), getRouteStopRepeatInterval());
+        } else if (action == Action.SUSPEND) {
+            return newTrigger(TRIGGER_SUSPEND, route, getRouteSuspendDate(), getRouteSuspendRepeatCount(), getRouteSuspendRepeatInterval());
+        } else if (action == Action.RESUME) {
+            return newTrigger(TRIGGER_RESUME, route, getRouteResumeDate(), getRouteResumeRepeatCount(), getRouteResumeRepeatInterval());
+        }
+        return null;
+    }
+
+    /**
+     * Creates a trigger named <tt>namePrefix + routeId</tt> in the group
+     * <tt>TRIGGER_GROUP + routeId</tt>, with no end time.
+     */
+    private static SimpleTrigger newTrigger(String namePrefix, Route route, Date startDate, int repeatCount, long repeatInterval) {
+        String routeId = route.getId();
+        return new SimpleTrigger(namePrefix + routeId, TRIGGER_GROUP + routeId,
+            startDate, null, repeatCount, repeatInterval);
+    }
+
+    // ---- start: date, repeat count and repeat interval of the start trigger ----
+
+    public Date getRouteStartDate() {
+        return routeStartDate;
+    }
+
+    public void setRouteStartDate(Date routeStartDate) {
+        this.routeStartDate = routeStartDate;
+    }
+
+    public int getRouteStartRepeatCount() {
+        return routeStartRepeatCount;
+    }
+
+    public void setRouteStartRepeatCount(int routeStartRepeatCount) {
+        this.routeStartRepeatCount = routeStartRepeatCount;
+    }
+
+    public long getRouteStartRepeatInterval() {
+        return routeStartRepeatInterval;
+    }
+
+    public void setRouteStartRepeatInterval(long routeStartRepeatInterval) {
+        this.routeStartRepeatInterval = routeStartRepeatInterval;
+    }
+
+    // ---- stop: date, repeat count and repeat interval of the stop trigger ----
+
+    public Date getRouteStopDate() {
+        return routeStopDate;
+    }
+
+    public void setRouteStopDate(Date routeStopDate) {
+        this.routeStopDate = routeStopDate;
+    }
+
+    public int getRouteStopRepeatCount() {
+        return routeStopRepeatCount;
+    }
+
+    public void setRouteStopRepeatCount(int routeStopRepeatCount) {
+        this.routeStopRepeatCount = routeStopRepeatCount;
+    }
+
+    public long getRouteStopRepeatInterval() {
+        return routeStopRepeatInterval;
+    }
+
+    public void setRouteStopRepeatInterval(long routeStopRepeatInterval) {
+        this.routeStopRepeatInterval = routeStopRepeatInterval;
+    }
+
+    // ---- suspend: date, repeat count and repeat interval of the suspend trigger ----
+
+    public Date getRouteSuspendDate() {
+        return routeSuspendDate;
+    }
+
+    public void setRouteSuspendDate(Date routeSuspendDate) {
+        this.routeSuspendDate = routeSuspendDate;
+    }
+
+    public int getRouteSuspendRepeatCount() {
+        return routeSuspendRepeatCount;
+    }
+
+    public void setRouteSuspendRepeatCount(int routeSuspendRepeatCount) {
+        this.routeSuspendRepeatCount = routeSuspendRepeatCount;
+    }
+
+    public long getRouteSuspendRepeatInterval() {
+        return routeSuspendRepeatInterval;
+    }
+
+    public void setRouteSuspendRepeatInterval(long routeSuspendRepeatInterval) {
+        this.routeSuspendRepeatInterval = routeSuspendRepeatInterval;
+    }
+
+    // ---- resume: date, repeat count and repeat interval of the resume trigger ----
+
+    public Date getRouteResumeDate() {
+        return routeResumeDate;
+    }
+
+    public void setRouteResumeDate(Date routeResumeDate) {
+        this.routeResumeDate = routeResumeDate;
+    }
+
+    public int getRouteResumeRepeatCount() {
+        return routeResumeRepeatCount;
+    }
+
+    public void setRouteResumeRepeatCount(int routeResumeRepeatCount) {
+        this.routeResumeRepeatCount = routeResumeRepeatCount;
+    }
+
+    public long getRouteResumeRepeatInterval() {
+        return routeResumeRepeatInterval;
+    }
+
+    public void setRouteResumeRepeatInterval(long routeResumeRepeatInterval) {
+        this.routeResumeRepeatInterval = routeResumeRepeatInterval;
+    }
+
+}
Propchange: camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/SimpleScheduledRoutePolicy.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: camel/trunk/components/camel-quartz/src/test/java/org/apache/camel/routepolicy/quartz/CronScheduledRoutePolicyTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quartz/src/test/java/org/apache/camel/routepolicy/quartz/CronScheduledRoutePolicyTest.java?rev=1001028&view=auto
==============================================================================
--- camel/trunk/components/camel-quartz/src/test/java/org/apache/camel/routepolicy/quartz/CronScheduledRoutePolicyTest.java (added)
+++ camel/trunk/components/camel-quartz/src/test/java/org/apache/camel/routepolicy/quartz/CronScheduledRoutePolicyTest.java Fri Sep 24 18:55:30 2010
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.routepolicy.quartz;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.ServiceStatus;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.apache.camel.util.ServiceHelper;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.junit.Test;
+
+/**
+ * Exercises {@link CronScheduledRoutePolicy}: each test attaches a policy with
+ * a single cron expression to the route "test", waits for the trigger to
+ * fire, and then checks that the route was started, stopped, suspended or
+ * resumed accordingly.
+ *
+ * @version $Revision: 882486 $
+ */
+public class CronScheduledRoutePolicyTest extends CamelTestSupport {
+    private static final Log LOG = LogFactory.getLog(CronScheduledRoutePolicyTest.class);
+
+    @Test
+    public void testScheduledStartRoutePolicy() throws Exception {
+        MockEndpoint success = (MockEndpoint) context.getEndpoint("mock:success");
+        success.expectedMessageCount(1);
+
+        context.addRoutes(new RouteBuilder() {
+            public void configure() {
+                CronScheduledRoutePolicy policy = new CronScheduledRoutePolicy("org/apache/camel/routepolicy/quartz/myquartz.properties");
+                policy.setRouteStartTime("*/3 * * * * ?");
+
+                from("direct:start")
+                    .routeId("test")
+                    .routePolicy(policy)
+                    .to("mock:success");
+            }
+        });
+        context.start();
+        // Stop the route by hand so that only the policy can bring it back up.
+        context.stopRoute("test", 0, TimeUnit.MILLISECONDS);
+
+        // The cron expression fires every 3 seconds; 4 seconds covers one firing.
+        Thread.sleep(4000);
+        assertEquals(ServiceStatus.Started, context.getRouteStatus("test"));
+        template.sendBody("direct:start", "Ready or not, Here, I come");
+
+        success.assertIsSatisfied();
+    }
+
+    @Test
+    public void testScheduledStopRoutePolicy() throws Exception {
+        MockEndpoint unreachable = (MockEndpoint) context.getEndpoint("mock:unreachable");
+        unreachable.expectedMessageCount(0);
+
+        context.addRoutes(new RouteBuilder() {
+            public void configure() {
+                CronScheduledRoutePolicy policy = new CronScheduledRoutePolicy("org/apache/camel/routepolicy/quartz/myquartz.properties");
+                policy.setRouteStopTime("*/3 * * * * ?");
+                policy.setRouteStopGracePeriod(0);
+                policy.setTimeUnit(TimeUnit.MILLISECONDS);
+
+                from("direct:start")
+                    .routeId("test")
+                    .routePolicy(policy)
+                    .to("mock:unreachable");
+            }
+        });
+        context.start();
+
+        Thread.sleep(4000);
+        assertEquals(ServiceStatus.Stopped, context.getRouteStatus("test"));
+
+        try {
+            template.sendBody("direct:start", "Ready or not, Here, I come");
+            fail("Sending to a stopped route should have thrown a CamelExecutionException");
+        } catch (CamelExecutionException e) {
+            // expected: the consumer has been stopped by the policy
+        }
+
+        unreachable.assertIsSatisfied();
+    }
+
+    @Test
+    public void testScheduledSuspendRoutePolicy() throws Exception {
+        MockEndpoint unreachable = (MockEndpoint) context.getEndpoint("mock:unreachable");
+        unreachable.expectedMessageCount(0);
+
+        context.addRoutes(new RouteBuilder() {
+            public void configure() {
+                CronScheduledRoutePolicy policy = new CronScheduledRoutePolicy("org/apache/camel/routepolicy/quartz/myquartz.properties");
+                policy.setRouteSuspendTime("*/3 * * * * ?");
+
+                from("direct:start")
+                    .routeId("test")
+                    .routePolicy(policy)
+                    .to("mock:unreachable");
+            }
+        });
+        context.start();
+
+        Thread.sleep(4000);
+
+        try {
+            template.sendBody("direct:start", "Ready or not, Here, I come");
+            fail("Sending to a suspended route should have thrown a CamelExecutionException");
+        } catch (CamelExecutionException e) {
+            // expected: the consumer has been suspended by the policy
+        }
+
+        unreachable.assertIsSatisfied();
+    }
+
+    @Test
+    public void testScheduledResumeRoutePolicy() throws Exception {
+        MockEndpoint success = (MockEndpoint) context.getEndpoint("mock:success");
+        success.expectedMessageCount(1);
+
+        context.addRoutes(new RouteBuilder() {
+            public void configure() {
+                CronScheduledRoutePolicy policy = new CronScheduledRoutePolicy("org/apache/camel/routepolicy/quartz/myquartz.properties");
+                policy.setRouteResumeTime("*/3 * * * * ?");
+
+                from("direct:start")
+                    .routeId("test")
+                    .routePolicy(policy)
+                    .to("mock:success");
+            }
+        });
+        context.start();
+
+        // Suspend the consumer by hand so that only the policy can resume it.
+        ServiceHelper.suspendService(context.getRoute("test").getConsumer());
+        try {
+            template.sendBody("direct:start", "Ready or not, Here, I come");
+        } catch (CamelExecutionException e) {
+            LOG.debug("Consumer successfully suspended");
+        }
+
+        Thread.sleep(5000);
+        template.sendBody("direct:start", "Ready or not, Here, I come");
+
+        success.assertIsSatisfied();
+    }
+}
\ No newline at end of file
Propchange: camel/trunk/components/camel-quartz/src/test/java/org/apache/camel/routepolicy/quartz/CronScheduledRoutePolicyTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: camel/trunk/components/camel-quartz/src/test/java/org/apache/camel/routepolicy/quartz/SimpleScheduledRoutePolicyTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quartz/src/test/java/org/apache/camel/routepolicy/quartz/SimpleScheduledRoutePolicyTest.java?rev=1001028&view=auto
==============================================================================
--- camel/trunk/components/camel-quartz/src/test/java/org/apache/camel/routepolicy/quartz/SimpleScheduledRoutePolicyTest.java (added)
+++ camel/trunk/components/camel-quartz/src/test/java/org/apache/camel/routepolicy/quartz/SimpleScheduledRoutePolicyTest.java Fri Sep 24 18:55:30 2010
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.routepolicy.quartz;
+
+import java.util.Date;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.ServiceStatus;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.apache.camel.util.ServiceHelper;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.junit.Test;
+
+/**
+ * Exercises {@link SimpleScheduledRoutePolicy}: each test attaches a policy
+ * with a single action date three seconds in the future to the route "test",
+ * waits for the trigger to fire, and then checks that the route was started,
+ * stopped, suspended or resumed accordingly.
+ *
+ * @version $Revision: 882486 $
+ */
+public class SimpleScheduledRoutePolicyTest extends CamelTestSupport {
+    private static final Log LOG = LogFactory.getLog(SimpleScheduledRoutePolicyTest.class);
+
+    @Test
+    public void testScheduledStartRoutePolicy() throws Exception {
+        MockEndpoint success = (MockEndpoint) context.getEndpoint("mock:success");
+        success.expectedMessageCount(1);
+
+        context.addRoutes(new RouteBuilder() {
+            public void configure() {
+                SimpleScheduledRoutePolicy policy = new SimpleScheduledRoutePolicy("org/apache/camel/routepolicy/quartz/myquartz.properties");
+                long startTime = System.currentTimeMillis() + 3000L;
+                policy.setRouteStartDate(new Date(startTime));
+                policy.setRouteStartRepeatCount(1);
+                policy.setRouteStartRepeatInterval(3000);
+
+                from("direct:start")
+                    .routeId("test")
+                    .routePolicy(policy)
+                    .to("mock:success");
+            }
+        });
+        context.start();
+        // Stop the route by hand so that only the policy can bring it back up.
+        context.stopRoute("test", 0, TimeUnit.MILLISECONDS);
+
+        // The start date is 3 seconds out; 4 seconds covers the first firing.
+        Thread.sleep(4000);
+        assertEquals(ServiceStatus.Started, context.getRouteStatus("test"));
+        template.sendBody("direct:start", "Ready or not, Here, I come");
+
+        success.assertIsSatisfied();
+    }
+
+    @Test
+    public void testScheduledStopRoutePolicy() throws Exception {
+        MockEndpoint unreachable = (MockEndpoint) context.getEndpoint("mock:unreachable");
+        unreachable.expectedMessageCount(0);
+
+        context.addRoutes(new RouteBuilder() {
+            public void configure() {
+                SimpleScheduledRoutePolicy policy = new SimpleScheduledRoutePolicy();
+                long startTime = System.currentTimeMillis() + 3000;
+                policy.setRouteStopDate(new Date(startTime));
+                policy.setRouteStopRepeatCount(1);
+                policy.setRouteStopRepeatInterval(3000);
+
+                from("direct:start")
+                    .routeId("test")
+                    .routePolicy(policy)
+                    .to("mock:unreachable");
+            }
+        });
+        context.start();
+
+        Thread.sleep(4000);
+        assertEquals(ServiceStatus.Stopped, context.getRouteStatus("test"));
+
+        try {
+            template.sendBody("direct:start", "Ready or not, Here, I come");
+            fail("Sending to a stopped route should have thrown a CamelExecutionException");
+        } catch (CamelExecutionException e) {
+            // expected: the consumer has been stopped by the policy
+        }
+
+        unreachable.assertIsSatisfied();
+    }
+
+    @Test
+    public void testScheduledSuspendRoutePolicy() throws Exception {
+        MockEndpoint unreachable = (MockEndpoint) context.getEndpoint("mock:unreachable");
+        unreachable.expectedMessageCount(0);
+
+        context.addRoutes(new RouteBuilder() {
+            public void configure() {
+                SimpleScheduledRoutePolicy policy = new SimpleScheduledRoutePolicy();
+                long startTime = System.currentTimeMillis() + 3000L;
+                policy.setRouteSuspendDate(new Date(startTime));
+                policy.setRouteSuspendRepeatCount(1);
+                policy.setRouteSuspendRepeatInterval(3000);
+
+                from("direct:start")
+                    .routeId("test")
+                    .routePolicy(policy)
+                    .to("mock:unreachable");
+            }
+        });
+        context.start();
+
+        Thread.sleep(4000);
+
+        try {
+            template.sendBody("direct:start", "Ready or not, Here, I come");
+            fail("Sending to a suspended route should have thrown a CamelExecutionException");
+        } catch (CamelExecutionException e) {
+            // expected: the consumer has been suspended by the policy
+        }
+
+        unreachable.assertIsSatisfied();
+    }
+
+    @Test
+    public void testScheduledResumeRoutePolicy() throws Exception {
+        MockEndpoint success = (MockEndpoint) context.getEndpoint("mock:success");
+        success.expectedMessageCount(1);
+
+        context.addRoutes(new RouteBuilder() {
+            public void configure() {
+                SimpleScheduledRoutePolicy policy = new SimpleScheduledRoutePolicy();
+                long startTime = System.currentTimeMillis() + 3000L;
+                policy.setRouteResumeDate(new Date(startTime));
+                policy.setRouteResumeRepeatCount(1);
+                policy.setRouteResumeRepeatInterval(3000);
+
+                from("direct:start")
+                    .routeId("test")
+                    .routePolicy(policy)
+                    .to("mock:success");
+            }
+        });
+        context.start();
+
+        // Suspend the consumer by hand so that only the policy can resume it.
+        ServiceHelper.suspendService(context.getRoute("test").getConsumer());
+        try {
+            template.sendBody("direct:start", "Ready or not, Here, I come");
+        } catch (CamelExecutionException e) {
+            LOG.debug("Consumer successfully suspended");
+        }
+
+        Thread.sleep(4000);
+        template.sendBody("direct:start", "Ready or not, Here, I come");
+
+        success.assertIsSatisfied();
+    }
+
+}
\ No newline at end of file
Propchange: camel/trunk/components/camel-quartz/src/test/java/org/apache/camel/routepolicy/quartz/SimpleScheduledRoutePolicyTest.java
------------------------------------------------------------------------------
svn:eol-style = native