You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/07/30 17:10:03 UTC
svn commit: r980821 - in /camel/trunk/camel-core/src:
main/java/org/apache/camel/impl/ main/java/org/apache/camel/management/
test/java/org/apache/camel/impl/ test/java/org/apache/camel/management/
test/java/org/apache/camel/processor/
Author: davsclaus
Date: Fri Jul 30 15:10:03 2010
New Revision: 980821
URL: http://svn.apache.org/viewvc?rev=980821&view=rev
Log:
CAMEL-3008: CamelContext autoStartup=false should still start up, but not start any routes. You can then use JMX to start the routes.
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedCamelContextAutoStartupTest.java
- copied, changed from r980688, camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedCamelContextTest.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RouteService.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ServiceSupport.java
camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementAgent.java
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultCamelContextAutoStartupTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RouteServicesStartupOrderTest.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java?rev=980821&r1=980820&r2=980821&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java Fri Jul 30 15:10:03 2010
@@ -147,7 +147,9 @@ public class DefaultCamelContext extends
private final AtomicBoolean suspending = new AtomicBoolean(false);
private final AtomicBoolean suspended = new AtomicBoolean(false);
private final AtomicBoolean resuming = new AtomicBoolean(false);
- private boolean firstStartDone;
+ // special flags to control the first startup, which is handled specially
+ private volatile boolean firstStartDone;
+ private volatile boolean doNotStartRoutesOnFirstStart;
private Boolean autoStartup = Boolean.TRUE;
private Boolean trace = Boolean.FALSE;
private Boolean streamCache = Boolean.FALSE;
@@ -608,7 +610,7 @@ public class DefaultCamelContext extends
public synchronized void stopRoute(String routeId) throws Exception {
RouteService routeService = routeServices.get(routeId);
if (routeService != null) {
- routeService.stop();
+ stopRouteService(routeService);
}
}
@@ -622,7 +624,7 @@ public class DefaultCamelContext extends
getShutdownStrategy().shutdown(this, routes);
// must stop route service as well
- routeService.stop();
+ stopRouteService(routeService);
}
}
@@ -636,7 +638,7 @@ public class DefaultCamelContext extends
getShutdownStrategy().shutdown(this, routes, timeout, timeUnit);
// must stop route service as well
- routeService.stop();
+ stopRouteService(routeService);
}
}
@@ -1044,8 +1046,8 @@ public class DefaultCamelContext extends
LOG.info("Apache Camel " + getVersion() + " (CamelContext: " + getName() + ") is resuming");
StopWatch watch = new StopWatch();
- // start the suspended routes (do not check for route clashes, and indicate )
- doStartRoutes(suspendedRouteServices, false);
+ // start the suspended routes (do not check for route clashes, and indicate that the route consumers should be started)
+ doStartRoutes(suspendedRouteServices, false, true);
watch.stop();
if (LOG.isInfoEnabled()) {
@@ -1070,20 +1072,26 @@ public class DefaultCamelContext extends
}
public void start() throws Exception {
- boolean doNotStart = !firstStartDone && !isAutoStartup();
- firstStartDone = true;
+ startDate = new Date();
+ stopWatch.restart();
+ LOG.info("Apache Camel " + getVersion() + " (CamelContext: " + getName() + ") is starting");
- if (doNotStart) {
- LOG.info("Cannot start Apache Camel " + getVersion() + " (CamelContext: " + getName() + ") as it has been configured to not auto start");
- return;
- }
+ doNotStartRoutesOnFirstStart = !firstStartDone && !isAutoStartup();
+ firstStartDone = true;
- // super will invoke doStart which will prepare internal services before we continue and start the routes below
+ // super will invoke doStart which will prepare internal services and start routes etc.
super.start();
stopWatch.stop();
if (LOG.isInfoEnabled()) {
- LOG.info("Started " + getRoutes().size() + " routes");
+ // count how many routes are actually started
+ int started = 0;
+ for (Route route : getRoutes()) {
+ if (getRouteStatus(route.getId()).isStarted()) {
+ started++;
+ }
+ }
+ LOG.info("Total " + getRoutes().size() + " routes, of which " + started + " is started.");
LOG.info("Apache Camel " + getVersion() + " (CamelContext: " + getName() + ") started in " + TimeUtils.printDuration(stopWatch.taken()));
}
EventHelper.notifyCamelContextStarted(this);
@@ -1097,9 +1105,10 @@ public class DefaultCamelContext extends
*
* @param routeServices the routes to start (will only start a route if its not already started)
* @param checkClash whether to check for startup ordering clash
+ * @param startConsumer whether the route consumer should be started. Can be used to warmup the route without starting the consumer
* @throws Exception is thrown if error starting routes
*/
- protected void doStartRoutes(Map<String, RouteService> routeServices, boolean checkClash) throws Exception {
+ protected void doStartRoutes(Map<String, RouteService> routeServices, boolean checkClash, boolean startConsumer) throws Exception {
// filter out already started routes
Map<String, RouteService> filtered = new LinkedHashMap<String, RouteService>();
for (Map.Entry<String, RouteService> entry : routeServices.entrySet()) {
@@ -1125,7 +1134,7 @@ public class DefaultCamelContext extends
if (!filtered.isEmpty()) {
// the context is now considered started (i.e. isStarted() == true))
// starting routes is done after, not during context startup
- safelyStartRouteServices(false, checkClash, filtered.values());
+ safelyStartRouteServices(false, checkClash, startConsumer, filtered.values());
}
// now notify any startup aware listeners as all the routes etc has been started,
@@ -1136,10 +1145,6 @@ public class DefaultCamelContext extends
}
protected synchronized void doStart() throws Exception {
- startDate = new Date();
- stopWatch.restart();
- LOG.info("Apache Camel " + getVersion() + " (CamelContext: " + getName() + ") is starting");
-
try {
doStartCamel();
} catch (Exception e) {
@@ -1233,7 +1238,12 @@ public class DefaultCamelContext extends
startRouteDefinitions(routeDefinitions);
// start routes
- doStartRoutes(routeServices, true);
+ if (doNotStartRoutesOnFirstStart) {
+ LOG.info("Cannot start routes as CamelContext has been configured with autoStartup=false");
+ }
+
+ // invoke this logic to warmup the routes and if possible also start the routes
+ doStartRoutes(routeServices, true, !doNotStartRoutesOnFirstStart);
// starting will continue in the start method
}
@@ -1386,8 +1396,26 @@ public class DefaultCamelContext extends
} else {
routeServices.put(key, routeService);
if (shouldStartRoutes()) {
- safelyStartRouteServices(true, true, routeService);
+ safelyStartRouteServices(true, true, true, routeService);
+ }
+ }
+ }
+
+ protected synchronized void stopRouteService(RouteService routeService) throws Exception {
+ String key = routeService.getId();
+ ServiceStatus status = getRouteStatus(key);
+
+ if (status != null && status.isStopped()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Route " + key + " is already stopped");
+ }
+ } else {
+ for (Route route : routeService.getRoutes()) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Route: " + route.getId() + " stopped, was consuming from: " + route.getConsumer().getEndpoint());
+ }
}
+ routeService.stop();
}
}
@@ -1398,11 +1426,13 @@ public class DefaultCamelContext extends
* This method <b>must</b> be used to start routes in a safe manner.
*
* @param forceAutoStart whether to force auto starting the routes, despite they may be configured not do do so
- * @param checkClash whether to check for startup order clash
+ * @param checkClash whether to check for startup order clash
+ * @param startConsumer whether the route consumer should be started. Can be used to warmup the route without starting the consumer
* @param routeServices the routes
* @throws Exception is thrown if error starting the routes
*/
- protected synchronized void safelyStartRouteServices(boolean forceAutoStart, boolean checkClash, Collection<RouteService> routeServices) throws Exception {
+ protected synchronized void safelyStartRouteServices(boolean forceAutoStart, boolean checkClash, boolean startConsumer,
+ Collection<RouteService> routeServices) throws Exception {
// list of inputs to start when all the routes have been prepared for starting
// we use a tree map so the routes will be ordered according to startup order defined on the route
Map<Integer, DefaultRouteStartupOrder> inputs = new TreeMap<Integer, DefaultRouteStartupOrder>();
@@ -1421,17 +1451,24 @@ public class DefaultCamelContext extends
}
// warm up routes before we start them
- doWarmUpRoutes(inputs);
+ doWarmUpRoutes(inputs, startConsumer);
+
+ if (startConsumer) {
+ // and now start the routes
+ // and check for clash with multiple consumers of the same endpoints which is not allowed
+ doStartRouteConsumers(inputs);
+ }
- // and now start the routes
- // and check for clash with multiple consumers of the same endpoints which is not allowed
- doStartRoutes(inputs);
// inputs no longer needed
inputs.clear();
}
- protected synchronized void safelyStartRouteServices(boolean forceAutoStart, boolean checkClash, RouteService... routeServices) throws Exception {
- safelyStartRouteServices(forceAutoStart, checkClash, Arrays.asList(routeServices));
+ /**
+ * @see #safelyStartRouteServices(boolean, boolean, boolean, java.util.Collection)
+ */
+ protected synchronized void safelyStartRouteServices(boolean forceAutoStart, boolean checkClash, boolean startConsumer,
+ RouteService... routeServices) throws Exception {
+ safelyStartRouteServices(forceAutoStart, checkClash, startConsumer, Arrays.asList(routeServices));
}
private DefaultRouteStartupOrder doPrepareRouteToBeStarted(RouteService routeService, boolean forceAutoStart) throws Exception {
@@ -1485,7 +1522,7 @@ public class DefaultCamelContext extends
return true;
}
- private void doWarmUpRoutes(Map<Integer, DefaultRouteStartupOrder> inputs) throws Exception {
+ private void doWarmUpRoutes(Map<Integer, DefaultRouteStartupOrder> inputs, boolean autoStartup) throws Exception {
// now prepare the routes by starting its services before we start the input
for (Map.Entry<Integer, DefaultRouteStartupOrder> entry : inputs.entrySet()) {
// defer starting inputs till later as we want to prepare the routes by starting
@@ -1494,23 +1531,23 @@ public class DefaultCamelContext extends
// what this does is to ensure Camel is more robust on starting routes as all routes
// will then be prepared in time before we start inputs which will consume messages to be routed
RouteService routeService = entry.getValue().getRouteService();
- routeService.startInputs(false);
- try {
- routeService.start();
- } finally {
- routeService.startInputs(true);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Warming up route id: " + routeService.getId() + " having autoStartup=" + autoStartup);
}
+ routeService.warmUp();
}
}
- private void doStartRoutes(Map<Integer, DefaultRouteStartupOrder> inputs) throws Exception {
+ private void doStartRouteConsumers(Map<Integer, DefaultRouteStartupOrder> inputs) throws Exception {
List<Endpoint> routeInputs = new ArrayList<Endpoint>();
for (Map.Entry<Integer, DefaultRouteStartupOrder> entry : inputs.entrySet()) {
Integer order = entry.getKey();
Route route = entry.getValue().getRoute();
+ // start the service
RouteService routeService = entry.getValue().getRouteService();
+
for (Consumer consumer : routeService.getInputs().values()) {
Endpoint endpoint = consumer.getEndpoint();
@@ -1551,6 +1588,9 @@ public class DefaultCamelContext extends
routeStartupOrder.add(entry.getValue());
}
}
+
+ // and start the route service (no need to start children as they are already warmed up)
+ routeService.start(false);
}
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RouteService.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RouteService.java?rev=980821&r1=980820&r2=980821&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RouteService.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RouteService.java Fri Jul 30 15:10:03 2010
@@ -22,6 +22,7 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.camel.CamelContext;
import org.apache.camel.Consumer;
@@ -52,8 +53,8 @@ public class RouteService extends Servic
private final List<Route> routes;
private final String id;
private boolean removingRoutes;
- private boolean startInputs = true;
private final Map<Route, Consumer> inputs = new HashMap<Route, Consumer>();
+ private final AtomicBoolean warmUpDone = new AtomicBoolean(false);
public RouteService(DefaultCamelContext camelContext, RouteDefinition routeDefinition, List<RouteContext> routeContexts, List<Route> routes) {
this.camelContext = camelContext;
@@ -84,17 +85,6 @@ public class RouteService extends Servic
}
/**
- * Sets whether inputs (consumers) should be started when starting the routes
- * <p/>
- * By default inputs are started.
- *
- * @param flag flag to either start inputs or not
- */
- public void startInputs(boolean flag) {
- this.startInputs = flag;
- }
-
- /**
* Gets the inputs to the routes.
*
* @return list of {@link Consumer} as inputs for the routes
@@ -111,7 +101,7 @@ public class RouteService extends Servic
this.removingRoutes = removingRoutes;
}
- protected void doStart() throws Exception {
+ public void warmUp() throws Exception {
camelContext.addRouteCollection(routes);
for (LifecycleStrategy strategy : camelContext.getLifecycleStrategies()) {
@@ -148,22 +138,24 @@ public class RouteService extends Servic
}
}
startChildService(route, childServices);
+ }
+
+ warmUpDone.set(true);
+ }
+ protected void doStart() throws Exception {
+ // ensure we are warmed up before starting the route
+ if (warmUpDone.compareAndSet(false, true)) {
+ warmUp();
+ }
+
+ for (Route route : routes) {
// start the route itself
ServiceHelper.startService(route);
// fire event
EventHelper.notifyRouteStarted(camelContext, route);
}
-
- if (startInputs) {
- // start the input consumers
- for (Map.Entry<Route, Consumer> entry : inputs.entrySet()) {
- Route route = entry.getKey();
- Consumer consumer = entry.getValue();
- startChildService(route, consumer);
- }
- }
}
protected void doStop() throws Exception {
@@ -204,17 +196,14 @@ public class RouteService extends Servic
}
camelContext.removeRouteCollection(routes);
+ warmUpDone.set(false);
}
@Override
protected void doShutdown() throws Exception {
// clear inputs on shutdown
inputs.clear();
- }
-
- protected void startChildService(Route route, Service... services) throws Exception {
- List<Service> list = new ArrayList<Service>(Arrays.asList(services));
- startChildService(route, list);
+ warmUpDone.set(false);
}
protected void startChildService(Route route, List<Service> services) throws Exception {
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ServiceSupport.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ServiceSupport.java?rev=980821&r1=980820&r2=980821&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ServiceSupport.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ServiceSupport.java Fri Jul 30 15:10:03 2010
@@ -44,12 +44,16 @@ public abstract class ServiceSupport imp
private String version;
public void start() throws Exception {
+ start(true);
+ }
+
+ public void start(boolean startChildren) throws Exception {
if (!started.get()) {
if (starting.compareAndSet(false, true)) {
boolean childrenStarted = false;
Exception ex = null;
try {
- if (childServices != null) {
+ if (childServices != null && startChildren) {
ServiceHelper.startServices(childServices);
}
childrenStarted = true;
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementAgent.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementAgent.java?rev=980821&r1=980820&r2=980821&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementAgent.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementAgent.java Fri Jul 30 15:10:03 2010
@@ -230,11 +230,10 @@ public class DefaultManagementAgent exte
}
public void unregister(ObjectName name) throws JMException {
- server.unregisterMBean(name);
- mbeansRegistered.remove(name);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Unregistered MBean with objectname: " + name);
+ if (server.isRegistered(name)) {
+ server.unregisterMBean(name);
}
+ mbeansRegistered.remove(name);
}
public boolean isRegistered(ObjectName name) {
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultCamelContextAutoStartupTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultCamelContextAutoStartupTest.java?rev=980821&r1=980820&r2=980821&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultCamelContextAutoStartupTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultCamelContextAutoStartupTest.java Fri Jul 30 15:10:03 2010
@@ -33,19 +33,19 @@ public class DefaultCamelContextAutoStar
camel.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
- from("direct:start").to("mock:result");
+ from("direct:start").routeId("foo").to("mock:result");
}
});
camel.start();
- assertEquals(false, camel.isStarted());
- assertEquals(0, camel.getRoutes().size());
-
- // now start it again as auto startup prevented it from starting first time
- camel.start();
-
assertEquals(true, camel.isStarted());
assertEquals(1, camel.getRoutes().size());
+ assertEquals(true, camel.getRouteStatus("foo").isStopped());
+
+ // now start the routes
+ camel.startRoute("foo");
+
+ assertEquals(true, camel.getRouteStatus("foo").isStarted());
// and now its started we can test that it works by sending in a message to the route
MockEndpoint mock = camel.getEndpoint("mock:result", MockEndpoint.class);
@@ -66,13 +66,14 @@ public class DefaultCamelContextAutoStar
camel.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
- from("direct:start").to("mock:result");
+ from("direct:start").routeId("foo").to("mock:result");
}
});
camel.start();
assertEquals(true, camel.isStarted());
assertEquals(1, camel.getRoutes().size());
+ assertEquals(true, camel.getRouteStatus("foo").isStarted());
MockEndpoint mock = camel.getEndpoint("mock:result", MockEndpoint.class);
mock.expectedMessageCount(1);
Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedCamelContextAutoStartupTest.java (from r980688, camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedCamelContextTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedCamelContextAutoStartupTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedCamelContextAutoStartupTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedCamelContextTest.java&r1=980688&r2=980821&rev=980821&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedCamelContextTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedCamelContextAutoStartupTest.java Fri Jul 30 15:10:03 2010
@@ -22,13 +22,12 @@ import javax.management.ObjectName;
import org.apache.camel.CamelContext;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.impl.DefaultCamelContext;
/**
* @version $Revision$
*/
-public class ManagedCamelContextTest extends ContextTestSupport {
+public class ManagedCamelContextAutoStartupTest extends ContextTestSupport {
@Override
protected boolean useJmx() {
@@ -47,24 +46,21 @@ public class ManagedCamelContextTest ext
MBeanServer mbeanServer = context.getManagementStrategy().getManagementAgent().getMBeanServer();
ObjectName on = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=context,name=\"camel-1\"");
+ ObjectName onRoute = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=routes,name=\"foo\"");
assertTrue("Should be registered", mbeanServer.isRegistered(on));
String name = (String) mbeanServer.getAttribute(on, "CamelId");
assertEquals("camel-1", name);
- String uptime = (String) mbeanServer.getAttribute(on, "Uptime");
- assertNotNull(uptime);
+ assertTrue("Should be registered", mbeanServer.isRegistered(onRoute));
+ String state = (String) mbeanServer.getAttribute(onRoute, "State");
+ assertEquals("Stopped", state);
- Boolean suspended = (Boolean) mbeanServer.getAttribute(on, "Suspended");
- assertEquals(false, suspended.booleanValue());
+ // start the route
+ mbeanServer.invoke(onRoute, "start", null, null);
- // invoke operations
- MockEndpoint mock = getMockEndpoint("mock:result");
- mock.expectedBodiesReceived("Hello World");
-
- mbeanServer.invoke(on, "sendBody", new Object[]{"direct:start", "Hello World"}, new String[]{"java.lang.String", "java.lang.String"});
-
- assertMockEndpointsSatisfied();
+ state = (String) mbeanServer.getAttribute(onRoute, "State");
+ assertEquals("Started", state);
Object reply = mbeanServer.invoke(on, "requestBody", new Object[]{"direct:foo", "Hello World"}, new String[]{"java.lang.String", "java.lang.String"});
assertEquals("Bye World", reply);
@@ -78,9 +74,9 @@ public class ManagedCamelContextTest ext
return new RouteBuilder() {
@Override
public void configure() throws Exception {
- from("direct:start").to("mock:result");
+ context.setAutoStartup(false);
- from("direct:foo").transform(constant("Bye World"));
+ from("direct:foo").routeId("foo").transform(constant("Bye World"));
}
};
}
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RouteServicesStartupOrderTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RouteServicesStartupOrderTest.java?rev=980821&r1=980820&r2=980821&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RouteServicesStartupOrderTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RouteServicesStartupOrderTest.java Fri Jul 30 15:10:03 2010
@@ -59,7 +59,6 @@ public class RouteServicesStartupOrderTe
// assert route service was started in order as well
assertEquals("22114433", startOrder);
-
}
@Override