You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/02/28 10:14:09 UTC
svn commit: r917147 - in /camel/trunk/camel-core/src:
main/java/org/apache/camel/ main/java/org/apache/camel/impl/
main/java/org/apache/camel/management/mbean/
main/java/org/apache/camel/spi/ test/java/org/apache/camel/impl/
test/java/org/apache/camel/...
Author: davsclaus
Date: Sun Feb 28 09:14:09 2010
New Revision: 917147
URL: http://svn.apache.org/viewvc?rev=917147&view=rev
Log:
CAMEL-2497: Added graceful shutdown of individual routes.
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ShutdownRouteGracefulTimeoutTriggerTest.java (contents, props changed)
- copied, changed from r917137, camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ShutdownGracefuTimeoutTriggerTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ShutdownRouteGracefulWithTimerTest.java
- copied, changed from r917137, camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ShutdownGracefulWithTimerTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRouteShutdownAndStartTest.java
- copied, changed from r917137, camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRouteStopAndStartTest.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java
camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ShutdownStrategy.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java?rev=917147&r1=917146&r2=917147&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java Sun Feb 28 09:14:09 2010
@@ -20,6 +20,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
import org.apache.camel.builder.ErrorHandlerBuilder;
import org.apache.camel.model.DataFormatDefinition;
@@ -321,7 +322,8 @@
void startRoute(String routeId) throws Exception;
/**
- * Stops the given route. It will remain in the list of route definitions return by {@link #getRouteDefinitions()}
+ * Stops the given route.
+ * It will remain in the list of route definitions returned by {@link #getRouteDefinitions()}
* unless you use the {@link #removeRouteDefinitions(java.util.Collection)}
*
* @param route the route to stop
@@ -330,7 +332,8 @@
void stopRoute(RouteDefinition route) throws Exception;
/**
- * Stops the given route. It will remain in the list of route definitions return by {@link #getRouteDefinitions()}
+ * Stops the given route.
+ * It will remain in the list of route definitions returned by {@link #getRouteDefinitions()}
* unless you use the {@link #removeRouteDefinitions(java.util.Collection)}
*
* @param routeId the route id
@@ -339,6 +342,28 @@
void stopRoute(String routeId) throws Exception;
/**
+ * Shutdown the given route using {@link org.apache.camel.spi.ShutdownStrategy}.
+ * It will remain in the list of route definitions returned by {@link #getRouteDefinitions()}
+ * unless you use the {@link #removeRouteDefinitions(java.util.Collection)}
+ *
+ * @param routeId the route id
+ * @throws Exception is thrown if the route could not be shutdown for whatever reason
+ */
+ void shutdownRoute(String routeId) throws Exception;
+
+ /**
+ * Shutdown the given route using {@link org.apache.camel.spi.ShutdownStrategy} with a specified timeout.
+ * It will remain in the list of route definitions returned by {@link #getRouteDefinitions()}
+ * unless you use the {@link #removeRouteDefinitions(java.util.Collection)}
+ *
+ * @param routeId the route id
+ * @param timeout the timeout value, expressed in the given time unit
+ * @param timeUnit the unit to use
+ * @throws Exception is thrown if the route could not be shutdown for whatever reason
+ */
+ void shutdownRoute(String routeId, long timeout, TimeUnit timeUnit) throws Exception;
+
+ /**
* Returns the current status of the given route
*
* @param routeId the route id
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java?rev=917147&r1=917146&r2=917147&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java Sun Feb 28 09:14:09 2010
@@ -26,6 +26,7 @@
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.naming.Context;
@@ -639,12 +640,35 @@
stopRoute(route.idOrCreate(nodeIdFactory));
}
- /**
- * Stops the route denoted by the given RouteType id
- */
- public synchronized void stopRoute(String key) throws Exception {
- RouteService routeService = routeServices.get(key);
+ public synchronized void stopRoute(String routeId) throws Exception {
+ RouteService routeService = routeServices.get(routeId);
+ if (routeService != null) {
+ routeService.stop();
+ }
+ }
+
+ public synchronized void shutdownRoute(String routeId) throws Exception {
+ RouteService routeService = routeServices.get(routeId);
if (routeService != null) {
+ List<RouteStartupOrder> routes = new ArrayList<RouteStartupOrder>(1);
+ RouteStartupOrder order = new DefaultRouteStartupOrder(1, routeService.getRoutes().iterator().next(), routeService);
+ routes.add(order);
+
+ getShutdownStrategy().shutdown(this, routes);
+ // must stop route service as well
+ routeService.stop();
+ }
+ }
+
+ public synchronized void shutdownRoute(String routeId, long timeout, TimeUnit timeUnit) throws Exception {
+ RouteService routeService = routeServices.get(routeId);
+ if (routeService != null) {
+ List<RouteStartupOrder> routes = new ArrayList<RouteStartupOrder>(1);
+ RouteStartupOrder order = new DefaultRouteStartupOrder(1, routeService.getRoutes().iterator().next(), routeService);
+ routes.add(order);
+
+ getShutdownStrategy().shutdown(this, routes, timeout, timeUnit);
+ // must stop route service as well
routeService.stop();
}
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java?rev=917147&r1=917146&r2=917147&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java Sun Feb 28 09:14:09 2010
@@ -34,7 +34,7 @@
private static final transient Log LOG = LogFactory.getLog(DefaultInflightRepository.class);
private final AtomicInteger totalCount = new AtomicInteger();
- // us endpoint key as key so endpoints with lenient properties is registered using the same key (eg dynamic http endpoints)
+ // use endpoint key as key so endpoints with lenient properties are registered using the same key (eg dynamic http endpoints)
private final ConcurrentHashMap<String, AtomicInteger> endpointCount = new ConcurrentHashMap<String, AtomicInteger>();
public void add(Exchange exchange) {
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java?rev=917147&r1=917146&r2=917147&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java Sun Feb 28 09:14:09 2010
@@ -66,13 +66,16 @@
private boolean shutdownNowOnTimeout = true;
public void shutdown(CamelContext context, List<RouteStartupOrder> routes) throws Exception {
+ shutdown(context, routes, getTimeout(), getTimeUnit());
+ }
+ public void shutdown(CamelContext context, List<RouteStartupOrder> routes, long timeout, TimeUnit timeUnit) throws Exception {
long start = System.currentTimeMillis();
if (timeout > 0) {
- LOG.info("Starting to graceful shutdown routes (timeout " + timeout + " " + timeUnit.toString().toLowerCase() + ")");
+ LOG.info("Starting to graceful shutdown " + routes.size() + " routes (timeout " + timeout + " " + timeUnit.toString().toLowerCase() + ")");
} else {
- LOG.info("Starting to graceful shutdown routes (no timeout)");
+ LOG.info("Starting to graceful shutdown " + routes.size() + " routes (no timeout)");
}
// use another thread to perform the shutdowns so we can support timeout
@@ -88,7 +91,7 @@
future.cancel(true);
if (shutdownNowOnTimeout) {
- LOG.warn("Timeout occurred. Now forcing all routes to be shutdown now.");
+ LOG.warn("Timeout occurred. Now forcing the routes to be shutdown now.");
// force the routes to shutdown now
shutdownRoutesNow(routes);
} else {
@@ -100,10 +103,10 @@
}
long delta = System.currentTimeMillis() - start;
- // convert to seconds as its easier to read than a big milli seconds number
+ // convert to seconds as it's easier to read than a big milliseconds number
long seconds = TimeUnit.SECONDS.convert(delta, TimeUnit.MILLISECONDS);
- LOG.info("Graceful shutdown of routes completed in " + seconds + " seconds");
+ LOG.info("Graceful shutdown of " + routes.size() + " routes completed in " + seconds + " seconds");
}
public void setTimeout(long timeout) {
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java?rev=917147&r1=917146&r2=917147&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java Sun Feb 28 09:14:09 2010
@@ -16,6 +16,8 @@
*/
package org.apache.camel.management.mbean;
+import java.util.concurrent.TimeUnit;
+
import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.ManagementStatisticsLevel;
@@ -127,4 +129,14 @@
public void stop() throws Exception {
context.stopRoute(getRouteId());
}
+
+ @ManagedOperation(description = "Graceful Shutdown Route")
+ public void shutdown() throws Exception {
+ context.shutdownRoute(getRouteId());
+ }
+
+ @ManagedOperation(description = "Graceful Shutdown Route using timeout in seconds")
+ public void shutdown(long timeout) throws Exception {
+ context.shutdownRoute(getRouteId(), timeout, TimeUnit.SECONDS);
+ }
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ShutdownStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ShutdownStrategy.java?rev=917147&r1=917146&r2=917147&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ShutdownStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ShutdownStrategy.java Sun Feb 28 09:14:09 2010
@@ -44,18 +44,29 @@
* Shutdown the routes
*
* @param context the camel context
- * @param routes the routes, ordered by the order they was started
+ * @param routes the routes, ordered by the order they were started
* @throws Exception is thrown if error shutting down the consumers, however its preferred to avoid this
*/
void shutdown(CamelContext context, List<RouteStartupOrder> routes) throws Exception;
/**
+ * Shutdown the routes using a specified timeout instead of the default timeout values
+ *
+ * @param context the camel context
+ * @param routes the routes, ordered by the order they were started
+ * @param timeout the timeout value, expressed in the given time unit
+ * @param timeUnit the unit to use
+ * @throws Exception is thrown if error shutting down the consumers, however it is preferred to avoid this
+ */
+ void shutdown(CamelContext context, List<RouteStartupOrder> routes, long timeout, TimeUnit timeUnit) throws Exception;
+
+ /**
* Set an timeout to wait for the shutdown to complete.
* <p/>
* Setting a value of 0 or negative will disable timeout and wait until complete
* (potential blocking forever)
*
- * @param timeout timeout in millis
+ * @param timeout timeout
*/
void setTimeout(long timeout);
Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ShutdownRouteGracefulTimeoutTriggerTest.java (from r917137, camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ShutdownGracefuTimeoutTriggerTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ShutdownRouteGracefulTimeoutTriggerTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ShutdownRouteGracefulTimeoutTriggerTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ShutdownGracefuTimeoutTriggerTest.java&r1=917137&r2=917147&rev=917147&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ShutdownGracefuTimeoutTriggerTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ShutdownRouteGracefulTimeoutTriggerTest.java Sun Feb 28 09:14:09 2010
@@ -16,6 +16,8 @@
*/
package org.apache.camel.impl;
+import java.util.concurrent.TimeUnit;
+
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
@@ -24,14 +26,11 @@
/**
* @version $Revision$
*/
-public class ShutdownGracefuTimeoutTriggerTest extends ContextTestSupport {
+public class ShutdownRouteGracefulTimeoutTriggerTest extends ContextTestSupport {
private static String foo = "";
- public void testShutdownGraceful() throws Exception {
- // timeout after 2 seconds
- context.getShutdownStrategy().setTimeout(2);
-
+ public void testShutdownRouteGraceful() throws Exception {
getMockEndpoint("mock:foo").expectedMessageCount(1);
template.sendBody("seda:foo", "A");
@@ -44,10 +43,14 @@
// now stop the route before its complete
foo = foo + "stop";
- context.stop();
+ // timeout after 2 seconds
+ context.shutdownRoute("seda", 2, TimeUnit.SECONDS);
// should not be able to complete all messages as timeout occurred
assertNotSame("Should not able able to complete all pending messages", "stopABCDE", foo);
+
+ assertEquals("bar should still be running", true, context.getRouteStatus("bar").isStarted());
+ assertEquals("Seda should be stopped", true, context.getRouteStatus("seda").isStopped());
}
@Override
@@ -55,11 +58,13 @@
return new RouteBuilder() {
@Override
public void configure() throws Exception {
- from("seda:foo").to("mock:foo").delay(1000).process(new Processor() {
+ from("seda:foo").routeId("seda").to("mock:foo").delay(1000).process(new Processor() {
public void process(Exchange exchange) throws Exception {
foo = foo + exchange.getIn().getBody(String.class);
}
});
+
+ from("direct:bar").routeId("bar").to("mock:bar");
}
};
}
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ShutdownRouteGracefulTimeoutTriggerTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ShutdownRouteGracefulTimeoutTriggerTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ShutdownRouteGracefulWithTimerTest.java (from r917137, camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ShutdownGracefulWithTimerTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ShutdownRouteGracefulWithTimerTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ShutdownRouteGracefulWithTimerTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ShutdownGracefulWithTimerTest.java&r1=917137&r2=917147&rev=917147&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ShutdownGracefulWithTimerTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ShutdownRouteGracefulWithTimerTest.java Sun Feb 28 09:14:09 2010
@@ -24,11 +24,11 @@
/**
* @version $Revision$
*/
-public class ShutdownGracefulWithTimerTest extends ContextTestSupport {
+public class ShutdownRouteGracefulWithTimerTest extends ContextTestSupport {
private static String foo = "";
- public void testShutdownGraceful() throws Exception {
+ public void testShutdownRouteGraceful() throws Exception {
getMockEndpoint("mock:foo").expectedMessageCount(1);
// should be stopped before it fires the first one
getMockEndpoint("mock:timer").expectedMessageCount(0);
@@ -43,10 +43,13 @@
// now stop the route before its complete
foo = foo + "stop";
- context.stop();
+ context.shutdownRoute("seda");
// it should wait as there was 1 inflight exchange and 4 pending messages left
assertEquals("Should graceful shutdown", "stopABCDE", foo);
+
+ assertEquals("Timer should still be running", true, context.getRouteStatus("timer").isStarted());
+ assertEquals("Seda should be stopped", true, context.getRouteStatus("seda").isStopped());
}
@Override
@@ -54,9 +57,9 @@
return new RouteBuilder() {
@Override
public void configure() throws Exception {
- from("timer:foo?period=500&delay=2000").to("mock:timer");
+ from("timer:foo?period=500&delay=2000").routeId("timer").to("mock:timer");
- from("seda:foo").to("mock:foo").delay(1000).process(new Processor() {
+ from("seda:foo").routeId("seda").to("mock:foo").delay(1000).process(new Processor() {
public void process(Exchange exchange) throws Exception {
foo = foo + exchange.getIn().getBody(String.class);
}
Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRouteShutdownAndStartTest.java (from r917137, camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRouteStopAndStartTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRouteShutdownAndStartTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRouteShutdownAndStartTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRouteStopAndStartTest.java&r1=917137&r2=917147&rev=917147&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRouteStopAndStartTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRouteShutdownAndStartTest.java Sun Feb 28 09:14:09 2010
@@ -29,7 +29,7 @@
/**
* @version $Revision$
*/
-public class ManagedRouteStopAndStartTest extends ContextTestSupport {
+public class ManagedRouteShutdownAndStartTest extends ContextTestSupport {
@Override
protected void setUp() throws Exception {
@@ -37,7 +37,7 @@
super.setUp();
}
- public void testStopAndStartRoute() throws Exception {
+ public void testShutdownAndStartRoute() throws Exception {
MBeanServer mbeanServer = context.getManagementStrategy().getManagementAgent().getMBeanServer();
ObjectName on = getRouteObjectName(mbeanServer);
@@ -53,7 +53,7 @@
assertEquals("Should be started", ServiceStatus.Started.name(), state);
// stop
- mbeanServer.invoke(on, "stop", null, null);
+ mbeanServer.invoke(on, "shutdown", null, null);
state = (String) mbeanServer.getAttribute(on, "State");
assertEquals("Should be stopped", ServiceStatus.Stopped.name(), state);
@@ -99,4 +99,4 @@
};
}
-}
+}
\ No newline at end of file