You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by js...@apache.org on 2009/02/25 18:15:58 UTC

svn commit: r747864 - in /camel/trunk: camel-core/src/main/java/org/apache/camel/ camel-core/src/main/java/org/apache/camel/component/seda/ camel-core/src/main/java/org/apache/camel/impl/ camel-core/src/main/java/org/apache/camel/model/ components/came...

Author: jstrachan
Date: Wed Feb 25 17:15:58 2009
New Revision: 747864

URL: http://svn.apache.org/viewvc?rev=747864&view=rev
Log:
CAMEL-1004 - make it easy to start/stop routes by their definition (RouteBuilder or Route class) - added an initial implementation which needs more testing but it seems to work

Added:
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java   (with props)
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RouteService.java   (with props)
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ServiceSupport.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/RouteType.java
    camel/trunk/components/camel-bam/src/main/java/org/apache/camel/bam/ActivityBuilder.java
    camel/trunk/components/camel-bam/src/main/java/org/apache/camel/bam/ProcessBuilder.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java?rev=747864&r1=747863&r2=747864&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java Wed Feb 25 17:15:58 2009
@@ -189,14 +189,6 @@
     List<Route> getRoutes();
 
     /**
-     * Adds a collection of routes to this context
-     *
-     * @param routes the routes to add
-     * @throws Exception if the routes could not be created for whatever reason
-     */
-    void addRoutes(Collection<Route> routes) throws Exception;
-
-    /**
      * Adds a collection of routes to this context using the given builder
      * to build them
      *
@@ -213,6 +205,23 @@
      */
     void addRouteDefinitions(Collection<RouteType> routeDefinitions) throws Exception;
 
+    /**
+     * Removes a collection of route definitions from the context - stopping any previously running
+     * routes if any of them are actively running
+     */
+    void removeRouteDefinitions(Collection<RouteType> routeDefinitions) throws Exception;
+
+    /**
+     * Starts the given route if it has been previously stopped
+     */
+    void startRoute(RouteType route) throws Exception;
+
+    /**
+     * Stops the given route. It will remain in the list of route definitions return by {@link #getRouteDefinitions()}
+     * unless you use the {@link #removeRouteDefinitions(java.util.Collection)}
+     */
+    void stopRoute(RouteType route) throws Exception;
+
 
     // Properties
     //-----------------------------------------------------------------------

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java?rev=747864&r1=747863&r2=747864&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java Wed Feb 25 17:15:58 2009
@@ -104,9 +104,12 @@
         for (int i = 0; i < concurrentConsumers; i++) {
             executor.execute(this);
         }
+        endpoint.onStarted(this);
     }
 
     protected void doStop() throws Exception {
+        endpoint.onStopped(this);
+        
         executor.shutdownNow();
         executor = null;
     }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java?rev=747864&r1=747863&r2=747864&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java Wed Feb 25 17:15:58 2009
@@ -19,9 +19,12 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.HashSet;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Executor;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.CopyOnWriteArraySet;
 
 import org.apache.camel.Component;
 import org.apache.camel.Consumer;
@@ -42,6 +45,8 @@
     private BlockingQueue<Exchange> queue;
     private int size = 1000;
     private int concurrentConsumers = 1;
+    private Set<SedaProducer> producers = new CopyOnWriteArraySet<SedaProducer>();
+    private Set<SedaConsumer> consumers = new CopyOnWriteArraySet<SedaConsumer>();
 
     public SedaEndpoint() {
     }
@@ -67,7 +72,7 @@
     }
     
     public Producer createProducer() throws Exception {
-        return new CollectionProducer(this, getQueue());
+        return new SedaProducer(this, getQueue());
     }
 
     public Consumer createConsumer(Processor processor) throws Exception {
@@ -105,7 +110,40 @@
         return true;
     }
 
+    /**
+     * Returns the current pending exchanges
+     */
     public List<Exchange> getExchanges() {
         return new ArrayList<Exchange>(getQueue());
     }
+
+    /**
+     * Returns the current active consumers on this endpoint
+     */
+    public Set<SedaConsumer> getConsumers() {
+        return new HashSet<SedaConsumer>(consumers);
+    }
+
+    /**
+     * Returns the current active producers on this endpoint
+     */
+    public Set<SedaProducer> getProducers() {
+        return new HashSet<SedaProducer>(producers);
+    }
+    
+    void onStarted(SedaProducer producer) {
+        producers.add(producer);
+    }
+
+    void onStopped(SedaProducer producer) {
+        producers.remove(producer);
+    }
+
+    void onStarted(SedaConsumer consumer) {
+        consumers.add(consumer);
+    }
+
+    void onStopped(SedaConsumer consumer) {
+        consumers.remove(consumer);
+    }
 }

