You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/07/30 10:54:25 UTC

svn commit: r980685 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/impl/ main/java/org/apache/camel/spi/ test/java/org/apache/camel/management/ test/java/org/apache/camel/processor/

Author: davsclaus
Date: Fri Jul 30 08:54:25 2010
New Revision: 980685

URL: http://svn.apache.org/viewvc?rev=980685&view=rev
Log:
CAMEL-3012: CamelContext supports suspend/resume to temporarily stop a Camel application.

Added:
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RouteStartupOrderSuspendResumeNoAutoStartupTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RouteStartupOrderSuspendResumeTest.java
      - copied, changed from r980647, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RouteStartupOrderTest.java
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ShutdownStrategy.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/management/EventNotifierEventsTest.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java?rev=980685&r1=980684&r2=980685&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java Fri Jul 30 08:54:25 2010
@@ -478,8 +478,11 @@ public class DefaultCamelContext extends
 
     /**
      * Returns the order in which the route inputs was started.
+     * <p/>
+     * The order may not be according to the startupOrder defined on the route.
+     * For example a route could be started manually later, or new routes added at runtime.
      *
-     * @return a list ordered by the starting order of the route inputs
+     * @return a list in the order the routes were started
      */
     public List<RouteStartupOrder> getRouteStartupOrder() {
         return routeStartupOrder;
@@ -1003,13 +1006,19 @@ public class DefaultCamelContext extends
                         }
                     }
 
-                    // suspend routes using the shutdown strategy so it can shutdown in correct order
-                    // TODO: leverage shutdown strategy to let it run in suspend mode, so it can suspend routes in correct order
+                    // assemble list of startup ordering so routes can be shutdown accordingly
+                    List<RouteStartupOrder> orders = new ArrayList<RouteStartupOrder>();
                     for (Map.Entry<String, RouteService> entry : suspendedRouteServices.entrySet()) {
-                        shutdownRoute(entry.getKey());
+                        Route route = entry.getValue().getRoutes().iterator().next();
+                        Integer order = entry.getValue().getRouteDefinition().getStartupOrder();
+                        if (order == null) {
+                            order = defaultRouteStartupOrder++;
+                        }
+                        orders.add(new DefaultRouteStartupOrder(order, route, entry.getValue()));
                     }
 
