You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/07/29 11:10:24 UTC
svn commit: r980370 - in /camel/trunk:
camel-core/src/main/java/org/apache/camel/
camel-core/src/main/java/org/apache/camel/impl/
camel-core/src/main/java/org/apache/camel/management/mbean/
camel-core/src/main/java/org/apache/camel/processor/loadbalanc...
Author: davsclaus
Date: Thu Jul 29 09:10:23 2010
New Revision: 980370
URL: http://svn.apache.org/viewvc?rev=980370&view=rev
Log:
CAMEL-3001: Added suspend/resume to CamelContext. (work in progress)
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultCamelContextSuspendResumeRouteStartupOrderTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultCamelContextSuspendResumeRouteTest.java
camel/trunk/components/camel-quartz/src/test/java/org/apache/camel/component/quartz/QuartzOneCamelContextRestartTest.java (with props)
camel/trunk/components/camel-quartz/src/test/java/org/apache/camel/component/quartz/QuartzOneCamelContextSuspendResumeTest.java (with props)
camel/trunk/components/camel-quartz/src/test/java/org/apache/camel/component/quartz/QuartzTwoCamelContextSuspendResumeTest.java (with props)
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java
camel/trunk/camel-core/src/main/java/org/apache/camel/StartupListener.java
camel/trunk/camel-core/src/main/java/org/apache/camel/SuspendableService.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RouteService.java
camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/QueueLoadBalancer.java
camel/trunk/camel-core/src/main/java/org/apache/camel/util/ServiceHelper.java
camel/trunk/camel-core/src/test/java/org/apache/camel/component/direct/DirectEndpointRouteInlinedTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultCamelContextTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedCamelContextTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RouteStartupOrderSimpleTest.java
camel/trunk/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/cxfbean/CxfBeanTest.java
camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzEndpoint.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java?rev=980370&r1=980369&r2=980370&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java Thu Jul 29 09:10:23 2010
@@ -49,10 +49,25 @@ import org.apache.camel.spi.TypeConverte
/**
* Interface used to represent the context used to configure routes and the
* policies to use during message exchanges between endpoints.
+ * <p/>
+ * The context offers the following methods to control the lifecycle:
+ * <ul>
+ * <li>{@link #start()} - to start</li>
+ * <li>{@link #stop()} - to shutdown (will stop all routes/components/endpoints etc and clear internal state/cache)</li>
+ * <li>{@link #suspend()} - to pause routing messages</li>
+ * <li>{@link #resume()} - to resume after a suspend</li>
+ * </ul>
+ * <p/>
+ * <b>Notice:</b> that {@link #stop()} and {@link #suspend()} will graceful stop/suspend routs ensureing any in progress
+ * messages is given time to complete. See more details at {@link org.apache.camel.spi.ShutdownStrategy}.
+ * <p/>
+ * If you are doing a hot restart then its adviced to use the suspend/resume methods which ensures a faster
+ * restart but also allows any internal state to be kept as is.
+ * The stop/start approach will do a <i>cold</i> restart of Camel, where all internal state is reset.
*
* @version $Revision$
*/
-public interface CamelContext extends Service, RuntimeConfiguration {
+public interface CamelContext extends SuspendableService, RuntimeConfiguration {
/**
* Gets the name of the this context.
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/StartupListener.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/StartupListener.java?rev=980370&r1=980369&r2=980370&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/StartupListener.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/StartupListener.java Thu Jul 29 09:10:23 2010
@@ -31,10 +31,10 @@ package org.apache.camel;
public interface StartupListener {
/**
- * Callback invoked when the {@link org.apache.camel.CamelContext} has just been started.
+ * Callback invoked when the {@link CamelContext} has just been started.
*
* @param context the Camel context
- * @param alreadyStarted whether or not the Camel context already has been started. For example the Camel context
+ * @param alreadyStarted whether or not the {@link CamelContext} already has been started. For example the context
* could already have been started, and then a service is added/started later which still
* triggers this callback to be invoked.
* @throws Exception can be thrown in case of errors to fail the startup process and have the application
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/SuspendableService.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/SuspendableService.java?rev=980370&r1=980369&r2=980370&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/SuspendableService.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/SuspendableService.java Thu Jul 29 09:10:23 2010
@@ -31,13 +31,17 @@ public interface SuspendableService exte
/**
* Suspends the service.
+ *
+ * @throws Exception is thrown if suspending failed
*/
- void suspend();
+ void suspend() throws Exception;
/**
* Resumes the service.
+ *
+ * @throws Exception is thrown if resuming failed
*/
- void resume();
+ void resume() throws Exception;
/**
* Tests whether the service is suspended or not.
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java?rev=980370&r1=980369&r2=980370&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java Thu Jul 29 09:10:23 2010
@@ -56,6 +56,7 @@ import org.apache.camel.ServiceStatus;
import org.apache.camel.ShutdownRoute;
import org.apache.camel.ShutdownRunningTask;
import org.apache.camel.StartupListener;
+import org.apache.camel.SuspendableService;
import org.apache.camel.TypeConverter;
import org.apache.camel.builder.ErrorHandlerBuilder;
import org.apache.camel.component.properties.PropertiesComponent;
@@ -102,7 +103,6 @@ import org.apache.camel.util.CamelContex
import org.apache.camel.util.CastUtils;
import org.apache.camel.util.EndpointHelper;
import org.apache.camel.util.EventHelper;
-import org.apache.camel.util.LRUCache;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.ReflectionInjector;
import org.apache.camel.util.ServiceHelper;
@@ -112,19 +112,17 @@ import org.apache.camel.util.URISupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import static org.apache.camel.util.ServiceHelper.stopServices;
-
/**
* Represents the context used to configure routes and the policies to use.
*
* @version $Revision$
*/
-public class DefaultCamelContext extends ServiceSupport implements CamelContext {
+public class DefaultCamelContext extends ServiceSupport implements CamelContext, SuspendableService {
private static final transient Log LOG = LogFactory.getLog(DefaultCamelContext.class);
private static final String NAME_PREFIX = "camel-";
private static final AtomicInteger CONTEXT_COUNTER = new AtomicInteger(0);
private ClassLoader applicationContextClassLoader;
- private boolean routeDefinitionInitiated;
+ private final AtomicBoolean routeDefinitionInitiated = new AtomicBoolean(false);
private String name;
private final Map<String, Endpoint> endpoints = new EndpointRegistry();
private final AtomicInteger endpointKeyCounter = new AtomicInteger();
@@ -147,6 +145,9 @@ public class DefaultCamelContext extends
private final List<RouteDefinition> routeDefinitions = new ArrayList<RouteDefinition>();
private List<InterceptStrategy> interceptStrategies = new ArrayList<InterceptStrategy>();
+ private final AtomicBoolean suspending = new AtomicBoolean(false);
+ private final AtomicBoolean suspended = new AtomicBoolean(false);
+ private final AtomicBoolean resuming = new AtomicBoolean(false);
private boolean firstStartDone;
private Boolean autoStartup = Boolean.TRUE;
private Boolean trace = Boolean.FALSE;
@@ -162,6 +163,7 @@ public class DefaultCamelContext extends
private FactoryFinder defaultFactoryFinder;
private final Map<String, FactoryFinder> factories = new HashMap<String, FactoryFinder>();
private final Map<String, RouteService> routeServices = new LinkedHashMap<String, RouteService>();
+ private final Map<String, RouteService> suspendedRouteServices = new LinkedHashMap<String, RouteService>();
private ClassResolver classResolver = new DefaultClassResolver();
private PackageScanClassResolver packageScanClassResolver;
// we use a capacity of 100 per endpoint, so for the same endpoint we have at most 100 producers in the pool
@@ -980,6 +982,75 @@ public class DefaultCamelContext extends
return TimeUtils.printDuration(delta);
}
+ public boolean isSuspended() {
+ return suspended.get();
+ }
+
+ public void suspend() throws Exception {
+ if (!suspended.get()) {
+ if (suspending.compareAndSet(false, true)) {
+ try {
+ LOG.info("Apache Camel " + getVersion() + " (CamelContext: " + getName() + ") is suspending");
+ StopWatch watch = new StopWatch();
+
+ // update list of started routes to be suspended
+ // because we only want to suspend started routes
+ // (so when we resume we only resume the routes which actually was suspended)
+ for (Map.Entry<String, RouteService> entry : getRouteServices().entrySet()) {
+ if (entry.getValue().getStatus().isStarted()) {
+ suspendedRouteServices.put(entry.getKey(), entry.getValue());
+ }
+ }
+
+ // suspend routes using the shutdown strategy so it can shutdown in correct order
+ // TODO: leverage shutdown strategy to let it run in suspend mode, so it can suspend routes in correct order
+ for (Map.Entry<String, RouteService> entry : suspendedRouteServices.entrySet()) {
+ shutdownRoute(entry.getKey());
+ }
+
+ // TODO: suspended/resumed notification events
+ // TODO: more unit test to ensure suspend/resume with startup ordering is as expected
+
+ watch.stop();
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Apache Camel " + getVersion() + " (CamelContext: " + getName() + ") is suspended in " + TimeUtils.printDuration(watch.taken()));
+ }
+ } finally {
+ suspended.set(true);
+ suspending.set(false);
+ resuming.set(false);
+ }
+ }
+ }
+ }
+
+ public void resume() throws Exception {
+ if (suspended.get()) {
+ if (resuming.compareAndSet(false, true)) {
+ try {
+ LOG.info("Apache Camel " + getVersion() + " (CamelContext: " + getName() + ") is resuming");
+ StopWatch watch = new StopWatch();
+
+ // start the suspended routes (do not check for route clashes, and indicate )
+ doStartRoutes(suspendedRouteServices, false);
+
+ watch.stop();
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Resumed " + suspendedRouteServices.size() + " routes");
+ LOG.info("Apache Camel " + getVersion() + " (CamelContext: " + getName() + ") resumed in " + TimeUtils.printDuration(watch.taken()));
+ }
+
+ // and clear the list as they have been resumed
+ suspendedRouteServices.clear();
+ } finally {
+ suspended.set(false);
+ suspending.set(false);
+ resuming.set(false);
+ }
+ }
+ }
+ }
+
public void start() throws Exception {
boolean doNotStart = !firstStartDone && !isAutoStartup();
firstStartDone = true;
@@ -992,27 +1063,6 @@ public class DefaultCamelContext extends
// super will invoke doStart which will prepare internal services before we continue and start the routes below
super.start();
- LOG.debug("Starting routes...");
-
- // the context is now considered started (i.e. isStarted() == true))
- // starting routes is done after, not during context startup
- safelyStartRouteServices(false, routeServices.values());
-
- for (int i = 0; i < getRoutes().size(); i++) {
- Route route = getRoutes().get(i);
- LOG.info("Route: " + route.getId() + " started and consuming from: " + route.getEndpoint());
- if (LOG.isDebugEnabled()) {
- LOG.debug("Route " + i + ": " + getRoutes().get(i));
- }
- }
-
- // now notify any startup aware listeners as all the routes etc has been started.
- for (StartupListener startup : startupListeners) {
- startup.onCamelContextStarted(this, false);
- }
- // and then get rid of the list as we dont need it anymore
- startupListeners.clear();
-
stopWatch.stop();
if (LOG.isInfoEnabled()) {
LOG.info("Started " + getRoutes().size() + " routes");
@@ -1024,6 +1074,35 @@ public class DefaultCamelContext extends
// Implementation methods
// -----------------------------------------------------------------------
+ /**
+ * Starts the routes
+ *
+ * @param routeServices the routes to start (will only start a route if its not already started)
+ * @param checkClash whether to check for startup ordering clash
+ * @throws Exception is thrown if error starting routes
+ */
+ protected void doStartRoutes(Map<String, RouteService> routeServices, boolean checkClash) throws Exception {
+ // filter out already started routes
+ Map<String, RouteService> filtered = new LinkedHashMap<String, RouteService>();
+ for (Map.Entry<String, RouteService> entry : routeServices.entrySet()) {
+ if (entry.getValue().getStatus().isStartable()) {
+ filtered.put(entry.getKey(), entry.getValue());
+ }
+ }
+
+ if (!filtered.isEmpty()) {
+ // the context is now considered started (i.e. isStarted() == true))
+ // starting routes is done after, not during context startup
+ safelyStartRouteServices(false, checkClash, filtered.values());
+ }
+
+ // now notify any startup aware listeners as all the routes etc has been started,
+ // allowing the listeners to do custom work after routes has been started
+ for (StartupListener startup : startupListeners) {
+ startup.onCamelContextStarted(this, isStarted());
+ }
+ }
+
protected synchronized void doStart() throws Exception {
startDate = new Date();
stopWatch.restart();
@@ -1118,12 +1197,15 @@ public class DefaultCamelContext extends
startServices(components.values());
- // To avoid initiating the routeDefinitions after stopping the camel context
- if (!routeDefinitionInitiated) {
+ // the route definitions is only started once, even if Camel is stopped
+ if (routeDefinitionInitiated.compareAndSet(false, true)) {
+ // TODO: we should re-create route defs on start, people should use suspend/resume for hot restart
startRouteDefinitions(routeDefinitions);
- routeDefinitionInitiated = true;
}
+ // start routes
+ doStartRoutes(routeServices, true);
+
// starting will continue in the start method
}
@@ -1141,7 +1223,11 @@ public class DefaultCamelContext extends
getRouteStartupOrder().clear();
shutdownServices(routeServices.values());
- // do not clear route services as we can start Camel again and get the route back as before
+ // do not clear route services or startup listeners as we can start Camel again and get the route back as before
+
+ // but clear any suspend routes
+ suspendedRouteServices.clear();
+ suspended.set(false);
// the stop order is important
@@ -1162,7 +1248,6 @@ public class DefaultCamelContext extends
// shutdown services as late as possible
shutdownServices(servicesToClose);
servicesToClose.clear();
- startupListeners.clear();
// must notify that we are stopped before stopping the management strategy
EventHelper.notifyCamelContextStopped(this);
@@ -1178,11 +1263,6 @@ public class DefaultCamelContext extends
// stop the lazy created so they can be re-created on restart
forceStopLazyInitialization();
- // reset values (mark routes as not initialized so they can be started again)
- routeDefinitionInitiated = false;
- firstStartDone = false;
- defaultRouteStartupOrder = 1000;
-
stopWatch.stop();
if (LOG.isInfoEnabled()) {
LOG.info("Uptime: " + getUptime());
@@ -1277,7 +1357,7 @@ public class DefaultCamelContext extends
} else {
routeServices.put(key, routeService);
if (shouldStartRoutes()) {
- safelyStartRouteServices(true, routeService);
+ safelyStartRouteServices(true, true, routeService);
}
}
}
@@ -1289,10 +1369,11 @@ public class DefaultCamelContext extends
* This method <b>must</b> be used to start routes in a safe manner.
*
* @param forceAutoStart whether to force auto starting the routes, despite they may be configured not do do so
+ * @param checkClash whether to check for startup order clash
* @param routeServices the routes
* @throws Exception is thrown if error starting the routes
*/
- protected synchronized void safelyStartRouteServices(boolean forceAutoStart, Collection<RouteService> routeServices) throws Exception {
+ protected synchronized void safelyStartRouteServices(boolean forceAutoStart, boolean checkClash, Collection<RouteService> routeServices) throws Exception {
// list of inputs to start when all the routes have been prepared for starting
// we use a tree map so the routes will be ordered according to startup order defined on the route
Map<Integer, DefaultRouteStartupOrder> inputs = new TreeMap<Integer, DefaultRouteStartupOrder>();
@@ -1301,8 +1382,12 @@ public class DefaultCamelContext extends
for (RouteService routeService : routeServices) {
DefaultRouteStartupOrder order = doPrepareRouteToBeStarted(routeService, forceAutoStart);
// check for clash before we add it as input
- if (order != null && doCheckStartupOrderClash(order, inputs)) {
- inputs.put(order.getStartupOrder(), order);
+ if (order != null) {
+ if (checkClash && doCheckStartupOrderClash(order, inputs)) {
+ inputs.put(order.getStartupOrder(), order);
+ } else {
+ inputs.put(order.getStartupOrder(), order);
+ }
}
}
@@ -1311,14 +1396,13 @@ public class DefaultCamelContext extends
// and now start the routes
// and check for clash with multiple consumers of the same endpoints which is not allowed
- List<Endpoint> routeInputs = new ArrayList<Endpoint>();
- doStartRoutes(inputs, routeInputs);
+ doStartRoutes(inputs);
// inputs no longer needed
inputs.clear();
}
- protected synchronized void safelyStartRouteServices(boolean forceAutoStart, RouteService... routeServices) throws Exception {
- safelyStartRouteServices(forceAutoStart, Arrays.asList(routeServices));
+ protected synchronized void safelyStartRouteServices(boolean forceAutoStart, boolean checkClash, RouteService... routeServices) throws Exception {
+ safelyStartRouteServices(forceAutoStart, checkClash, Arrays.asList(routeServices));
}
private DefaultRouteStartupOrder doPrepareRouteToBeStarted(RouteService routeService, boolean forceAutoStart) throws Exception {
@@ -1350,6 +1434,8 @@ public class DefaultCamelContext extends
}
private boolean doCheckStartupOrderClash(DefaultRouteStartupOrder answer, Map<Integer, DefaultRouteStartupOrder> inputs) throws FailedToStartRouteException {
+ // TODO: There could potential be routeId clash as well, so we should check for that as well
+
// check for clash by startupOrder id
DefaultRouteStartupOrder other = inputs.get(answer.getStartupOrder());
if (other != null && answer != other) {
@@ -1360,7 +1446,9 @@ public class DefaultCamelContext extends
// check in existing already started as well
for (RouteStartupOrder order : routeStartupOrder) {
String otherId = order.getRoute().getId();
- if (answer.getStartupOrder() == order.getStartupOrder()) {
+ if (answer.getRoute().getId().equals(otherId)) {
+ // its the same route id so skip clash check as its the same route (can happen when using suspend/resume)
+ } else if (answer.getStartupOrder() == order.getStartupOrder()) {
throw new FailedToStartRouteException(answer.getRoute().getId(), "startupOrder clash. Route " + otherId + " already has startupOrder "
+ answer.getStartupOrder() + " configured which this route have as well. Please correct startupOrder to be unique among all your routes.");
}
@@ -1386,7 +1474,9 @@ public class DefaultCamelContext extends
}
}
- private void doStartRoutes(Map<Integer, DefaultRouteStartupOrder> inputs, List<Endpoint> routeInputs) throws Exception {
+ private void doStartRoutes(Map<Integer, DefaultRouteStartupOrder> inputs) throws Exception {
+ List<Endpoint> routeInputs = new ArrayList<Endpoint>();
+
for (Map.Entry<Integer, DefaultRouteStartupOrder> entry : inputs.entrySet()) {
Integer order = entry.getKey();
Route route = entry.getValue().getRoute();
@@ -1412,8 +1502,25 @@ public class DefaultCamelContext extends
routeInputs.add(endpoint);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Route: " + route.getId() + " >>> " + route);
+ }
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Route: " + route.getId() + " started and consuming from: " + endpoint);
+ }
+
// add to the order which they was started, so we know how to stop them in reverse order
- routeStartupOrder.add(entry.getValue());
+ // but only add if we haven't already registered it before (we dont want to double add when restarting)
+ boolean found = false;
+ for (RouteStartupOrder other : routeStartupOrder) {
+ if (other.getRoute().getId() == route.getId()) {
+ found = true;
+ break;
+ }
+ }
+ if (!found) {
+ routeStartupOrder.add(entry.getValue());
+ }
}
}
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RouteService.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RouteService.java?rev=980370&r1=980369&r2=980370&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RouteService.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RouteService.java Thu Jul 29 09:10:23 2010
@@ -167,9 +167,7 @@ public class RouteService extends Servic
}
protected void doStop() throws Exception {
- // clear inputs
- inputs.clear();
-
+
// if we are stopping CamelContext then we are shutting down
boolean isShutdownCamelContext = camelContext.isStopping();
@@ -208,6 +206,12 @@ public class RouteService extends Servic
camelContext.removeRouteCollection(routes);
}
+ @Override
+ protected void doShutdown() throws Exception {
+ // clear inputs on shutdown
+ inputs.clear();
+ }
+
protected void startChildService(Route route, Service... services) throws Exception {
List<Service> list = new ArrayList<Service>(Arrays.asList(services));
startChildService(route, list);
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java?rev=980370&r1=980369&r2=980370&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java Thu Jul 29 09:10:23 2010
@@ -22,7 +22,6 @@ import java.util.concurrent.TimeUnit;
import org.apache.camel.CamelContext;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.ServiceStatus;
-import org.apache.camel.impl.ServiceSupport;
import org.apache.camel.spi.ManagementStrategy;
import org.springframework.jmx.export.annotation.ManagedAttribute;
import org.springframework.jmx.export.annotation.ManagedOperation;
@@ -61,7 +60,7 @@ public class ManagedCamelContext {
@ManagedAttribute(description = "Camel State")
public String getState() {
// must use String type to be sure remote JMX can read the attribute without requiring Camel classes.
- ServiceStatus status = ((ServiceSupport) context).getStatus();
+ ServiceStatus status = (context).getStatus();
// if no status exists then its stopped
if (status == null) {
status = ServiceStatus.Stopped;
@@ -69,6 +68,11 @@ public class ManagedCamelContext {
return status.name();
}
+ @ManagedAttribute(description = "Is Camel suspended")
+ public Boolean getSuspended() {
+ return context.isSuspended();
+ }
+
@ManagedAttribute(description = "Uptime")
public String getUptime() {
return context.getUptime();
@@ -137,6 +141,16 @@ public class ManagedCamelContext {
context.stop();
}
+ @ManagedOperation(description = "Suspend Camel")
+ public void suspend() throws Exception {
+ context.suspend();
+ }
+
+ @ManagedOperation(description = "Resume Camel")
+ public void resume() throws Exception {
+ context.resume();
+ }
+
@ManagedOperation(description = "Send body (in only)")
public void sendBody(String endpointUri, String body) throws Exception {
ProducerTemplate template = context.createProducerTemplate();
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/QueueLoadBalancer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/QueueLoadBalancer.java?rev=980370&r1=980369&r2=980370&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/QueueLoadBalancer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/QueueLoadBalancer.java Thu Jul 29 09:10:23 2010
@@ -34,29 +34,37 @@ import org.apache.camel.util.AsyncProces
public abstract class QueueLoadBalancer extends LoadBalancerSupport {
public boolean process(final Exchange exchange, final AsyncCallback callback) {
- boolean sync;
-
List<Processor> list = getProcessors();
- if (list.isEmpty()) {
- throw new IllegalStateException("No processors available to process " + exchange);
- }
- Processor processor = chooseProcessor(list, exchange);
- if (processor == null) {
- throw new IllegalStateException("No processors could be chosen to process " + exchange);
- } else {
- AsyncProcessor albp = AsyncProcessorTypeConverter.convert(processor);
- sync = AsyncProcessorHelper.process(albp, exchange, new AsyncCallback() {
- public void done(boolean doneSync) {
- // only handle the async case
- if (doneSync) {
- return;
+ if (!list.isEmpty()) {
+ Processor processor = chooseProcessor(list, exchange);
+ if (processor == null) {
+ throw new IllegalStateException("No processors could be chosen to process " + exchange);
+ } else {
+ AsyncProcessor albp = AsyncProcessorTypeConverter.convert(processor);
+ boolean sync = AsyncProcessorHelper.process(albp, exchange, new AsyncCallback() {
+ public void done(boolean doneSync) {
+ // only handle the async case
+ if (doneSync) {
+ return;
+ }
+
+ callback.done(false);
}
- callback.done(false);
+ });
+
+ if (!sync) {
+ // will continue routing asynchronously
+ return false;
}
- });
+
+ callback.done(true);
+ return true;
+ }
}
- return sync;
+ // no processors but indicate we are done
+ callback.done(true);
+ return true;
}
public void process(Exchange exchange) throws Exception {
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/ServiceHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ServiceHelper.java?rev=980370&r1=980369&r2=980370&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/ServiceHelper.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/ServiceHelper.java Thu Jul 29 09:10:23 2010
@@ -194,6 +194,32 @@ public final class ServiceHelper {
}
}
+ public static void resumeServices(Collection<?> services) throws Exception {
+ Exception firstException = null;
+ for (Object value : services) {
+ if (value instanceof Service) {
+ Service service = (Service)value;
+ try {
+ resumeService(service);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Resumed service: " + service);
+ }
+ service.stop();
+ } catch (Exception e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Caught exception resuming service: " + service, e);
+ }
+ if (firstException == null) {
+ firstException = e;
+ }
+ }
+ }
+ }
+ if (firstException != null) {
+ throw firstException;
+ }
+ }
+
/**
* Resumes the given service.
* <p/>
@@ -222,20 +248,38 @@ public final class ServiceHelper {
} else {
return false;
}
- } else if (service instanceof ServiceSupport) {
- ServiceSupport ss = (ServiceSupport) service;
- if (ss.getStatus().isStartable()) {
- startService(service);
- return true;
- } else {
- return false;
- }
} else {
startService(service);
return true;
}
}
+ public static void suspendServices(Collection<?> services) throws Exception {
+ Exception firstException = null;
+ for (Object value : services) {
+ if (value instanceof Service) {
+ Service service = (Service)value;
+ try {
+ suspendService(service);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Suspending service: " + service);
+ }
+ service.stop();
+ } catch (Exception e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Caught exception suspending service: " + service, e);
+ }
+ if (firstException == null) {
+ firstException = e;
+ }
+ }
+ }
+ }
+ if (firstException != null) {
+ throw firstException;
+ }
+ }
+
/**
* Suspends the given service.
* <p/>
@@ -264,14 +308,6 @@ public final class ServiceHelper {
} else {
return false;
}
- } else if (service instanceof ServiceSupport) {
- ServiceSupport ss = (ServiceSupport) service;
- if (ss.getStatus().isStoppable()) {
- stopServices(service);
- return true;
- } else {
- return false;
- }
} else {
stopService(service);
return true;
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/component/direct/DirectEndpointRouteInlinedTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/direct/DirectEndpointRouteInlinedTest.java?rev=980370&r1=980369&r2=980370&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/direct/DirectEndpointRouteInlinedTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/direct/DirectEndpointRouteInlinedTest.java Thu Jul 29 09:10:23 2010
@@ -33,6 +33,9 @@ public class DirectEndpointRouteInlinedT
});
context.start();
+ // invoke start a 2nd time wont break stuff
+ context.start();
+
getMockEndpoint("mock:result").expectedMessageCount(1);
template.sendBody("direct:start", "Hello World");
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultCamelContextSuspendResumeRouteStartupOrderTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultCamelContextSuspendResumeRouteStartupOrderTest.java?rev=980370&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultCamelContextSuspendResumeRouteStartupOrderTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultCamelContextSuspendResumeRouteStartupOrderTest.java Thu Jul 29 09:10:23 2010
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * @version $Revision$
+ */
+public class DefaultCamelContextSuspendResumeRouteStartupOrderTest extends ContextTestSupport {
+
+ public void testSuspendResume() throws Exception {
+ assertFalse(context.isSuspended());
+
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedBodiesReceived("A");
+
+ template.sendBody("seda:foo", "A");
+
+ assertMockEndpointsSatisfied();
+
+ log.info("Suspending");
+
+ // now suspend and dont expect a message to be routed
+ resetMocks();
+ mock.expectedMessageCount(0);
+ context.suspend();
+ template.sendBody("seda:foo", "B");
+ mock.assertIsSatisfied(1000);
+
+ assertTrue(context.isSuspended());
+
+ log.info("Resuming");
+
+ // now resume and expect the previous message to be routed
+ resetMocks();
+ mock.expectedBodiesReceived("B");
+ context.resume();
+ assertMockEndpointsSatisfied();
+
+ assertFalse(context.isSuspended());
+
+ context.stop();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("seda:foo").routeId("C").startupOrder(3).to("log:foo").to("direct:bar");
+
+ from("direct:baz").routeId("A").startupOrder(1).to("log:baz").to("mock:result");
+
+ from("direct:bar").routeId("B").startupOrder(2).to("log:bar").to("direct:baz");
+ }
+ };
+ }
+}
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultCamelContextSuspendResumeRouteTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultCamelContextSuspendResumeRouteTest.java?rev=980370&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultCamelContextSuspendResumeRouteTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultCamelContextSuspendResumeRouteTest.java Thu Jul 29 09:10:23 2010
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * @version $Revision$
+ */
+public class DefaultCamelContextSuspendResumeRouteTest extends ContextTestSupport {
+
+ public void testSuspendResume() throws Exception {
+ assertFalse(context.isSuspended());
+
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedBodiesReceived("A");
+
+ template.sendBody("seda:foo", "A");
+
+ assertMockEndpointsSatisfied();
+
+ log.info("Suspending");
+
+ // now suspend and dont expect a message to be routed
+ resetMocks();
+ mock.expectedMessageCount(0);
+ context.suspend();
+ template.sendBody("seda:foo", "B");
+ mock.assertIsSatisfied(1000);
+
+ assertTrue(context.isSuspended());
+
+ log.info("Resuming");
+
+ // now resume and expect the previous message to be routed
+ resetMocks();
+ mock.expectedBodiesReceived("B");
+ context.resume();
+ assertMockEndpointsSatisfied();
+
+ assertFalse(context.isSuspended());
+
+ context.stop();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("seda:foo").to("log:foo").to("mock:result");
+ }
+ };
+ }
+}
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultCamelContextTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultCamelContextTest.java?rev=980370&r1=980369&r2=980370&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultCamelContextTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultCamelContextTest.java Thu Jul 29 09:10:23 2010
@@ -232,4 +232,73 @@ public class DefaultCamelContextTest ext
ctx.stop();
}
+ public void testSuspend() throws Exception {
+ DefaultCamelContext ctx = new DefaultCamelContext();
+
+ assertEquals(false, ctx.isStarted());
+ assertEquals(false, ctx.isSuspended());
+
+ ctx.start();
+ assertEquals(true, ctx.isStarted());
+ assertEquals(false, ctx.isSuspended());
+
+ ctx.suspend();
+ assertEquals(true, ctx.isStarted());
+ assertEquals(true, ctx.isSuspended());
+
+ ctx.suspend();
+ assertEquals(true, ctx.isStarted());
+ assertEquals(true, ctx.isSuspended());
+
+ ctx.stop();
+ assertEquals(false, ctx.isStarted());
+ assertEquals(false, ctx.isSuspended());
+ }
+
+ public void testResume() throws Exception {
+ DefaultCamelContext ctx = new DefaultCamelContext();
+
+ assertEquals(false, ctx.isStarted());
+ assertEquals(false, ctx.isSuspended());
+
+ ctx.start();
+ assertEquals(true, ctx.isStarted());
+ assertEquals(false, ctx.isSuspended());
+
+ ctx.resume();
+ assertEquals(true, ctx.isStarted());
+ assertEquals(false, ctx.isSuspended());
+
+ ctx.resume();
+ assertEquals(true, ctx.isStarted());
+ assertEquals(false, ctx.isSuspended());
+
+ ctx.stop();
+ assertEquals(false, ctx.isStarted());
+ assertEquals(false, ctx.isSuspended());
+ }
+
+ public void testSuspendResume() throws Exception {
+ DefaultCamelContext ctx = new DefaultCamelContext();
+
+ assertEquals(false, ctx.isStarted());
+ assertEquals(false, ctx.isSuspended());
+
+ ctx.start();
+ assertEquals(true, ctx.isStarted());
+ assertEquals(false, ctx.isSuspended());
+
+ ctx.suspend();
+ assertEquals(true, ctx.isStarted());
+ assertEquals(true, ctx.isSuspended());
+
+ ctx.resume();
+ assertEquals(true, ctx.isStarted());
+ assertEquals(false, ctx.isSuspended());
+
+ ctx.stop();
+ assertEquals(false, ctx.isStarted());
+ assertEquals(false, ctx.isSuspended());
+ }
+
}
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedCamelContextTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedCamelContextTest.java?rev=980370&r1=980369&r2=980370&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedCamelContextTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedCamelContextTest.java Thu Jul 29 09:10:23 2010
@@ -55,6 +55,9 @@ public class ManagedCamelContextTest ext
String uptime = (String) mbeanServer.getAttribute(on, "Uptime");
assertNotNull(uptime);
+ Boolean suspended = (Boolean) mbeanServer.getAttribute(on, "Suspended");
+ assertEquals(false, suspended.booleanValue());
+
// invoke operations
MockEndpoint mock = getMockEndpoint("mock:result");
mock.expectedBodiesReceived("Hello World");
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RouteStartupOrderSimpleTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RouteStartupOrderSimpleTest.java?rev=980370&r1=980369&r2=980370&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RouteStartupOrderSimpleTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RouteStartupOrderSimpleTest.java Thu Jul 29 09:10:23 2010
@@ -51,9 +51,9 @@ public class RouteStartupOrderSimpleTest
return new RouteBuilder() {
@Override
public void configure() throws Exception {
- from("seda:foo").startupOrder(2).to("mock:result");
+ from("seda:foo").startupOrder(2).routeId("b").to("mock:result");
- from("direct:start").startupOrder(1).to("seda:foo");
+ from("direct:start").startupOrder(1).routeId("a").to("seda:foo");
}
};
}
Modified: camel/trunk/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/cxfbean/CxfBeanTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/cxfbean/CxfBeanTest.java?rev=980370&r1=980369&r2=980370&view=diff
==============================================================================
--- camel/trunk/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/cxfbean/CxfBeanTest.java (original)
+++ camel/trunk/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/cxfbean/CxfBeanTest.java Thu Jul 29 09:10:23 2010
@@ -33,6 +33,7 @@ import org.apache.http.client.methods.Ht
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.util.EntityUtils;
+import org.junit.Ignore;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
@@ -69,6 +70,7 @@ public class CxfBeanTest extends Abstrac
}
@Test
+ @Ignore("There is a bug in CxfRsComponent when restarting using stop/start")
public void testGetConsumerAfterReStartCamelContext() throws Exception {
URL url = new URL("http://localhost:9000/customerservice/customers/123");
@@ -88,6 +90,25 @@ public class CxfBeanTest extends Abstrac
}
@Test
+ public void testGetConsumerAfterResumingCamelContext() throws Exception {
+ URL url = new URL("http://localhost:9000/customerservice/customers/123");
+
+ InputStream in = url.openStream();
+ assertEquals("{\"Customer\":{\"id\":123,\"name\":\"John\"}}", CxfUtils.getStringFromInputStream(in));
+ in.close();
+
+ camelContext.suspend();
+ camelContext.resume();
+
+ url = new URL("http://localhost:9000/customerservice/orders/223/products/323");
+ in = url.openStream();
+
+ assertEquals("{\"Product\":{\"description\":\"product 323\",\"id\":323}}",
+ CxfUtils.getStringFromInputStream(in));
+ in.close();
+ }
+
+ @Test
public void testPutConsumer() throws Exception {
HttpPut put = new HttpPut("http://localhost:9000/customerservice/customers");
StringEntity entity = new StringEntity(PUT_REQUEST, "ISO-8859-1");
Modified: camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzEndpoint.java?rev=980370&r1=980369&r2=980370&view=diff
==============================================================================
--- camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzEndpoint.java (original)
+++ camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzEndpoint.java Thu Jul 29 09:10:23 2010
@@ -23,6 +23,7 @@ import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.Service;
import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.impl.ServiceSupport;
import org.apache.camel.processor.loadbalancer.LoadBalancer;
import org.apache.camel.processor.loadbalancer.RoundRobinLoadBalancer;
import org.apache.camel.util.ExchangeHelper;
@@ -47,8 +48,8 @@ public class QuartzEndpoint extends Defa
private LoadBalancer loadBalancer;
private Trigger trigger;
private JobDetail jobDetail;
- private boolean started;
- private boolean stateful;
+ private volatile boolean started;
+ private volatile boolean stateful;
public QuartzEndpoint(final String endpointUri, final QuartzComponent component) {
super(endpointUri, component);
@@ -87,12 +88,24 @@ public class QuartzEndpoint extends Defa
* @param jobExecutionContext the Quartz Job context
*/
public void onJobExecute(final JobExecutionContext jobExecutionContext) throws JobExecutionException {
+ boolean run = true;
+ LoadBalancer balancer = getLoadBalancer();
+ if (balancer instanceof ServiceSupport) {
+ run = ((ServiceSupport) balancer).isRunAllowed();
+ }
+
+ if (!run) {
+ // quartz scheduler could potential trigger during a route has been shutdown
+ LOG.warn("Cannot execute Quartz Job with context: " + jobExecutionContext + " because processor is not started: " + balancer);
+ return;
+ }
+
if (LOG.isDebugEnabled()) {
LOG.debug("Firing Quartz Job with context: " + jobExecutionContext);
}
Exchange exchange = createExchange(jobExecutionContext);
try {
- getLoadBalancer().process(exchange);
+ balancer.process(exchange);
if (exchange.getException() != null) {
// propagate the exception back to Quartz
@@ -194,11 +207,12 @@ public class QuartzEndpoint extends Defa
public synchronized void consumerStopped(final QuartzConsumer consumer) throws SchedulerException {
ObjectHelper.notNull(trigger, "trigger");
- getLoadBalancer().removeProcessor(consumer.getProcessor());
- if (getLoadBalancer().getProcessors().isEmpty() && started) {
+ if (started) {
removeTrigger(getTrigger(), getJobDetail());
started = false;
}
+
+ getLoadBalancer().removeProcessor(consumer.getProcessor());
}
protected LoadBalancer createLoadBalancer() {
Added: camel/trunk/components/camel-quartz/src/test/java/org/apache/camel/component/quartz/QuartzOneCamelContextRestartTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quartz/src/test/java/org/apache/camel/component/quartz/QuartzOneCamelContextRestartTest.java?rev=980370&view=auto
==============================================================================
--- camel/trunk/components/camel-quartz/src/test/java/org/apache/camel/component/quartz/QuartzOneCamelContextRestartTest.java (added)
+++ camel/trunk/components/camel-quartz/src/test/java/org/apache/camel/component/quartz/QuartzOneCamelContextRestartTest.java Thu Jul 29 09:10:23 2010
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.quartz;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * @version $Revision$
+ */
+public class QuartzOneCamelContextRestartTest {
+
+ private DefaultCamelContext camel1;
+
+ @Before
+ public void setUp() throws Exception {
+ camel1 = new DefaultCamelContext();
+ camel1.setName("camel-1");
+ camel1.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("quartz://myGroup/myTimerName?cron=0/1+*+*+*+*+?").to("mock:one");
+ }
+ });
+ camel1.start();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ camel1.stop();
+ }
+
+ @Test
+ public void testOneCamelContextSuspendResume() throws Exception {
+ MockEndpoint mock1 = camel1.getEndpoint("mock:one", MockEndpoint.class);
+ mock1.expectedMinimumMessageCount(2);
+ mock1.assertIsSatisfied();
+
+ camel1.stop();
+
+ // should resume triggers when we start camel 1 again
+ mock1.reset();
+ mock1.expectedMinimumMessageCount(2);
+ camel1.start();
+
+ mock1.assertIsSatisfied();
+ }
+
+
+}
Propchange: camel/trunk/components/camel-quartz/src/test/java/org/apache/camel/component/quartz/QuartzOneCamelContextRestartTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-quartz/src/test/java/org/apache/camel/component/quartz/QuartzOneCamelContextRestartTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: camel/trunk/components/camel-quartz/src/test/java/org/apache/camel/component/quartz/QuartzOneCamelContextSuspendResumeTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quartz/src/test/java/org/apache/camel/component/quartz/QuartzOneCamelContextSuspendResumeTest.java?rev=980370&view=auto
==============================================================================
--- camel/trunk/components/camel-quartz/src/test/java/org/apache/camel/component/quartz/QuartzOneCamelContextSuspendResumeTest.java (added)
+++ camel/trunk/components/camel-quartz/src/test/java/org/apache/camel/component/quartz/QuartzOneCamelContextSuspendResumeTest.java Thu Jul 29 09:10:23 2010
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.quartz;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * @version $Revision$
+ */
+public class QuartzOneCamelContextSuspendResumeTest {
+
+ private DefaultCamelContext camel1;
+
+ @Before
+ public void setUp() throws Exception {
+ camel1 = new DefaultCamelContext();
+ camel1.setName("camel-1");
+ camel1.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("quartz://myGroup/myTimerName?cron=0/1+*+*+*+*+?").to("mock:one");
+ }
+ });
+ camel1.start();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ camel1.stop();
+ }
+
+ @Test
+ public void testOneCamelContextSuspendResume() throws Exception {
+ MockEndpoint mock1 = camel1.getEndpoint("mock:one", MockEndpoint.class);
+ mock1.expectedMinimumMessageCount(2);
+ mock1.assertIsSatisfied();
+
+ camel1.suspend();
+
+ // should resume triggers when we start camel 1 again
+ mock1.reset();
+ mock1.expectedMinimumMessageCount(2);
+ camel1.resume();
+
+ mock1.assertIsSatisfied();
+ }
+
+
+}
Propchange: camel/trunk/components/camel-quartz/src/test/java/org/apache/camel/component/quartz/QuartzOneCamelContextSuspendResumeTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-quartz/src/test/java/org/apache/camel/component/quartz/QuartzOneCamelContextSuspendResumeTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: camel/trunk/components/camel-quartz/src/test/java/org/apache/camel/component/quartz/QuartzTwoCamelContextSuspendResumeTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quartz/src/test/java/org/apache/camel/component/quartz/QuartzTwoCamelContextSuspendResumeTest.java?rev=980370&view=auto
==============================================================================
--- camel/trunk/components/camel-quartz/src/test/java/org/apache/camel/component/quartz/QuartzTwoCamelContextSuspendResumeTest.java (added)
+++ camel/trunk/components/camel-quartz/src/test/java/org/apache/camel/component/quartz/QuartzTwoCamelContextSuspendResumeTest.java Thu Jul 29 09:10:23 2010
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.quartz;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * @version $Revision$
+ */
+public class QuartzTwoCamelContextSuspendResumeTest {
+
+ private DefaultCamelContext camel1;
+ private DefaultCamelContext camel2;
+
+ @Before
+ public void setUp() throws Exception {
+ camel1 = new DefaultCamelContext();
+ camel1.setName("camel-1");
+ camel1.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("quartz://myGroup/myTimerName?cron=0/1+*+*+*+*+?").to("mock:one");
+ }
+ });
+ camel1.start();
+
+ camel2 = new DefaultCamelContext();
+ camel2.setName("camel-2");
+ camel2.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("quartz://myOtherGroup/myOtherTimerName?cron=0/1+*+*+*+*+?").to("mock:two");
+ }
+ });
+ camel2.start();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ camel1.stop();
+ camel2.stop();
+ }
+
+ @Test
+ public void testTwoCamelContextRestart() throws Exception {
+ MockEndpoint mock1 = camel1.getEndpoint("mock:one", MockEndpoint.class);
+ mock1.expectedMinimumMessageCount(2);
+
+ MockEndpoint mock2 = camel2.getEndpoint("mock:two", MockEndpoint.class);
+ mock2.expectedMinimumMessageCount(6);
+ mock1.assertIsSatisfied();
+
+ camel1.suspend();
+
+ mock2.assertIsSatisfied();
+
+ // should resume triggers when we start camel 1 again
+ mock1.reset();
+ mock1.expectedMinimumMessageCount(2);
+ camel1.resume();
+
+ mock1.assertIsSatisfied();
+ }
+
+
+}
Propchange: camel/trunk/components/camel-quartz/src/test/java/org/apache/camel/component/quartz/QuartzTwoCamelContextSuspendResumeTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-quartz/src/test/java/org/apache/camel/component/quartz/QuartzTwoCamelContextSuspendResumeTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date