Added: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java?rev=747864&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java Wed Feb 25 17:15:58 2009
@@ -0,0 +1,46 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.seda;
+
+import org.apache.camel.Exchange;
+
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * @version $Revision: 1.1 $
+ */
+public class SedaProducer extends CollectionProducer {
+    private SedaEndpoint endpoint;
+
+    public SedaProducer(SedaEndpoint endpoint, BlockingQueue<Exchange> queue) {
+        super(endpoint, queue);
+        this.endpoint = endpoint;
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+        endpoint.onStarted(this);
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        endpoint.onStopped(this);
+        super.doStop();
+    }
+}

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java?rev=747864&r1=747863&r2=747864&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java Wed Feb 25 17:15:58 2009
@@ -16,17 +16,6 @@
  */
 package org.apache.camel.impl;
 
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.concurrent.Callable;
-import javax.naming.Context;
-
 import org.apache.camel.CamelContext;
 import org.apache.camel.Component;
 import org.apache.camel.Endpoint;
@@ -56,16 +45,28 @@
 import org.apache.camel.spi.LanguageResolver;
 import org.apache.camel.spi.LifecycleStrategy;
 import org.apache.camel.spi.Registry;
+import org.apache.camel.spi.RouteContext;
 import org.apache.camel.spi.TypeConverterRegistry;
 import org.apache.camel.util.FactoryFinder;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.ReflectionInjector;
+import static org.apache.camel.util.ServiceHelper.startServices;
+import static org.apache.camel.util.ServiceHelper.stopServices;
 import org.apache.camel.util.SystemHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-import static org.apache.camel.util.ServiceHelper.startServices;
-import static org.apache.camel.util.ServiceHelper.stopServices;
+import javax.naming.Context;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.Callable;
+
 /**
  * Represents the context used to configure routes and the policies to use.
  *
@@ -97,6 +98,7 @@
     private ErrorHandlerBuilder errorHandlerBuilder;
     private Map<String, DataFormatType> dataFormats = new HashMap<String, DataFormatType>();
     private Class<? extends FactoryFinder> factoryFinderClass = FactoryFinder.class;
+    private Map<String, RouteService> routeServices = new HashMap<String, RouteService>();
 
     public DefaultCamelContext() {
         name = NAME_PREFIX + ++nameSuffix;
@@ -112,9 +114,9 @@
                 // if we can't instantiate the JMX enabled strategy then fallback to default
                 // could be because of missing .jars on the classpath
                 LOG.warn("Could not find needed classes for JMX lifecycle strategy."
-                    + " Needed class is in spring-context.jar using Spring 2.5 or newer ("
-                    + " spring-jmx.jar using Spring 2.0.x)."
-                    + " NoClassDefFoundError: " + e.getMessage());
+                        + " Needed class is in spring-context.jar using Spring 2.5 or newer ("
+                        + " spring-jmx.jar using Spring 2.0.x)."
+                        + " NoClassDefFoundError: " + e.getMessage());
             } catch (Exception e) {
                 LOG.warn("Could not create JMX lifecycle strategy, caused by: " + e.getMessage());
             }
@@ -293,7 +295,7 @@
                 stopServices(oldEndpoint);
             } else {
                 for (Map.Entry entry : endpoints.entrySet()) {
-                    oldEndpoint = (Endpoint)entry.getValue();
+                    oldEndpoint = (Endpoint) entry.getValue();
                     if (!oldEndpoint.isSingleton() && uri.equals(oldEndpoint.getEndpointUri())) {
                         answer.add(oldEndpoint);
                         stopServices(oldEndpoint);
@@ -368,11 +370,14 @@
 
     // Route Management Methods
     // -----------------------------------------------------------------------
-    public List<Route> getRoutes() {
+    public synchronized List<Route> getRoutes() {
         if (routes == null) {
             routes = new ArrayList<Route>();
         }
-        return routes;
+        
+        // lets return a copy of the collection as objects are removed later
+        // when services are stopped
+        return new ArrayList<Route>(routes);
     }
 
     public void setRoutes(List<Route> routes) {
@@ -380,18 +385,27 @@
         throw new UnsupportedOperationException("Overriding existing routes is not supported yet, use addRoutes instead");
     }
 
-    public void addRoutes(Collection<Route> routes) throws Exception {
+    synchronized void removeRouteCollection(Collection<Route> routes) {
+         if (this.routes != null){
+             this.routes.removeAll(routes);
+         }
+    }
+
+    synchronized void addRouteCollection(Collection<Route> routes) throws Exception {
         if (this.routes == null) {
             this.routes = new ArrayList<Route>();
         }
 
         if (routes != null) {
             this.routes.addAll(routes);
+/*
+            TODO we should have notified the lifecycle strategy via the RouteService
 
             lifecycleStrategy.onRoutesAdd(routes);
             if (shouldStartRoutes()) {
                 startRoutes(routes);
             }
+*/
         }
     }
 
