You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by js...@apache.org on 2009/02/25 18:15:58 UTC
svn commit: r747864 - in /camel/trunk:
camel-core/src/main/java/org/apache/camel/
camel-core/src/main/java/org/apache/camel/component/seda/
camel-core/src/main/java/org/apache/camel/impl/
camel-core/src/main/java/org/apache/camel/model/ components/came...
Author: jstrachan
Date: Wed Feb 25 17:15:58 2009
New Revision: 747864
URL: http://svn.apache.org/viewvc?rev=747864&view=rev
Log:
CAMEL-1004 - make it easy to start/stop routes by their definition (RouteBuilder or Route class). Added an initial implementation, which needs more testing but seems to work.
Added:
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java (with props)
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RouteService.java (with props)
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ServiceSupport.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/RouteType.java
camel/trunk/components/camel-bam/src/main/java/org/apache/camel/bam/ActivityBuilder.java
camel/trunk/components/camel-bam/src/main/java/org/apache/camel/bam/ProcessBuilder.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java?rev=747864&r1=747863&r2=747864&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java Wed Feb 25 17:15:58 2009
@@ -189,14 +189,6 @@
List<Route> getRoutes();
/**
- * Adds a collection of routes to this context
- *
- * @param routes the routes to add
- * @throws Exception if the routes could not be created for whatever reason
- */
- void addRoutes(Collection<Route> routes) throws Exception;
-
- /**
* Adds a collection of routes to this context using the given builder
* to build them
*
@@ -213,6 +205,23 @@
*/
void addRouteDefinitions(Collection<RouteType> routeDefinitions) throws Exception;
+ /**
+ * Removes a collection of route definitions from the context - stopping any previously running
+ * routes if any of them are actively running
+ */
+ void removeRouteDefinitions(Collection<RouteType> routeDefinitions) throws Exception;
+
+ /**
+ * Starts the given route if it has been previously stopped
+ */
+ void startRoute(RouteType route) throws Exception;
+
+ /**
+ * Stops the given route. It will remain in the list of route definitions returned by {@link #getRouteDefinitions()}
+ * unless you remove it using {@link #removeRouteDefinitions(java.util.Collection)}
+ */
+ void stopRoute(RouteType route) throws Exception;
+
// Properties
//-----------------------------------------------------------------------
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java?rev=747864&r1=747863&r2=747864&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java Wed Feb 25 17:15:58 2009
@@ -104,9 +104,12 @@
for (int i = 0; i < concurrentConsumers; i++) {
executor.execute(this);
}
+ endpoint.onStarted(this);
}
protected void doStop() throws Exception {
+ endpoint.onStopped(this);
+
executor.shutdownNow();
executor = null;
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java?rev=747864&r1=747863&r2=747864&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java Wed Feb 25 17:15:58 2009
@@ -19,9 +19,12 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.HashSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.CopyOnWriteArraySet;
import org.apache.camel.Component;
import org.apache.camel.Consumer;
@@ -42,6 +45,8 @@
private BlockingQueue<Exchange> queue;
private int size = 1000;
private int concurrentConsumers = 1;
+ private Set<SedaProducer> producers = new CopyOnWriteArraySet<SedaProducer>();
+ private Set<SedaConsumer> consumers = new CopyOnWriteArraySet<SedaConsumer>();
public SedaEndpoint() {
}
@@ -67,7 +72,7 @@
}
public Producer createProducer() throws Exception {
- return new CollectionProducer(this, getQueue());
+ return new SedaProducer(this, getQueue());
}
public Consumer createConsumer(Processor processor) throws Exception {
@@ -105,7 +110,40 @@
return true;
}
+ /**
+ * Returns the current pending exchanges
+ */
public List<Exchange> getExchanges() {
return new ArrayList<Exchange>(getQueue());
}
+
+ /**
+ * Returns the current active consumers on this endpoint
+ */
+ public Set<SedaConsumer> getConsumers() {
+ return new HashSet<SedaConsumer>(consumers);
+ }
+
+ /**
+ * Returns the current active producers on this endpoint
+ */
+ public Set<SedaProducer> getProducers() {
+ return new HashSet<SedaProducer>(producers);
+ }
+
+ void onStarted(SedaProducer producer) {
+ producers.add(producer);
+ }
+
+ void onStopped(SedaProducer producer) {
+ producers.remove(producer);
+ }
+
+ void onStarted(SedaConsumer consumer) {
+ consumers.add(consumer);
+ }
+
+ void onStopped(SedaConsumer consumer) {
+ consumers.remove(consumer);
+ }
}
Added: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java?rev=747864&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java Wed Feb 25 17:15:58 2009
@@ -0,0 +1,46 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.seda;
+
+import org.apache.camel.Exchange;
+
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * @version $Revision: 1.1 $
+ */
+public class SedaProducer extends CollectionProducer {
+ private SedaEndpoint endpoint;
+
+ public SedaProducer(SedaEndpoint endpoint, BlockingQueue<Exchange> queue) {
+ super(endpoint, queue);
+ this.endpoint = endpoint;
+ }
+
+ @Override
+ protected void doStart() throws Exception {
+ super.doStart();
+ endpoint.onStarted(this);
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ endpoint.onStopped(this);
+ super.doStop();
+ }
+}
Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java?rev=747864&r1=747863&r2=747864&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java Wed Feb 25 17:15:58 2009
@@ -16,17 +16,6 @@
*/
package org.apache.camel.impl;
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.concurrent.Callable;
-import javax.naming.Context;
-
import org.apache.camel.CamelContext;
import org.apache.camel.Component;
import org.apache.camel.Endpoint;
@@ -56,16 +45,28 @@
import org.apache.camel.spi.LanguageResolver;
import org.apache.camel.spi.LifecycleStrategy;
import org.apache.camel.spi.Registry;
+import org.apache.camel.spi.RouteContext;
import org.apache.camel.spi.TypeConverterRegistry;
import org.apache.camel.util.FactoryFinder;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.ReflectionInjector;
+import static org.apache.camel.util.ServiceHelper.startServices;
+import static org.apache.camel.util.ServiceHelper.stopServices;
import org.apache.camel.util.SystemHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import static org.apache.camel.util.ServiceHelper.startServices;
-import static org.apache.camel.util.ServiceHelper.stopServices;
+import javax.naming.Context;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.Callable;
+
/**
* Represents the context used to configure routes and the policies to use.
*
@@ -97,6 +98,7 @@
private ErrorHandlerBuilder errorHandlerBuilder;
private Map<String, DataFormatType> dataFormats = new HashMap<String, DataFormatType>();
private Class<? extends FactoryFinder> factoryFinderClass = FactoryFinder.class;
+ private Map<String, RouteService> routeServices = new HashMap<String, RouteService>();
public DefaultCamelContext() {
name = NAME_PREFIX + ++nameSuffix;
@@ -112,9 +114,9 @@
// if we can't instantiate the JMX enabled strategy then fallback to default
// could be because of missing .jars on the classpath
LOG.warn("Could not find needed classes for JMX lifecycle strategy."
- + " Needed class is in spring-context.jar using Spring 2.5 or newer ("
- + " spring-jmx.jar using Spring 2.0.x)."
- + " NoClassDefFoundError: " + e.getMessage());
+ + " Needed class is in spring-context.jar using Spring 2.5 or newer ("
+ + " spring-jmx.jar using Spring 2.0.x)."
+ + " NoClassDefFoundError: " + e.getMessage());
} catch (Exception e) {
LOG.warn("Could not create JMX lifecycle strategy, caused by: " + e.getMessage());
}
@@ -293,7 +295,7 @@
stopServices(oldEndpoint);
} else {
for (Map.Entry entry : endpoints.entrySet()) {
- oldEndpoint = (Endpoint)entry.getValue();
+ oldEndpoint = (Endpoint) entry.getValue();
if (!oldEndpoint.isSingleton() && uri.equals(oldEndpoint.getEndpointUri())) {
answer.add(oldEndpoint);
stopServices(oldEndpoint);
@@ -368,11 +370,14 @@
// Route Management Methods
// -----------------------------------------------------------------------
- public List<Route> getRoutes() {
+ public synchronized List<Route> getRoutes() {
if (routes == null) {
routes = new ArrayList<Route>();
}
- return routes;
+
+ // lets return a copy of the collection as objects are removed later
+ // when services are stopped
+ return new ArrayList<Route>(routes);
}
public void setRoutes(List<Route> routes) {
@@ -380,18 +385,27 @@
throw new UnsupportedOperationException("Overriding existing routes is not supported yet, use addRoutes instead");
}
- public void addRoutes(Collection<Route> routes) throws Exception {
+ synchronized void removeRouteCollection(Collection<Route> routes) {
+ if (this.routes != null){
+ this.routes.removeAll(routes);
+ }
+ }
+
+ synchronized void addRouteCollection(Collection<Route> routes) throws Exception {
if (this.routes == null) {
this.routes = new ArrayList<Route>();
}
if (routes != null) {
this.routes.addAll(routes);
+/*
+ TODO we should have notified the lifecycle strategy via the RouteService
lifecycleStrategy.onRoutesAdd(routes);
if (shouldStartRoutes()) {
startRoutes(routes);
}
+*/
}
}
@@ -402,7 +416,7 @@
if (LOG.isDebugEnabled()) {
LOG.debug("Adding routes from: " + builder + " routes: " + routeList);
}
- addRoutes(routeList);
+ //addRouteCollection(routeList);
}
public void addRouteDefinitions(Collection<RouteType> routeDefinitions) throws Exception {
@@ -410,7 +424,26 @@
if (shouldStartRoutes()) {
startRouteDefinitions(routeDefinitions);
}
+ }
+
+ public void removeRouteDefinitions(Collection<RouteType> routeDefinitions) throws Exception {
+ this.routeDefinitions.removeAll(routeDefinitions);
+ for (RouteType routeDefinition : routeDefinitions) {
+ stopRoute(routeDefinition);
+ }
+
+ }
+
+ public void startRoute(RouteType route) throws Exception {
+ Collection<Route> routes = new ArrayList<Route>();
+ List<RouteContext> routeContexts = route.addRoutes(this, routes);
+ RouteService routeService = new RouteService(this, route, routeContexts, routes);
+ startRouteService(routeService);
+ }
+
+ public void stopRoute(RouteType route) throws Exception {
+ stopRouteService(route.idOrCreate());
}
/**
@@ -465,7 +498,7 @@
// type converter is usually the default one that also is the registry
if (typeConverter instanceof DefaultTypeConverter) {
typeConverterRegistry = (DefaultTypeConverter) typeConverter;
- }
+ }
}
return typeConverterRegistry;
}
@@ -523,7 +556,6 @@
* Sets the registry to the given JNDI context
*
* @param jndiContext is the JNDI context to use as the registry
- *
* @see #setRegistry(org.apache.camel.spi.Registry)
*/
public void setJndiContext(Context jndiContext) {
@@ -651,23 +683,70 @@
}
}
startRouteDefinitions(routeDefinitions);
- startRoutes(routes);
-
+
+ // lets clear the starting flag as we are now started and we really do start up these services
+ notStarting();
+
+ synchronized (this) {
+ for (RouteService routeService : routeServices.values()) {
+ routeService.start();
+ }
+ }
+ //startRoutes(routes);
+
LOG.info("Apache Camel " + getVersion() + " (CamelContext:" + getName() + ") started");
}
protected void startRouteDefinitions(Collection<RouteType> list) throws Exception {
if (list != null) {
- Collection<Route> routes = new ArrayList<Route>();
for (RouteType route : list) {
- route.addRoutes(this, routes);
+ startRoute(route);
}
- addRoutes(routes);
}
}
- protected void doStop() throws Exception {
+ /*
+ protected void startRoutes(Collection<Route> routeList) throws Exception {
+ if (routeList != null) {
+ for (Route route : routeList) {
+ List<Service> services = route.getServicesForRoute();
+ for (Service service : services) {
+ addService(service);
+ }
+ }
+ }
+ }
+
+ */
+
+
+ /**
+ * Starts the given route service
+ */
+ protected synchronized void startRouteService(RouteService routeService) throws Exception {
+ String key = routeService.getId();
+ stopRouteService(key);
+ routeServices.put(key, routeService);
+ if (shouldStartRoutes()) {
+ routeService.start();
+ }
+ }
+
+ /**
+ * Stops the route denoted by the given RouteType id
+ */
+ protected synchronized void stopRouteService(String key) throws Exception {
+ RouteService routeService = routeServices.remove(key);
+ if (routeService != null) {
+ routeService.stop();
+ }
+ }
+
+
+ protected synchronized void doStop() throws Exception {
LOG.info("Apache Camel " + getVersion() + " (CamelContext:" + getName() + ") is stopping");
+ stopServices(routeServices.values());
+
stopServices(servicesToClose);
if (components != null) {
for (Component component : components.values()) {
@@ -677,16 +756,6 @@
LOG.info("Apache Camel " + getVersion() + " (CamelContext:" + getName() + ") stopped");
}
- protected void startRoutes(Collection<Route> routeList) throws Exception {
- if (routeList != null) {
- for (Route route : routeList) {
- List<Service> services = route.getServicesForRoute();
- for (Service service : services) {
- addService(service);
- }
- }
- }
- }
/**
* Lets force some lazy initialization to occur upfront before we start any
@@ -797,7 +866,7 @@
public Map<String, DataFormatType> getDataFormats() {
return dataFormats;
}
-
+
public void setFactoryFinderClass(Class<? extends FactoryFinder> finderClass) {
factoryFinderClass = finderClass;
}
@@ -818,7 +887,7 @@
} catch (Exception e) {
throw new RuntimeCamelException(e);
}
-
+
}
Added: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RouteService.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RouteService.java?rev=747864&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RouteService.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RouteService.java Wed Feb 25 17:15:58 2009
@@ -0,0 +1,99 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Route;
+import org.apache.camel.Service;
+import org.apache.camel.model.RouteType;
+import org.apache.camel.spi.LifecycleStrategy;
+import org.apache.camel.spi.RouteContext;
+
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Represents the runtime objects for a given {@link RouteType} so that it can be stopped independently
+ * of other routes
+ *
+ * @version $Revision: 1.1 $
+ */
+public class RouteService extends ServiceSupport {
+
+ private final DefaultCamelContext camelContext;
+ private final RouteType routeType;
+ private final List<RouteContext> routeContexts;
+ private final Collection<Route> routes;
+ private String id;
+
+ public RouteService(DefaultCamelContext camelContext, RouteType routeType, List<RouteContext> routeContexts, Collection<Route> routes) {
+ this.camelContext = camelContext;
+ this.routeType = routeType;
+ this.routeContexts = routeContexts;
+ this.routes = routes;
+ this.id = routeType.idOrCreate();
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public CamelContext getCamelContext() {
+ return camelContext;
+ }
+
+ public List<RouteContext> getRouteContexts() {
+ return routeContexts;
+ }
+
+ public RouteType getRouteType() {
+ return routeType;
+ }
+
+ public Collection<Route> getRoutes() {
+ return routes;
+ }
+
+ protected void doStart() throws Exception {
+ camelContext.addRouteCollection(routes);
+
+ getLifecycleStrategy().onRoutesAdd(routes);
+
+ for (Route route : routes) {
+ List<Service> services = route.getServicesForRoute();
+ for (Service service : services) {
+ startChildService(service);
+ }
+ }
+ }
+
+ protected void doStop() throws Exception {
+ camelContext.removeRouteCollection(routes);
+ // TODO should we stop the actual Route objects??
+ }
+
+ protected LifecycleStrategy getLifecycleStrategy() {
+ return camelContext.getLifecycleStrategy();
+ }
+
+ protected void startChildService(Service service) throws Exception {
+ getLifecycleStrategy().onServiceAdd(camelContext, service);
+ service.start();
+ addChildService(service);
+ }
+}
Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RouteService.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ServiceSupport.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ServiceSupport.java?rev=747864&r1=747863&r2=747864&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ServiceSupport.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ServiceSupport.java Wed Feb 25 17:15:58 2009
@@ -48,7 +48,7 @@
}
doStart();
} finally {
- starting.set(false);
+ notStarting();
}
}
}
@@ -110,6 +110,11 @@
protected abstract void doStop() throws Exception;
+
+ protected void notStarting() {
+ starting.set(false);
+ }
+
/**
* Creates a new thread name with the given prefix
*/
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/RouteType.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/RouteType.java?rev=747864&r1=747863&r2=747864&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/RouteType.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/RouteType.java Wed Feb 25 17:15:58 2009
@@ -74,7 +74,8 @@
return "Route[" + inputs + " -> " + outputs + "]";
}
- public void addRoutes(CamelContext context, Collection<Route> routes) throws Exception {
+ public List<RouteContext> addRoutes(CamelContext context, Collection<Route> routes) throws Exception {
+ List<RouteContext> answer = new ArrayList<RouteContext>();
setCamelContext(context);
ErrorHandlerBuilder handler = context.getErrorHandlerBuilder();
@@ -83,8 +84,10 @@
}
for (FromType fromType : inputs) {
- addRoutes(routes, fromType);
+ RouteContext routeContext = addRoutes(routes, fromType);
+ answer.add(routeContext);
}
+ return answer;
}
@@ -260,7 +263,7 @@
// Implementation methods
// -------------------------------------------------------------------------
- protected void addRoutes(Collection<Route> routes, FromType fromType) throws Exception {
+ protected RouteContext addRoutes(Collection<Route> routes, FromType fromType) throws Exception {
RouteContext routeContext = new DefaultRouteContext(this, fromType, routes);
routeContext.getEndpoint(); // force endpoint resolution
if (camelContext != null) {
@@ -273,6 +276,7 @@
}
routeContext.commit();
+ return routeContext;
}
@Override
Modified: camel/trunk/components/camel-bam/src/main/java/org/apache/camel/bam/ActivityBuilder.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-bam/src/main/java/org/apache/camel/bam/ActivityBuilder.java?rev=747864&r1=747863&r2=747864&view=diff
==============================================================================
--- camel/trunk/components/camel-bam/src/main/java/org/apache/camel/bam/ActivityBuilder.java (original)
+++ camel/trunk/components/camel-bam/src/main/java/org/apache/camel/bam/ActivityBuilder.java Wed Feb 25 17:15:58 2009
@@ -22,6 +22,7 @@
import org.apache.camel.Expression;
import org.apache.camel.Processor;
import org.apache.camel.Route;
+import org.apache.camel.model.RouteType;
import org.apache.camel.bam.model.ActivityState;
import org.apache.camel.bam.model.ProcessInstance;
import org.apache.camel.bam.rules.ActivityRules;
@@ -52,12 +53,15 @@
return processBuilder.createActivityProcessor(this);
}
- public Route createRoute() throws Exception {
+ /**
+ * Returns the processor of the route
+ */
+ public Processor getProcessor() throws Exception {
Processor processor = createProcessor();
if (processor == null) {
throw new IllegalArgumentException("No processor created for ActivityBuilder: " + this);
}
- return new EventDrivenConsumerRoute(getEndpoint(), processor);
+ return processor;
}
// Builder methods
Modified: camel/trunk/components/camel-bam/src/main/java/org/apache/camel/bam/ProcessBuilder.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-bam/src/main/java/org/apache/camel/bam/ProcessBuilder.java?rev=747864&r1=747863&r2=747864&view=diff
==============================================================================
--- camel/trunk/components/camel-bam/src/main/java/org/apache/camel/bam/ProcessBuilder.java (original)
+++ camel/trunk/components/camel-bam/src/main/java/org/apache/camel/bam/ProcessBuilder.java Wed Feb 25 17:15:58 2009
@@ -22,6 +22,9 @@
import org.apache.camel.Endpoint;
import org.apache.camel.Processor;
import org.apache.camel.Route;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Service;
+import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.bam.model.ActivityDefinition;
import org.apache.camel.bam.model.ProcessDefinition;
import org.apache.camel.bam.model.ProcessInstance;
@@ -51,6 +54,7 @@
private Class entityType = ProcessInstance.class;
private ProcessRules processRules = new ProcessRules();
private ProcessDefinition processDefinition;
+ private ActivityMonitorEngine engine;
protected ProcessBuilder(JpaTemplate jpaTemplate, TransactionTemplate transactionTemplate) {
this(jpaTemplate, transactionTemplate, createProcessName());
@@ -138,15 +142,23 @@
// Implementation methods
// -------------------------------------------------------------------------
protected void populateRoutes(List<Route> routes) throws Exception {
- boolean first = true;
+
+ // lets add the monitoring service - should there be an easier way??
+ if (engine == null) {
+ engine = new ActivityMonitorEngine(getJpaTemplate(), getTransactionTemplate(), getProcessRules());
+ }
+ CamelContext camelContext = getContext();
+ if (camelContext instanceof DefaultCamelContext) {
+ DefaultCamelContext defaultCamelContext = (DefaultCamelContext) camelContext;
+ defaultCamelContext.addService(engine);
+ }
+
+ // let's create the routes for the activities
for (ActivityBuilder builder : activityBuilders) {
- Route route = builder.createRoute();
- if (first) {
- route.getServices().add(new ActivityMonitorEngine(getJpaTemplate(), getTransactionTemplate(), getProcessRules()));
- first = false;
- }
- routes.add(route);
+ from(builder.getEndpoint()).process(builder.getProcessor());
}
+ super.populateRoutes(routes);
+
}
// Implementation methods