You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2009/09/21 13:52:44 UTC

svn commit: r817214 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/ main/java/org/apache/camel/impl/ test/java/org/apache/camel/processor/

Author: davsclaus
Date: Mon Sep 21 11:52:43 2009
New Revision: 817214

URL: http://svn.apache.org/viewvc?rev=817214&view=rev
Log:
CAMEL-1800: Camel now defers starting route inputs until the child services of all routes have been pre-started. This ensures that when the consumers are started, all the Camel routes are ready to work immediately.

Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RouteService.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelHandledPolicyTest.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java?rev=817214&r1=817213&r2=817214&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java Mon Sep 21 11:52:43 2009
@@ -269,9 +269,7 @@
      *
      * @param route the route to start
      * @throws Exception is thrown if the route could not be started for whatever reason
-     * @deprecated will be removed in Camel 2.2
      */
-    @Deprecated
     void startRoute(RouteDefinition route) throws Exception;
 
     /**
@@ -288,9 +286,7 @@
      *
      * @param route the route to stop
      * @throws Exception is thrown if the route could not be stopped for whatever reason
-     * @deprecated will be removed in Camel 2.2
      */
-    @Deprecated
     void stopRoute(RouteDefinition route) throws Exception;
 
     /**

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java?rev=817214&r1=817213&r2=817214&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java Mon Sep 21 11:52:43 2009
@@ -31,6 +31,7 @@
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.Component;
+import org.apache.camel.Consumer;
 import org.apache.camel.ConsumerTemplate;
 import org.apache.camel.Endpoint;
 import org.apache.camel.IsSingleton;
@@ -95,7 +96,7 @@
     private static final String NAME_PREFIX = "camel-";
     private static int nameSuffix;
     private boolean routeDefinitionInitiated;
-    private String name;  
+    private String name;
     private final Map<String, Endpoint> endpoints = new LRUCache<String, Endpoint>(1000);
     private final AtomicInteger endpointKeyCounter = new AtomicInteger();
     private final List<EndpointStrategy> endpointStrategies = new ArrayList<EndpointStrategy>();
@@ -438,7 +439,7 @@
 
         return answer;
     }
-    
+
     public <T extends Endpoint> T getEndpoint(String name, Class<T> endpointType) {
         Endpoint endpoint = getEndpoint(name);
 
@@ -673,7 +674,7 @@
     public void setTypeConverterRegistry(TypeConverterRegistry typeConverterRegistry) {
         this.typeConverterRegistry = typeConverterRegistry;
     }
-    
+
     public Injector getInjector() {
         if (injector == null) {
             injector = createInjector();
@@ -835,27 +836,56 @@
     public void start() throws Exception {
         super.start();
 
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Starting routes");
-        }
+        LOG.debug("Starting routes...");
+
         // the context is now considered started (i.e. isStarted() == true))
         // starting routes is done after, not during context startup
         synchronized (this) {
+            // list of inputs to start once all the routes have been prepared for start
+            Map<Route, Consumer> inputs = new HashMap<Route, Consumer>();
+
             for (RouteService routeService : routeServices.values()) {
                 Boolean autoStart = routeService.getRouteDefinition().isAutoStartup();
                 if (autoStart == null || autoStart) {
-                    routeService.start();
+                    // defer starting inputs till later as we want to prepare the routes by starting
+                    // all their processors and child services etc.
+                    // then later we open the floods to Camel by starting the inputs
+                    // what this does is to ensure Camel is more robust on starting routes as all routes
+                    // will then be prepared in time before we start inputs which will consume messages to be routed
+                    routeService.startInputs(false);
+                    try {
+                        routeService.start();
+                        // add the inputs from this route service to the list to start afterwards
+                        inputs.putAll(routeService.getInputs());
+                    } finally {
+                        routeService.startInputs(true);
+                    }
                 } else {
                     // should not start on startup
                     LOG.info("Cannot start route " + routeService.getId() + " as it is configured with auto startup disabled.");
                 }
             }
+
+            // now start the inputs for all the route services as we have prepared Camel
+            // yeah open the floodgates so messages can start flowing into Camel
+            for (Map.Entry<Route, Consumer> entry : inputs.entrySet()) {
+                Route route = entry.getKey();
+                Consumer consumer = entry.getValue();
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("Starting consumer on route: " + route.getId());
+                }
+                for (LifecycleStrategy strategy : lifecycleStrategies) {
+                    strategy.onServiceAdd(this, consumer, route);
+                }
+                ServiceHelper.startService(consumer);
+            }
         }
+
         if (LOG.isDebugEnabled()) {
             for (int i = 0; i < getRoutes().size(); i++) {
                 LOG.debug("Route " + i + ": " + getRoutes().get(i));
             }
-            LOG.debug("Started routes");
+            LOG.debug("... Routes started");
         }
 
         LOG.info("Apache Camel " + getVersion() + " (CamelContext:" + getName() + ") started");
@@ -931,8 +961,8 @@
         forceLazyInitialization();
         startServices(components.values());
 
-         // To avoid initiating the routeDefinitions after stopping the camel context
-        if (!routeDefinitionInitiated) {            
+        // To avoid initiating the routeDefinitions after stopping the camel context
+        if (!routeDefinitionInitiated) {
             startRouteDefinitions(routeDefinitions);
             routeDefinitionInitiated = true;
         }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RouteService.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RouteService.java?rev=817214&r1=817213&r2=817214&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RouteService.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RouteService.java Mon Sep 21 11:52:43 2009
@@ -17,10 +17,14 @@
 package org.apache.camel.impl;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.camel.CamelContext;
+import org.apache.camel.Consumer;
 import org.apache.camel.Navigate;
 import org.apache.camel.Route;
 import org.apache.camel.Service;
@@ -47,6 +51,8 @@
     private final List<RouteContext> routeContexts;
     private final List<Route> routes;
     private final String id;
+    private boolean startInputs = true;
+    private final Map<Route, Consumer> inputs = new HashMap<Route, Consumer>();
 
     public RouteService(DefaultCamelContext camelContext, RouteDefinition routeDefinition, List<RouteContext> routeContexts, List<Route> routes) {
         this.camelContext = camelContext;
@@ -76,6 +82,26 @@
         return routes;
     }
 
+    /**
+     * Sets whether inputs (consumers) should be started when starting the routes
+     * <p/>
+     * By default inputs are started.
+     *
+     * @param flag flag to either start inputs or not
+     */
+    public void startInputs(boolean flag) {
+        this.startInputs = flag;
+    }
+
+    /**
+     * Gets the inputs to the routes.
+     *
+     * @return map of each {@link Route} to the {@link Consumer} that is its input
+     */
+    public Map<Route, Consumer> getInputs() {
+        return inputs;
+    }
+
     protected void doStart() throws Exception {
         camelContext.addRouteCollection(routes);
 
@@ -85,7 +111,7 @@
 
         for (Route route : routes) {
             if (LOG.isTraceEnabled()) {
-                LOG.trace("Starting route: " + route);
+                LOG.trace("Starting route services: " + route);
             }
 
             List<Service> services = route.getServices();
@@ -99,7 +125,17 @@
                 doGetChildServies(list, service);
             }
 
-            startChildService(route, list);
+            // split into consumers and child services as we need to start the consumers
+            // afterwards to avoid them being active while the others start
+            List<Service> childServices = new ArrayList<Service>();
+            for (Service service : list) {
+                if (service instanceof Consumer) {
+                    inputs.put(route, (Consumer) service);
+                } else {
+                    childServices.add(service);
+                }
+            }
+            startChildService(route, childServices);
 
             // start the route itself
             ServiceHelper.startService(route);
@@ -107,9 +143,21 @@
             // fire event
             EventHelper.notifyRouteStarted(camelContext, route);
         }