@@ -402,7 +416,7 @@
         if (LOG.isDebugEnabled()) {
             LOG.debug("Adding routes from: " + builder + " routes: " + routeList);
         }
-        addRoutes(routeList);
+        //addRouteCollection(routeList);
     }
 
     public void addRouteDefinitions(Collection<RouteType> routeDefinitions) throws Exception {
@@ -410,7 +424,26 @@
         if (shouldStartRoutes()) {
             startRouteDefinitions(routeDefinitions);
         }
+    }
+
+    public void removeRouteDefinitions(Collection<RouteType> routeDefinitions) throws Exception {
+        this.routeDefinitions.removeAll(routeDefinitions);
+        for (RouteType routeDefinition : routeDefinitions) {
+            stopRoute(routeDefinition);
+        }
+
+    }
+
+    public void startRoute(RouteType route) throws Exception {
+        Collection<Route> routes = new ArrayList<Route>();
+        List<RouteContext> routeContexts = route.addRoutes(this, routes);
+        RouteService routeService = new RouteService(this, route, routeContexts, routes);
+        startRouteService(routeService);
+    }
 
+
+    public void stopRoute(RouteType route) throws Exception {
+        stopRouteService(route.idOrCreate());
     }
 
     /**
@@ -465,7 +498,7 @@
             // type converter is usually the default one that also is the registry
             if (typeConverter instanceof DefaultTypeConverter) {
                 typeConverterRegistry = (DefaultTypeConverter) typeConverter;
-            } 
+            }
         }
         return typeConverterRegistry;
     }
@@ -523,7 +556,6 @@
      * Sets the registry to the given JNDI context
      *
      * @param jndiContext is the JNDI context to use as the registry
-     *
      * @see #setRegistry(org.apache.camel.spi.Registry)
      */
     public void setJndiContext(Context jndiContext) {
@@ -651,23 +683,70 @@
             }
         }
         startRouteDefinitions(routeDefinitions);
-        startRoutes(routes);
-        
+
+        // lets clear the starting flag as we are now started and we really do start up these services
+        notStarting();
+
+        synchronized (this) {
+            for (RouteService routeService : routeServices.values()) {
+                routeService.start();
+            }
+        }
+        //startRoutes(routes);
+
         LOG.info("Apache Camel " + getVersion() + " (CamelContext:" + getName() + ") started");
     }
 
     protected void startRouteDefinitions(Collection<RouteType> list) throws Exception {
         if (list != null) {
-            Collection<Route> routes = new ArrayList<Route>();
             for (RouteType route : list) {
-                route.addRoutes(this, routes);
+                startRoute(route);
             }
-            addRoutes(routes);
         }
     }
 
