You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/02/28 10:14:09 UTC

svn commit: r917147 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/ main/java/org/apache/camel/impl/ main/java/org/apache/camel/management/mbean/ main/java/org/apache/camel/spi/ test/java/org/apache/camel/impl/ test/java/org/apache/camel/...

Author: davsclaus
Date: Sun Feb 28 09:14:09 2010
New Revision: 917147

URL: http://svn.apache.org/viewvc?rev=917147&view=rev
Log:
CAMEL-2497: Added graceful shutdown of individual routes.

Added:
    camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ShutdownRouteGracefulTimeoutTriggerTest.java   (contents, props changed)
      - copied, changed from r917137, camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ShutdownGracefuTimeoutTriggerTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ShutdownRouteGracefulWithTimerTest.java
      - copied, changed from r917137, camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ShutdownGracefulWithTimerTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRouteShutdownAndStartTest.java
      - copied, changed from r917137, camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRouteStopAndStartTest.java
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ShutdownStrategy.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java?rev=917147&r1=917146&r2=917147&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java Sun Feb 28 09:14:09 2010
@@ -20,6 +20,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.builder.ErrorHandlerBuilder;
 import org.apache.camel.model.DataFormatDefinition;
@@ -321,7 +322,8 @@
     void startRoute(String routeId) throws Exception;
 
     /**
-     * Stops the given route. It will remain in the list of route definitions return by {@link #getRouteDefinitions()}
+     * Stops the given route.
+     * It will remain in the list of route definitions returned by {@link #getRouteDefinitions()}
      * unless you use the {@link #removeRouteDefinitions(java.util.Collection)}
      *
      * @param route the route to stop
@@ -330,7 +332,8 @@
     void stopRoute(RouteDefinition route) throws Exception;
 
     /**
-     * Stops the given route. It will remain in the list of route definitions return by {@link #getRouteDefinitions()}
+     * Stops the given route.
+     * It will remain in the list of route definitions returned by {@link #getRouteDefinitions()}
      * unless you use the {@link #removeRouteDefinitions(java.util.Collection)}
      *
      * @param routeId the route id
@@ -339,6 +342,28 @@
     void stopRoute(String routeId) throws Exception;
 
     /**
+     * Shutdown the given route using {@link org.apache.camel.spi.ShutdownStrategy}.
+     * It will remain in the list of route definitions returned by {@link #getRouteDefinitions()}
+     * unless you use the {@link #removeRouteDefinitions(java.util.Collection)}
+     *
+     * @param routeId the route id
+     * @throws Exception is thrown if the route could not be shutdown for whatever reason
+     */
+    void shutdownRoute(String routeId) throws Exception;
+
+    /**
+     * Shutdown the given route using {@link org.apache.camel.spi.ShutdownStrategy} with a specified timeout.
+     * It will remain in the list of route definitions returned by {@link #getRouteDefinitions()}
+     * unless you use the {@link #removeRouteDefinitions(java.util.Collection)}
+     *
+     * @param routeId the route id
+     * @param timeout   the maximum time to wait for the route to shut down
+     * @param timeUnit  the time unit of the {@code timeout} argument
+     * @throws Exception is thrown if the route could not be shutdown for whatever reason
+     */
+    void shutdownRoute(String routeId, long timeout, TimeUnit timeUnit) throws Exception;
+
+    /**
      * Returns the current status of the given route
      *
      * @param routeId the route id

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java?rev=917147&r1=917146&r2=917147&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java Sun Feb 28 09:14:09 2010
@@ -26,6 +26,7 @@
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import javax.naming.Context;
@@ -639,12 +640,35 @@
         stopRoute(route.idOrCreate(nodeIdFactory));
     }
 
-    /**
-     * Stops the route denoted by the given RouteType id
-     */
-    public synchronized void stopRoute(String key) throws Exception {
-        RouteService routeService = routeServices.get(key);
+    public synchronized void stopRoute(String routeId) throws Exception {
+        RouteService routeService = routeServices.get(routeId);
+        if (routeService != null) {
+            routeService.stop();
+        }
+    }
+
+    public synchronized void shutdownRoute(String routeId) throws Exception {
+        RouteService routeService = routeServices.get(routeId);
         if (routeService != null) {
+            List<RouteStartupOrder> routes = new ArrayList<RouteStartupOrder>(1);
+            RouteStartupOrder order = new DefaultRouteStartupOrder(1, routeService.getRoutes().iterator().next(), routeService);
+            routes.add(order);
+
+            getShutdownStrategy().shutdown(this, routes);
+            // must stop route service as well
+            routeService.stop();
+        }
+    }
+
+    public synchronized void shutdownRoute(String routeId, long timeout, TimeUnit timeUnit) throws Exception {
+        RouteService routeService = routeServices.get(routeId);
+        if (routeService != null) {
+            List<RouteStartupOrder> routes = new ArrayList<RouteStartupOrder>(1);
+            RouteStartupOrder order = new DefaultRouteStartupOrder(1, routeService.getRoutes().iterator().next(), routeService);
+            routes.add(order);
+
+            getShutdownStrategy().shutdown(this, routes, timeout, timeUnit);
+            // must stop route service as well
             routeService.stop();
         }
     }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java?rev=917147&r1=917146&r2=917147&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java Sun Feb 28 09:14:09 2010
@@ -34,7 +34,7 @@
 
     private static final transient Log LOG = LogFactory.getLog(DefaultInflightRepository.class);
     private final AtomicInteger totalCount = new AtomicInteger();
-    // us endpoint key as key so endpoints with lenient properties is registered using the same key (eg dynamic http endpoints)
+    // use endpoint key as key so endpoints with lenient properties are registered using the same key (eg dynamic http endpoints)
     private final ConcurrentHashMap<String, AtomicInteger> endpointCount = new ConcurrentHashMap<String, AtomicInteger>();
 
     public void add(Exchange exchange) {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java?rev=917147&r1=917146&r2=917147&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java Sun Feb 28 09:14:09 2010
@@ -66,13 +66,16 @@
     private boolean shutdownNowOnTimeout = true;
 
     public void shutdown(CamelContext context, List<RouteStartupOrder> routes) throws Exception {
+        shutdown(context, routes, getTimeout(), getTimeUnit());
+    }
 
+    public void shutdown(CamelContext context, List<RouteStartupOrder> routes, long timeout, TimeUnit timeUnit) throws Exception {
         long start = System.currentTimeMillis();
 
         if (timeout > 0) {
-            LOG.info("Starting to graceful shutdown routes (timeout " + timeout + " " + timeUnit.toString().toLowerCase() + ")");
+            LOG.info("Starting to graceful shutdown " + routes.size() + " routes (timeout " + timeout + " " + timeUnit.toString().toLowerCase() + ")");
         } else {
-            LOG.info("Starting to graceful shutdown routes (no timeout)");
+            LOG.info("Starting to graceful shutdown " + routes.size() + " routes (no timeout)");
         }
 
         // use another thread to perform the shutdowns so we can support timeout
@@ -88,7 +91,7 @@
             future.cancel(true);
 
             if (shutdownNowOnTimeout) {
-                LOG.warn("Timeout occurred. Now forcing all routes to be shutdown now.");
+                LOG.warn("Timeout occurred. Now forcing the routes to be shutdown now.");
                 // force the routes to shutdown now
                 shutdownRoutesNow(routes);
             } else {
@@ -100,10 +103,10 @@
         }
 
         long delta = System.currentTimeMillis() - start;
-        // convert to seconds as its easier to read than a big milli seconds number 
+        // convert to seconds as it's easier to read than a big milliseconds number
         long seconds = TimeUnit.SECONDS.convert(delta, TimeUnit.MILLISECONDS);
 
-        LOG.info("Graceful shutdown of routes completed in " + seconds + " seconds");
+        LOG.info("Graceful shutdown of " + routes.size() + " routes completed in " + seconds + " seconds");
     }
 
     public void setTimeout(long timeout) {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java?rev=917147&r1=917146&r2=917147&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java Sun Feb 28 09:14:09 2010
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.management.mbean;
 
+import java.util.concurrent.TimeUnit;
+
 import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
 import org.apache.camel.ManagementStatisticsLevel;
@@ -127,4 +129,14 @@
     public void stop() throws Exception {
         context.stopRoute(getRouteId());
     }
+
+    @ManagedOperation(description = "Graceful Shutdown Route")
+    public void shutdown() throws Exception {
+        context.shutdownRoute(getRouteId());
+    }
+
+    @ManagedOperation(description = "Graceful Shutdown Route using timeout in seconds")
+    public void shutdown(long timeout) throws Exception {
+        context.shutdownRoute(getRouteId(), timeout, TimeUnit.SECONDS);
+    }
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ShutdownStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ShutdownStrategy.java?rev=917147&r1=917146&r2=917147&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ShutdownStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ShutdownStrategy.java Sun Feb 28 09:14:09 2010
@@ -44,18 +44,29 @@
      * Shutdown the routes
      *
      * @param context   the camel context
-     * @param routes the routes, ordered by the order they was started
+     * @param routes    the routes, ordered by the order they were started
      * @throws Exception is thrown if error shutting down the consumers, however its preferred to avoid this
      */
     void shutdown(CamelContext context, List<RouteStartupOrder> routes) throws Exception;
 
     /**
+     * Shutdown the routes using a specified timeout instead of the default timeout values
+     *
+     * @param context   the camel context
+     * @param routes    the routes, ordered by the order they were started
+     * @param timeout   the maximum time to wait for the shutdown to complete
+     * @param timeUnit  the time unit of the {@code timeout} argument
+     * @throws Exception is thrown if error shutting down the consumers, however its preferred to avoid this
+     */
+    void shutdown(CamelContext context, List<RouteStartupOrder> routes, long timeout, TimeUnit timeUnit) throws Exception;
+
+    /**
      * Set an timeout to wait for the shutdown to complete.
      * <p/>
      * Setting a value of 0 or negative will disable timeout and wait until complete
      * (potential blocking forever)
      *
-     * @param timeout timeout in millis
+     * @param timeout the timeout value, interpreted in the configured time unit
      */
     void setTimeout(long timeout);
 

Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ShutdownRouteGracefulTimeoutTriggerTest.java (from r917137, camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ShutdownGracefuTimeoutTriggerTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ShutdownRouteGracefulTimeoutTriggerTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ShutdownRouteGracefulTimeoutTriggerTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ShutdownGracefuTimeoutTriggerTest.java&r1=917137&r2=917147&rev=917147&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ShutdownGracefuTimeoutTriggerTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ShutdownRouteGracefulTimeoutTriggerTest.java Sun Feb 28 09:14:09 2010
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.impl;
 
+import java.util.concurrent.TimeUnit;
+
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
@@ -24,14 +26,11 @@
 /**
  * @version $Revision$
  */
-public class ShutdownGracefuTimeoutTriggerTest extends ContextTestSupport {
+public class ShutdownRouteGracefulTimeoutTriggerTest extends ContextTestSupport {
 
     private static String foo = "";
 
-    public void testShutdownGraceful() throws Exception {
-        // timeout after 2 seconds
-        context.getShutdownStrategy().setTimeout(2);
-
+    public void testShutdownRouteGraceful() throws Exception {
         getMockEndpoint("mock:foo").expectedMessageCount(1);
 
         template.sendBody("seda:foo", "A");
@@ -44,10 +43,14 @@
 
         // now stop the route before its complete
         foo = foo + "stop";
-        context.stop();
+        // timeout after 2 seconds
+        context.shutdownRoute("seda", 2, TimeUnit.SECONDS);
 
         // should not be able to complete all messages as timeout occurred
         assertNotSame("Should not able able to complete all pending messages", "stopABCDE", foo);
+
+        assertEquals("bar should still be running", true, context.getRouteStatus("bar").isStarted());
+        assertEquals("Seda should be stopped", true, context.getRouteStatus("seda").isStopped());
     }
 
     @Override
@@ -55,11 +58,13 @@
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from("seda:foo").to("mock:foo").delay(1000).process(new Processor() {
+                from("seda:foo").routeId("seda").to("mock:foo").delay(1000).process(new Processor() {
                     public void process(Exchange exchange) throws Exception {
                         foo = foo + exchange.getIn().getBody(String.class);
                     }
                 });
+
+                from("direct:bar").routeId("bar").to("mock:bar");
             }
         };
     }

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ShutdownRouteGracefulTimeoutTriggerTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ShutdownRouteGracefulTimeoutTriggerTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ShutdownRouteGracefulWithTimerTest.java (from r917137, camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ShutdownGracefulWithTimerTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ShutdownRouteGracefulWithTimerTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ShutdownRouteGracefulWithTimerTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ShutdownGracefulWithTimerTest.java&r1=917137&r2=917147&rev=917147&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ShutdownGracefulWithTimerTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ShutdownRouteGracefulWithTimerTest.java Sun Feb 28 09:14:09 2010
@@ -24,11 +24,11 @@
 /**
  * @version $Revision$
  */
-public class ShutdownGracefulWithTimerTest extends ContextTestSupport {
+public class ShutdownRouteGracefulWithTimerTest extends ContextTestSupport {
 
     private static String foo = "";
 
-    public void testShutdownGraceful() throws Exception {
+    public void testShutdownRouteGraceful() throws Exception {
         getMockEndpoint("mock:foo").expectedMessageCount(1);
         // should be stopped before it fires the first one
         getMockEndpoint("mock:timer").expectedMessageCount(0);
@@ -43,10 +43,13 @@
 
         // now stop the route before its complete
         foo = foo + "stop";
-        context.stop();
+        context.shutdownRoute("seda");
 
         // it should wait as there was 1 inflight exchange and 4 pending messages left
         assertEquals("Should graceful shutdown", "stopABCDE", foo);
+
+        assertEquals("Timer should still be running", true, context.getRouteStatus("timer").isStarted());
+        assertEquals("Seda should be stopped", true, context.getRouteStatus("seda").isStopped());
     }
 
     @Override
@@ -54,9 +57,9 @@
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from("timer:foo?period=500&delay=2000").to("mock:timer");
+                from("timer:foo?period=500&delay=2000").routeId("timer").to("mock:timer");
 
-                from("seda:foo").to("mock:foo").delay(1000).process(new Processor() {
+                from("seda:foo").routeId("seda").to("mock:foo").delay(1000).process(new Processor() {
                     public void process(Exchange exchange) throws Exception {
                         foo = foo + exchange.getIn().getBody(String.class);
                     }

Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRouteShutdownAndStartTest.java (from r917137, camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRouteStopAndStartTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRouteShutdownAndStartTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRouteShutdownAndStartTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRouteStopAndStartTest.java&r1=917137&r2=917147&rev=917147&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRouteStopAndStartTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRouteShutdownAndStartTest.java Sun Feb 28 09:14:09 2010
@@ -29,7 +29,7 @@
 /**
  * @version $Revision$
  */
-public class ManagedRouteStopAndStartTest extends ContextTestSupport {
+public class ManagedRouteShutdownAndStartTest extends ContextTestSupport {
 
     @Override
     protected void setUp() throws Exception {
@@ -37,7 +37,7 @@
         super.setUp();
     }
 
-    public void testStopAndStartRoute() throws Exception {
+    public void testShutdownAndStartRoute() throws Exception {
         MBeanServer mbeanServer = context.getManagementStrategy().getManagementAgent().getMBeanServer();
         ObjectName on = getRouteObjectName(mbeanServer);
 
@@ -53,7 +53,7 @@
         assertEquals("Should be started", ServiceStatus.Started.name(), state);
 
         // stop
-        mbeanServer.invoke(on, "stop", null, null);
+        mbeanServer.invoke(on, "shutdown", null, null);
 
         state = (String) mbeanServer.getAttribute(on, "State");
         assertEquals("Should be stopped", ServiceStatus.Stopped.name(), state);
@@ -99,4 +99,4 @@
         };
     }
 
-}
+}
\ No newline at end of file