-                    // TODO: more unit test to ensure suspend/resume with startup ordering is as expected
+                    // suspend routes using the shutdown strategy so it can shutdown in correct order
+                    getShutdownStrategy().suspend(this, orders);
 
                     watch.stop();
                     if (LOG.isInfoEnabled()) {
@@ -1094,7 +1103,21 @@ public class DefaultCamelContext extends
         // filter out already started routes
         Map<String, RouteService> filtered = new LinkedHashMap<String, RouteService>();
         for (Map.Entry<String, RouteService> entry : routeServices.entrySet()) {
-            if (entry.getValue().getStatus().isStartable()) {
+            boolean startable;
+
+            Consumer consumer = entry.getValue().getRoutes().iterator().next().getConsumer();
+            if (consumer instanceof SuspendableService) {
+                // consumer could be suspended, which is not reflected in the RouteService status
+                startable = ((SuspendableService) consumer).isSuspended();
+            } else if (consumer instanceof ServiceSupport) {
+                // consumer could be stopped, which is not reflected in the RouteService status
+                startable = ((ServiceSupport) consumer).getStatus().isStartable();
+            } else {
+                // no consumer so use state from route service
+                startable = entry.getValue().getStatus().isStartable();
+            }
+
+            if (startable) {
                 filtered.put(entry.getKey(), entry.getValue());
             }
         }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java?rev=980685&r1=980684&r2=980685&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java Fri Jul 30 08:54:25 2010
@@ -18,6 +18,7 @@ package org.apache.camel.impl;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -83,11 +84,28 @@ public class DefaultShutdownStrategy ext
         shutdown(context, routes, getTimeout(), getTimeUnit());
     }
 
+    public void suspend(CamelContext context, List<RouteStartupOrder> routes) throws Exception {
+        doShutdown(context, routes, getTimeout(), getTimeUnit(), true);
+    }
+
     public void shutdown(CamelContext context, List<RouteStartupOrder> routes, long timeout, TimeUnit timeUnit) throws Exception {
+        doShutdown(context, routes, timeout, timeUnit, false);
+    }
+
+    public void suspend(CamelContext context, List<RouteStartupOrder> routes, long timeout, TimeUnit timeUnit) throws Exception {
+        doShutdown(context, routes, timeout, timeUnit, true);
+    }
+
+    protected void doShutdown(CamelContext context, List<RouteStartupOrder> routes, long timeout, TimeUnit timeUnit, boolean suspendOnly) throws Exception {
         StopWatch watch = new StopWatch();
 
-        // should the order of routes be reversed?
+        // at first sort according to route startup order
         List<RouteStartupOrder> routesOrdered = new ArrayList<RouteStartupOrder>(routes);
+        Collections.sort(routesOrdered, new Comparator<RouteStartupOrder>() {
+            public int compare(RouteStartupOrder o1, RouteStartupOrder o2) {
+                return Integer.valueOf(o1.getStartupOrder()).compareTo(o2.getStartupOrder()); // compareTo cannot overflow like subtraction
+            }
+        });
         if (shutdownRoutesInReverseOrder) {
             Collections.reverse(routesOrdered);
         }
@@ -99,7 +117,7 @@ public class DefaultShutdownStrategy ext
         }
 
         // use another thread to perform the shutdowns so we can support timeout
-        Future future = getExecutorService().submit(new ShutdownTask(context, routesOrdered));
+        Future future = getExecutorService().submit(new ShutdownTask(context, routesOrdered, suspendOnly));
         try {
             if (timeout > 0) {
                 future.get(timeout, timeUnit);
@@ -227,18 +245,18 @@ public class DefaultShutdownStrategy ext
     }
 
     /**
-     * Suspends the consumer immediately.
+     * Suspends/stops the consumer immediately.
      *
-     * @param service the suspendable consumer
      * @param consumer the consumer to suspend
      */
-    protected void suspendNow(SuspendableService service, Consumer consumer) {
+    protected void suspendNow(Consumer consumer) {
         if (LOG.isTraceEnabled()) {
             LOG.trace("Suspending: " + consumer);
         }
 
+        // allow us to do custom work before delegating to service helper
         try {
-            service.suspend();
+            ServiceHelper.suspendService(consumer);
         } catch (Throwable e) {
             LOG.warn("Error occurred while suspending route: " + consumer + ". This exception will be ignored.", e);
             // fire event
@@ -301,10 +319,12 @@ public class DefaultShutdownStrategy ext
 
         private final CamelContext context;
         private final List<RouteStartupOrder> routes;
+        private final boolean suspendOnly;
 
-        public ShutdownTask(CamelContext context, List<RouteStartupOrder> routes) {
+        public ShutdownTask(CamelContext context, List<RouteStartupOrder> routes, boolean suspendOnly) {
             this.context = context;
             this.routes = routes;
+            this.suspendOnly = suspendOnly;
         }
 
         public void run() {
@@ -316,7 +336,7 @@ public class DefaultShutdownStrategy ext
             // 3) shutdown the deferred routes
 
             if (LOG.isDebugEnabled()) {
-                LOG.debug("There are " + routes.size() + " routes to shutdown");
+                LOG.debug("There are " + routes.size() + " routes to " + (suspendOnly ? "suspend" : "shutdown"));
             }
 
             // list of deferred consumers to shutdown when all exchanges has been completed routed
@@ -329,7 +349,8 @@ public class DefaultShutdownStrategy ext
                 ShutdownRunningTask shutdownRunningTask = order.getRoute().getRouteContext().getShutdownRunningTask();
 
                 if (LOG.isTraceEnabled()) {
-                    LOG.trace("Shutting down route: " + order.getRoute().getId() + " with options [" + shutdownRoute + "," + shutdownRunningTask + "]");
+                    LOG.trace((suspendOnly ? "Suspending route: " : "Shutting down route: ") + order.getRoute().getId()
+                            + " with options [" + shutdownRoute + "," + shutdownRunningTask + "]");
                 }
 
                 for (Consumer consumer : order.getInputs()) {
@@ -356,7 +377,7 @@ public class DefaultShutdownStrategy ext
 
                     if (suspend) {
                         // only suspend it and then later shutdown it
-                        suspendNow((SuspendableService) consumer, consumer);
+                        suspendNow(consumer);
                         // add it to the deferred list so the route will be shutdown later
                         deferredConsumers.add(new ShutdownDeferredConsumer(order.getRoute(), consumer));
                         LOG.info("Route: " + order.getRoute().getId() + " suspended and shutdown deferred, was consuming from: "
@@ -369,7 +390,7 @@ public class DefaultShutdownStrategy ext
                         // we will stop it later, but for now it must run to be able to help all inflight messages
                         // be safely completed
                         deferredConsumers.add(new ShutdownDeferredConsumer(order.getRoute(), consumer));
-                        LOG.info("Route: " + order.getRoute().getId() + " shutdown deferred.");
+                        LOG.info("Route: " + order.getRoute().getId() + (suspendOnly ? " suspension deferred." : " shutdown deferred."));
                     }
                 }
             }
@@ -413,8 +434,13 @@ public class DefaultShutdownStrategy ext
 
             // now all messages has been completed then stop the deferred consumers
             for (ShutdownDeferredConsumer deferred : deferredConsumers) {
-                shutdownNow(deferred.getConsumer());
-                LOG.info("Route: " + deferred.getRoute().getId() + " shutdown complete.");
+                if (suspendOnly) {
+                    suspendNow(deferred.getConsumer());
+                    LOG.info("Route: " + deferred.getRoute().getId() + " suspend complete.");
+                } else {
+                    shutdownNow(deferred.getConsumer());
+                    LOG.info("Route: " + deferred.getRoute().getId() + " shutdown complete.");
+                }
             }
         }
 

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java?rev=980685&r1=980684&r2=980685&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java Fri Jul 30 08:54:25 2010
@@ -147,7 +147,9 @@ public interface ExecutorServiceStrategy
      * @param source      the source object, usually it should be <tt>this</tt> passed in as parameter
      * @param name        name which is appended to the thread name
      * @return the created thread pool
+     * @deprecated using cached thread pool is discouraged as they have no upper bound and can overload the JVM
      */
+    @Deprecated
     ExecutorService newCachedThreadPool(Object source, String name);
 
     /**

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ShutdownStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ShutdownStrategy.java?rev=980685&r1=980684&r2=980685&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ShutdownStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ShutdownStrategy.java Fri Jul 30 08:54:25 2010
@@ -50,6 +50,15 @@ public interface ShutdownStrategy extend
     void shutdown(CamelContext context, List<RouteStartupOrder> routes) throws Exception;
 
     /**
+     * Suspends the routes
+     *
+     * @param context   the camel context
+     * @param routes    the routes, ordered by the order they were started
+     * @throws Exception is thrown if an error occurs while suspending the consumers; however, it is preferred to avoid this
+     */
+    void suspend(CamelContext context, List<RouteStartupOrder> routes) throws Exception;
+
+    /**
      * Shutdown the routes using a specified timeout instead of the default timeout values
      *
      * @param context   the camel context
@@ -61,6 +70,17 @@ public interface ShutdownStrategy extend
     void shutdown(CamelContext context, List<RouteStartupOrder> routes, long timeout, TimeUnit timeUnit) throws Exception;
 
     /**
+     * Suspends the routes using a specified timeout instead of the default timeout values
+     *
+     * @param context   the camel context
+     * @param routes    the routes, ordered by the order they were started
+     * @param timeout   timeout
+     * @param timeUnit  the unit to use
+     * @throws Exception is thrown if an error occurs while suspending the consumers; however, it is preferred to avoid this
+     */
+    void suspend(CamelContext context, List<RouteStartupOrder> routes, long timeout, TimeUnit timeUnit) throws Exception;
+
+    /**
      * Set an timeout to wait for the shutdown to complete.
      * <p/>
      * Setting a value of 0 or negative will disable timeout and wait until complete

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/management/EventNotifierEventsTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/management/EventNotifierEventsTest.java?rev=980685&r1=980684&r2=980685&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/management/EventNotifierEventsTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/management/EventNotifierEventsTest.java Fri Jul 30 08:54:25 2010
@@ -141,19 +141,16 @@ public class EventNotifierEventsTest ext
 
         context.suspend();
 
-        assertEquals(8, events.size());
+        assertEquals(6, events.size());
         assertIsInstanceOf(CamelContextSuspendingEvent.class, events.get(4));
-        assertIsInstanceOf(RouteStoppedEvent.class, events.get(5));
-        assertIsInstanceOf(RouteStoppedEvent.class, events.get(6));
-        assertIsInstanceOf(CamelContextSuspendedEvent.class, events.get(7));
+        // notice direct component is not suspended (as they are internal)
+        assertIsInstanceOf(CamelContextSuspendedEvent.class, events.get(5));
 
         context.resume();
 
-        assertEquals(12, events.size());
-        assertIsInstanceOf(CamelContextResumingEvent.class, events.get(8));
-        assertIsInstanceOf(RouteStartedEvent.class, events.get(9));
-        assertIsInstanceOf(RouteStartedEvent.class, events.get(10));
-        assertIsInstanceOf(CamelContextResumedEvent.class, events.get(11));
+        assertEquals(8, events.size());
+        assertIsInstanceOf(CamelContextResumingEvent.class, events.get(6));
+        assertIsInstanceOf(CamelContextResumedEvent.class, events.get(7));
     }
 
     @Override

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RouteStartupOrderSuspendResumeNoAutoStartupTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RouteStartupOrderSuspendResumeNoAutoStartupTest.java?rev=980685&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RouteStartupOrderSuspendResumeNoAutoStartupTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RouteStartupOrderSuspendResumeNoAutoStartupTest.java Fri Jul 30 08:54:25 2010
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.List;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.spi.RouteStartupOrder;
+
+/**
+ * @version $Revision: 893963 $
+ */
+public class RouteStartupOrderSuspendResumeNoAutoStartupTest extends ContextTestSupport {
+
+    public void testRouteStartupOrderSuspendResumeNoAutoStartup() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(1);
+
+        template.sendBody("direct:start", "Hello World");
+
+        assertMockEndpointsSatisfied();
+
+        context.suspend();
+        context.resume();
+
+        // route C should still be stopped after we have resumed
+        assertEquals(true, context.getRouteStatus("C").isStopped());
+
+        // assert correct order
+        DefaultCamelContext dcc = (DefaultCamelContext) context;
+        List<RouteStartupOrder> order = dcc.getRouteStartupOrder();
+
+        assertEquals(3, order.size());
+        assertEquals("seda://foo", order.get(0).getRoute().getEndpoint().getEndpointUri());
+        assertEquals("direct://start", order.get(1).getRoute().getEndpoint().getEndpointUri());
+        assertEquals("direct://bar", order.get(2).getRoute().getEndpoint().getEndpointUri());
+    }
+
+    public void testRouteStartupOrderSuspendResumeStartC() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(1);
+
+        template.sendBody("direct:start", "Hello World");
+
+        assertMockEndpointsSatisfied();
+
+        // start C
+        context.startRoute("C");
+
+        context.suspend();
+        context.resume();
+
+        // route C should be started
+        assertEquals(true, context.getRouteStatus("C").isStarted());
+
+        // assert correct order
+        DefaultCamelContext dcc = (DefaultCamelContext) context;
+        List<RouteStartupOrder> order = dcc.getRouteStartupOrder();
+
+        assertEquals(4, order.size());
+        assertEquals("seda://foo", order.get(0).getRoute().getEndpoint().getEndpointUri());
+        assertEquals("direct://start", order.get(1).getRoute().getEndpoint().getEndpointUri());
+        assertEquals("direct://bar", order.get(2).getRoute().getEndpoint().getEndpointUri());
+
+        // however it's started manually, so it's started after the auto-started routes
+        assertEquals("seda://bar", order.get(3).getRoute().getEndpoint().getEndpointUri());
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start").routeId("B").startupOrder(2).to("seda:foo");
+
+                from("seda:foo").routeId("A").startupOrder(1).to("mock:result");
+
+                from("direct:bar").routeId("D").startupOrder(9).to("seda:bar");
+
+                from("seda:bar").routeId("C").noAutoStartup().startupOrder(5).to("mock:other");
+            }
+        };
+    }
+}
\ No newline at end of file

Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RouteStartupOrderSuspendResumeTest.java (from r980647, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RouteStartupOrderTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RouteStartupOrderSuspendResumeTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RouteStartupOrderSuspendResumeTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RouteStartupOrderTest.java&r1=980647&r2=980685&rev=980685&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RouteStartupOrderTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RouteStartupOrderSuspendResumeTest.java Fri Jul 30 08:54:25 2010
@@ -27,9 +27,9 @@ import org.apache.camel.spi.RouteStartup
 /**
  * @version $Revision$
  */
-public class RouteStartupOrderTest extends ContextTestSupport {
+public class RouteStartupOrderSuspendResumeTest extends ContextTestSupport {
 
-    public void testRouteStartupOrder() throws Exception {
+    public void testRouteStartupOrderSuspendResume() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:result");
         mock.expectedMessageCount(1);
 
@@ -37,6 +37,9 @@ public class RouteStartupOrderTest exten
 
         assertMockEndpointsSatisfied();
 
+        context.suspend();
+        context.resume();
+
         // assert correct order
         DefaultCamelContext dcc = (DefaultCamelContext) context;
         List<RouteStartupOrder> order = dcc.getRouteStartupOrder();
@@ -53,13 +56,13 @@ public class RouteStartupOrderTest exten
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from("direct:start").startupOrder(2).to("seda:foo");
+                from("direct:start").routeId("B").startupOrder(2).to("seda:foo");
 
-                from("seda:foo").startupOrder(1).to("mock:result");
+                from("seda:foo").routeId("A").startupOrder(1).to("mock:result");
 
-                from("direct:bar").startupOrder(9).to("seda:bar");
+                from("direct:bar").routeId("D").startupOrder(9).to("seda:bar");
 
-                from("seda:bar").startupOrder(5).to("mock:other");
+                from("seda:bar").routeId("C").startupOrder(5).to("mock:other");
             }
         };
     }