-    protected void doStop() throws Exception {
+    /*
+        protected void startRoutes(Collection<Route> routeList) throws Exception {
+            if (routeList != null) {
+                for (Route route : routeList) {
+                    List<Service> services = route.getServicesForRoute();
+                    for (Service service : services) {
+                        addService(service);
+                    }
+                }
+            }
+        }
+
+    */
+
+
+    /**
+     * Starts the given route service
+     */
+    protected synchronized void startRouteService(RouteService routeService) throws Exception {
+        String key = routeService.getId();
+        stopRouteService(key);
+        routeServices.put(key, routeService);
+        if (shouldStartRoutes()) {
+            routeService.start();
+        }
+    }
+
+    /**
+     * Stops the route denoted by the given RouteType id
+     */
+    protected synchronized void stopRouteService(String key) throws Exception {
+        RouteService routeService = routeServices.remove(key);
+        if (routeService != null) {
+            routeService.stop();
+        }
+    }
+
+
+    protected synchronized  void doStop() throws Exception {
         LOG.info("Apache Camel " + getVersion() + " (CamelContext:" + getName() + ") is stopping");
+        stopServices(routeServices.values());
+
         stopServices(servicesToClose);
         if (components != null) {
             for (Component component : components.values()) {
@@ -677,16 +756,6 @@
         LOG.info("Apache Camel " + getVersion() + " (CamelContext:" + getName() + ") stopped");
     }
 
-    protected void startRoutes(Collection<Route> routeList) throws Exception {
-        if (routeList != null) {
-            for (Route route : routeList) {
-                List<Service> services = route.getServicesForRoute();
-                for (Service service : services) {
-                    addService(service);
-                }
-            }
-        }
-    }
 
     /**
      * Lets force some lazy initialization to occur upfront before we start any
@@ -797,7 +866,7 @@
     public Map<String, DataFormatType> getDataFormats() {
         return dataFormats;
     }
-    
+
     public void setFactoryFinderClass(Class<? extends FactoryFinder> finderClass) {
         factoryFinderClass = finderClass;
     }
@@ -818,7 +887,7 @@
         } catch (Exception e) {
             throw new RuntimeCamelException(e);
         }
-        
+
     }
 
 

Added: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RouteService.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RouteService.java?rev=747864&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RouteService.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RouteService.java Wed Feb 25 17:15:58 2009
@@ -0,0 +1,99 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Route;
+import org.apache.camel.Service;
+import org.apache.camel.model.RouteType;
+import org.apache.camel.spi.LifecycleStrategy;
+import org.apache.camel.spi.RouteContext;
+
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Represents the runtime objects for a given {@link RouteType} so that it can be stopped independently
+ * of other routes
+ *
+ * @version $Revision: 1.1 $
+ */
+public class RouteService extends ServiceSupport {
+
+    private final DefaultCamelContext camelContext;
+    private final RouteType routeType;
+    private final List<RouteContext> routeContexts;
+    private final Collection<Route> routes;
+    private String id;
+
+    public RouteService(DefaultCamelContext camelContext, RouteType routeType, List<RouteContext> routeContexts, Collection<Route> routes) {
+        this.camelContext = camelContext;
+        this.routeType = routeType;
+        this.routeContexts = routeContexts;
+        this.routes = routes;
+        this.id = routeType.idOrCreate();
+    }
+
+    public String getId() {
+        return id;
+    }
+
+    public CamelContext getCamelContext() {
+        return camelContext;
+    }
+
+    public List<RouteContext> getRouteContexts() {
+        return routeContexts;
+    }
+
+    public RouteType getRouteType() {
+        return routeType;
+    }
+
+    public Collection<Route> getRoutes() {
+        return routes;
+    }
+
+    protected void doStart() throws Exception {
+        camelContext.addRouteCollection(routes);
+
+        getLifecycleStrategy().onRoutesAdd(routes);
+
+        for (Route route : routes) {
+            List<Service> services = route.getServicesForRoute();
+            for (Service service : services) {
+                startChildService(service);
+            }
+        }
+    }
+
+    protected void doStop() throws Exception {
+        camelContext.removeRouteCollection(routes);
+        // TODO should we stop the actual Route objects??
+    }
+
+    protected LifecycleStrategy getLifecycleStrategy() {
+        return camelContext.getLifecycleStrategy();
+    }
+
+    protected void startChildService(Service service) throws Exception {
+        getLifecycleStrategy().onServiceAdd(camelContext, service);
+        service.start();
+        addChildService(service);
+    }
+}

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RouteService.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ServiceSupport.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ServiceSupport.java?rev=747864&r1=747863&r2=747864&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ServiceSupport.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ServiceSupport.java Wed Feb 25 17:15:58 2009
@@ -48,7 +48,7 @@
                 }
                 doStart();
             } finally {
-                starting.set(false);
+                notStarting();
             }
         }
     }
@@ -110,6 +110,11 @@
 
     protected abstract void doStop() throws Exception;
 
+
+    protected void notStarting() {
+        starting.set(false);
+    }
+
     /**
      * Creates a new thread name with the given prefix
      */

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/RouteType.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/RouteType.java?rev=747864&r1=747863&r2=747864&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/RouteType.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/RouteType.java Wed Feb 25 17:15:58 2009
@@ -74,7 +74,8 @@
         return "Route[" + inputs + " -> " + outputs + "]";
     }
 
-    public void addRoutes(CamelContext context, Collection<Route> routes) throws Exception {
+    public List<RouteContext> addRoutes(CamelContext context, Collection<Route> routes) throws Exception {
+        List<RouteContext> answer = new ArrayList<RouteContext>();
         setCamelContext(context);
 
         ErrorHandlerBuilder handler = context.getErrorHandlerBuilder();
@@ -83,8 +84,10 @@
         }
 
         for (FromType fromType : inputs) {
-            addRoutes(routes, fromType);
+            RouteContext routeContext = addRoutes(routes, fromType);
+            answer.add(routeContext);
         }
+        return answer;
     }
 
 