+
+        if (startInputs) {
+            // start the input consumers
+            for (Map.Entry<Route, Consumer> entry : inputs.entrySet()) {
+                Route route = entry.getKey();
+                Consumer consumer = entry.getValue();
+                startChildService(route, consumer);
+            }
+        }
     }
 
     protected void doStop() throws Exception {
+        // clear inputs
+        inputs.clear();
+
         for (LifecycleStrategy strategy : camelContext.getLifecycleStrategies()) {
             strategy.onRoutesRemove(routes);
         }
@@ -138,6 +186,11 @@
         camelContext.removeRouteCollection(routes);
     }
 
+    protected void startChildService(Route route, Service... services) throws Exception {
+        List<Service> list = new ArrayList<Service>(Arrays.asList(services));
+        startChildService(route, list);
+    }
+
     protected void startChildService(Route route, List<Service> services) throws Exception {
         for (Service service : services) {
             for (LifecycleStrategy strategy : camelContext.getLifecycleStrategies()) {

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelHandledPolicyTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelHandledPolicyTest.java?rev=817214&r1=817213&r2=817214&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelHandledPolicyTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelHandledPolicyTest.java Mon Sep 21 11:52:43 2009
@@ -29,14 +29,18 @@
  */
 public class DeadLetterChannelHandledPolicyTest extends ContextTestSupport {
 
+    @Override
+    public boolean isUseRouteBuilder() {
+        return false;
+    }
+
     public void testHandled() throws Exception {
         context.addRoutes(new RouteBuilder() {
             @Override
             public void configure() throws Exception {
                 errorHandler(deadLetterChannel("mock:dead").maximumRedeliveries(1).redeliverDelay(0).logStackTrace(false).handled(true));
 
-                from("direct:start")
-                    .process(new MyThrowExceptionProcessor());
+                from("direct:start").process(new MyThrowExceptionProcessor());
             }
         });
         context.start();
@@ -55,8 +59,7 @@
             public void configure() throws Exception {
                 errorHandler(deadLetterChannel("mock:dead").maximumRedeliveries(1).redeliverDelay(0).logStackTrace(false).handled(false));
 
-                from("direct:start")
-                    .process(new MyThrowExceptionProcessor());
+                from("direct:start").process(new MyThrowExceptionProcessor());
             }
         });
         context.start();