@@ -260,7 +263,7 @@
 
     // Implementation methods
     // -------------------------------------------------------------------------
-    protected void addRoutes(Collection<Route> routes, FromType fromType) throws Exception {
+    protected RouteContext addRoutes(Collection<Route> routes, FromType fromType) throws Exception {
         RouteContext routeContext = new DefaultRouteContext(this, fromType, routes);
         routeContext.getEndpoint(); // force endpoint resolution
         if (camelContext != null) {
@@ -273,6 +276,7 @@
         }
 
         routeContext.commit();
+        return routeContext;
     }
 
     @Override

Modified: camel/trunk/components/camel-bam/src/main/java/org/apache/camel/bam/ActivityBuilder.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-bam/src/main/java/org/apache/camel/bam/ActivityBuilder.java?rev=747864&r1=747863&r2=747864&view=diff
==============================================================================
--- camel/trunk/components/camel-bam/src/main/java/org/apache/camel/bam/ActivityBuilder.java (original)
+++ camel/trunk/components/camel-bam/src/main/java/org/apache/camel/bam/ActivityBuilder.java Wed Feb 25 17:15:58 2009
@@ -22,6 +22,7 @@
 import org.apache.camel.Expression;
 import org.apache.camel.Processor;
 import org.apache.camel.Route;
+import org.apache.camel.model.RouteType;
 import org.apache.camel.bam.model.ActivityState;
 import org.apache.camel.bam.model.ProcessInstance;
 import org.apache.camel.bam.rules.ActivityRules;
@@ -52,12 +53,15 @@
         return processBuilder.createActivityProcessor(this);
     }
 
-    public Route createRoute() throws Exception {
+    /**
+     * Returns the processor of the route
+     */
+    public Processor getProcessor() throws Exception {
         Processor processor = createProcessor();
         if (processor == null) {
             throw new IllegalArgumentException("No processor created for ActivityBuilder: " + this);
         }
-        return new EventDrivenConsumerRoute(getEndpoint(), processor);
+        return processor;
     }
 
     // Builder methods

Modified: camel/trunk/components/camel-bam/src/main/java/org/apache/camel/bam/ProcessBuilder.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-bam/src/main/java/org/apache/camel/bam/ProcessBuilder.java?rev=747864&r1=747863&r2=747864&view=diff
==============================================================================
--- camel/trunk/components/camel-bam/src/main/java/org/apache/camel/bam/ProcessBuilder.java (original)
+++ camel/trunk/components/camel-bam/src/main/java/org/apache/camel/bam/ProcessBuilder.java Wed Feb 25 17:15:58 2009
@@ -22,6 +22,9 @@
 import org.apache.camel.Endpoint;
 import org.apache.camel.Processor;
 import org.apache.camel.Route;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Service;
+import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.camel.bam.model.ActivityDefinition;
 import org.apache.camel.bam.model.ProcessDefinition;
 import org.apache.camel.bam.model.ProcessInstance;
@@ -51,6 +54,7 @@
     private Class entityType = ProcessInstance.class;
     private ProcessRules processRules = new ProcessRules();
     private ProcessDefinition processDefinition;
+    private ActivityMonitorEngine engine;
 
     protected ProcessBuilder(JpaTemplate jpaTemplate, TransactionTemplate transactionTemplate) {
         this(jpaTemplate, transactionTemplate, createProcessName());
@@ -138,15 +142,23 @@
     // Implementation methods
     // -------------------------------------------------------------------------
     protected void populateRoutes(List<Route> routes) throws Exception {
-        boolean first = true;
+
+        // lets add the monitoring service - should there be an easier way??
+        if (engine == null) {
+            engine = new ActivityMonitorEngine(getJpaTemplate(), getTransactionTemplate(), getProcessRules());
+        }
+        CamelContext camelContext = getContext();
+        if (camelContext instanceof DefaultCamelContext) {
+            DefaultCamelContext defaultCamelContext = (DefaultCamelContext) camelContext;
+            defaultCamelContext.addService(engine);
+        }
+
+        // lets create the routes for the activites
         for (ActivityBuilder builder : activityBuilders) {
-            Route route = builder.createRoute();
-            if (first) {
-                route.getServices().add(new ActivityMonitorEngine(getJpaTemplate(), getTransactionTemplate(), getProcessRules()));
-                first = false;
-            }
-            routes.add(route);
+            from(builder.getEndpoint()).process(builder.getProcessor());
         }
+        super.populateRoutes(routes);
+
     }
 
     // Implementation methods