You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by gn...@apache.org on 2020/03/06 13:51:22 UTC
[camel] 04/07: Isolate a bit more the runtime from the model
This is an automated email from the ASF dual-hosted git repository.
gnodet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 187fcca75df85877e8f940a211d8a5f861cc9fc5
Author: Guillaume Nodet <gn...@gmail.com>
AuthorDate: Fri Mar 6 03:29:39 2020 +0100
Isolate a bit more the runtime from the model
---
.../OnExceptionLoadBalancerDoubleIssueTest.xml | 6 +-
.../src/main/java/org/apache/camel/Route.java | 10 ++
.../camel/impl/engine/AbstractCamelContext.java | 72 ++++++------
.../org/apache/camel/impl/engine/DefaultRoute.java | 36 +++++-
.../impl/engine/DefaultRouteStartupOrder.java | 6 +-
.../{BaseRouteService.java => RouteService.java} | 55 +++++----
.../org/apache/camel/processor/SendProcessor.java | 2 +-
.../camel/processor/channel/DefaultChannel.java | 15 +--
.../org/apache/camel/impl/DefaultCamelContext.java | 9 +-
.../java/org/apache/camel/impl/DefaultModel.java | 1 +
.../org/apache/camel/impl/DefaultModelRoute.java | 5 +-
.../java/org/apache/camel/impl/RouteService.java | 123 ---------------------
.../apache/camel/model/LoadBalancerDefinition.java | 27 +----
.../apache/camel/model/OnCompletionDefinition.java | 16 ++-
.../apache/camel/model/OnExceptionDefinition.java | 12 +-
.../apache/camel/model/ProcessorDefinition.java | 2 -
.../apache/camel/model/RouteDefinitionHelper.java | 2 +-
.../org/apache/camel/model/RoutesDefinition.java | 1 +
.../apache/camel/reifier/LoadBalanceReifier.java | 42 +++----
.../apache/camel/reifier/OnCompletionReifier.java | 14 ---
.../apache/camel/reifier/OnExceptionReifier.java | 14 +--
.../org/apache/camel/reifier/ProcessorReifier.java | 9 +-
.../org/apache/camel/reifier/RouteReifier.java | 4 +-
.../reifier/errorhandler/ErrorHandlerReifier.java | 9 +-
.../reifier/loadbalancer/LoadBalancerReifier.java | 1 -
.../RandomLoadBalanceJavaDSLBuilderTest.java | 6 +-
.../DefaultExceptionPolicyStrategyTest.java | 2 +-
.../OnExceptionLoadBalancerDoubleIssueTest.java | 3 +-
.../apache/camel/reifier/ProcessorReifierTest.java | 2 +-
...edRouteRemoveContextScopedErrorHandlerTest.java | 4 +-
...emoveRouteAndContextScopedErrorHandlerTest.java | 4 +-
31 files changed, 188 insertions(+), 326 deletions(-)
diff --git a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/onexception/OnExceptionLoadBalancerDoubleIssueTest.xml b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/onexception/OnExceptionLoadBalancerDoubleIssueTest.xml
index 5cc4406..1a6b6e5 100644
--- a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/onexception/OnExceptionLoadBalancerDoubleIssueTest.xml
+++ b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/onexception/OnExceptionLoadBalancerDoubleIssueTest.xml
@@ -37,13 +37,17 @@
<onException>
<exception>java.lang.Exception</exception>
<handled> <constant>true</constant> </handled>
+ <to uri="direct:error"/>
+ </onException>
+ <route>
+ <from uri="direct:error"/>
<loadBalance>
<roundRobin id="round"/>
<to uri="mock:error"/>
<to uri="mock:error2"/>
<to uri="mock:error3"/>
</loadBalance>
- </onException>
+ </route>
<route>
<from uri="direct:foo"/>
diff --git a/core/camel-api/src/main/java/org/apache/camel/Route.java b/core/camel-api/src/main/java/org/apache/camel/Route.java
index 353a335..13586e6 100644
--- a/core/camel-api/src/main/java/org/apache/camel/Route.java
+++ b/core/camel-api/src/main/java/org/apache/camel/Route.java
@@ -16,6 +16,7 @@
*/
package org.apache.camel;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -221,6 +222,11 @@ public interface Route extends RuntimeConfiguration {
String getRouteId();
/**
+ * Gets the route description
+ */
+ String getRouteDescription();
+
+ /**
* Get the route type
*
* @return the route type
@@ -273,12 +279,16 @@ public interface Route extends RuntimeConfiguration {
Processor createErrorHandler(Processor processor) throws Exception;
+ Collection<Processor> getOnCompletions();
+
// called at runtime
Processor getOnCompletion(String onCompletionId);
// called at completion time
void setOnCompletion(String onCompletionId, Processor processor);
+ Collection<Processor> getOnExceptions();
+
Processor getOnException(String onExceptionId);
void setOnException(String onExceptionId, Processor processor);
diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
index bca2019..05c367f 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
@@ -230,8 +230,8 @@ public abstract class AbstractCamelContext extends ServiceSupport
private ErrorHandlerFactory errorHandlerFactory;
private Map<String, String> globalOptions = new HashMap<>();
private final Map<String, FactoryFinder> factories = new ConcurrentHashMap<>();
- private final Map<String, BaseRouteService> routeServices = new LinkedHashMap<>();
- private final Map<String, BaseRouteService> suspendedRouteServices = new LinkedHashMap<>();
+ private final Map<String, RouteService> routeServices = new LinkedHashMap<>();
+ private final Map<String, RouteService> suspendedRouteServices = new LinkedHashMap<>();
private final Object lock = new Object();
private volatile String version;
@@ -1231,7 +1231,7 @@ public abstract class AbstractCamelContext extends ServiceSupport
}
public ServiceStatus getRouteStatus(String key) {
- BaseRouteService routeService = routeServices.get(key);
+ RouteService routeService = routeServices.get(key);
if (routeService != null) {
return routeService.getStatus();
}
@@ -1264,7 +1264,7 @@ public abstract class AbstractCamelContext extends ServiceSupport
public synchronized void startRoute(String routeId) throws Exception {
DefaultRouteError.reset(this, routeId);
- BaseRouteService routeService = routeServices.get(routeId);
+ RouteService routeService = routeServices.get(routeId);
if (routeService != null) {
try {
startRouteService(routeService, false);
@@ -1285,7 +1285,7 @@ public abstract class AbstractCamelContext extends ServiceSupport
return;
}
- BaseRouteService routeService = routeServices.get(routeId);
+ RouteService routeService = routeServices.get(routeId);
if (routeService != null) {
resumeRouteService(routeService);
// must resume the route as well
@@ -1301,7 +1301,7 @@ public abstract class AbstractCamelContext extends ServiceSupport
public synchronized boolean stopRoute(String routeId, long timeout, TimeUnit timeUnit, boolean abortAfterTimeout) throws Exception {
DefaultRouteError.reset(this, routeId);
- BaseRouteService routeService = routeServices.get(routeId);
+ RouteService routeService = routeServices.get(routeId);
if (routeService != null) {
try {
RouteStartupOrder route = new DefaultRouteStartupOrder(1, routeService.getRoute(), routeService);
@@ -1336,7 +1336,7 @@ public abstract class AbstractCamelContext extends ServiceSupport
protected synchronized void doShutdownRoute(String routeId, long timeout, TimeUnit timeUnit, boolean removingRoutes) throws Exception {
DefaultRouteError.reset(this, routeId);
- BaseRouteService routeService = routeServices.get(routeId);
+ RouteService routeService = routeServices.get(routeId);
if (routeService != null) {
try {
List<RouteStartupOrder> routes = new ArrayList<>(1);
@@ -1362,11 +1362,11 @@ public abstract class AbstractCamelContext extends ServiceSupport
// known if a given endpoints is in use
// by one or more routes, when we remove the route
Map<String, Set<Endpoint>> endpointsInUse = new HashMap<>();
- for (Map.Entry<String, BaseRouteService> entry : routeServices.entrySet()) {
+ for (Map.Entry<String, RouteService> entry : routeServices.entrySet()) {
endpointsInUse.put(entry.getKey(), entry.getValue().gatherEndpoints());
}
- BaseRouteService routeService = routeServices.get(routeId);
+ RouteService routeService = routeServices.get(routeId);
if (routeService != null) {
if (getRouteStatus(routeId).isStopped()) {
try {
@@ -1425,7 +1425,7 @@ public abstract class AbstractCamelContext extends ServiceSupport
return;
}
- BaseRouteService routeService = routeServices.get(routeId);
+ RouteService routeService = routeServices.get(routeId);
if (routeService != null) {
List<RouteStartupOrder> routes = new ArrayList<>(1);
Route route = routeService.getRoute();
@@ -2395,7 +2395,7 @@ public abstract class AbstractCamelContext extends ServiceSupport
// because we only want to suspend started routes
// (so when we resume we only resume the routes which actually was
// suspended)
- for (Map.Entry<String, BaseRouteService> entry : getRouteServices().entrySet()) {
+ for (Map.Entry<String, RouteService> entry : getRouteServices().entrySet()) {
if (entry.getValue().getStatus().isStarted()) {
suspendedRouteServices.put(entry.getKey(), entry.getValue());
}
@@ -2404,9 +2404,9 @@ public abstract class AbstractCamelContext extends ServiceSupport
// assemble list of startup ordering so routes can be shutdown
// accordingly
List<RouteStartupOrder> orders = new ArrayList<>();
- for (Map.Entry<String, BaseRouteService> entry : suspendedRouteServices.entrySet()) {
+ for (Map.Entry<String, RouteService> entry : suspendedRouteServices.entrySet()) {
Route route = entry.getValue().getRoute();
- Integer order = entry.getValue().getStartupOrder();
+ Integer order = route.getStartupOrder();
if (order == null) {
order = defaultRouteStartupOrder++;
}
@@ -2419,7 +2419,7 @@ public abstract class AbstractCamelContext extends ServiceSupport
getShutdownStrategy().suspend(this, orders);
// mark the route services as suspended or stopped
- for (BaseRouteService service : suspendedRouteServices.values()) {
+ for (RouteService service : suspendedRouteServices.values()) {
if (routeSupportsSuspension(service.getId())) {
service.suspend();
} else {
@@ -2449,7 +2449,7 @@ public abstract class AbstractCamelContext extends ServiceSupport
// mark the route services as resumed (will be marked as started) as
// well
- for (BaseRouteService service : suspendedRouteServices.values()) {
+ for (RouteService service : suspendedRouteServices.values()) {
if (routeSupportsSuspension(service.getId())) {
service.resume();
} else {
@@ -2856,7 +2856,7 @@ public abstract class AbstractCamelContext extends ServiceSupport
shutdownServices(asyncProcessorAwaitManager);
// we need also to include routes which failed to start to ensure all resources get stopped when stopping Camel
- for (BaseRouteService routeService : routeServices.values()) {
+ for (RouteService routeService : routeServices.values()) {
boolean found = routeStartupOrder.stream().anyMatch(o -> o.getRoute().getId().equals(routeService.getId()));
if (!found) {
LOG.debug("Route: {} which failed to startup will be stopped", routeService.getId());
@@ -2865,10 +2865,10 @@ public abstract class AbstractCamelContext extends ServiceSupport
}
routeStartupOrder.sort(Comparator.comparingInt(RouteStartupOrder::getStartupOrder).reversed());
- List<BaseRouteService> list = new ArrayList<>();
+ List<RouteService> list = new ArrayList<>();
for (RouteStartupOrder startupOrder : routeStartupOrder) {
DefaultRouteStartupOrder order = (DefaultRouteStartupOrder)startupOrder;
- BaseRouteService routeService = order.getRouteService();
+ RouteService routeService = order.getRouteService();
list.add(routeService);
}
shutdownServices(list, false);
@@ -2973,13 +2973,13 @@ public abstract class AbstractCamelContext extends ServiceSupport
* @param addingRoutes whether we are adding new routes
* @throws Exception is thrown if error starting routes
*/
- protected void doStartOrResumeRoutes(Map<String, BaseRouteService> routeServices, boolean checkClash, boolean startConsumer, boolean resumeConsumer, boolean addingRoutes)
+ protected void doStartOrResumeRoutes(Map<String, RouteService> routeServices, boolean checkClash, boolean startConsumer, boolean resumeConsumer, boolean addingRoutes)
throws Exception {
setStartingRoutes(true);
try {
// filter out already started routes
- Map<String, BaseRouteService> filtered = new LinkedHashMap<>();
- for (Map.Entry<String, BaseRouteService> entry : routeServices.entrySet()) {
+ Map<String, RouteService> filtered = new LinkedHashMap<>();
+ for (Map.Entry<String, RouteService> entry : routeServices.entrySet()) {
boolean startable = false;
Consumer consumer = entry.getValue().getRoute().getConsumer();
@@ -3012,7 +3012,7 @@ public abstract class AbstractCamelContext extends ServiceSupport
}
protected boolean routeSupportsSuspension(String routeId) {
- BaseRouteService routeService = routeServices.get(routeId);
+ RouteService routeService = routeServices.get(routeId);
if (routeService != null) {
return routeService.getRoute().supportsSuspension();
}
@@ -3094,7 +3094,7 @@ public abstract class AbstractCamelContext extends ServiceSupport
/**
* Starts the given route service
*/
- public synchronized void startRouteService(BaseRouteService routeService, boolean addingRoutes) throws Exception {
+ public synchronized void startRouteService(RouteService routeService, boolean addingRoutes) throws Exception {
// we may already be starting routes so remember this, so we can unset
// accordingly in finally block
boolean alreadyStartingRoutes = isStartingRoutes();
@@ -3133,7 +3133,7 @@ public abstract class AbstractCamelContext extends ServiceSupport
/**
* Resumes the given route service
*/
- protected synchronized void resumeRouteService(BaseRouteService routeService) throws Exception {
+ protected synchronized void resumeRouteService(RouteService routeService) throws Exception {
// the route service could have been stopped, and if so then start it
// instead
if (!routeService.getStatus().isSuspended()) {
@@ -3149,7 +3149,7 @@ public abstract class AbstractCamelContext extends ServiceSupport
}
}
- protected synchronized void stopRouteService(BaseRouteService routeService, boolean removingRoutes) throws Exception {
+ protected synchronized void stopRouteService(RouteService routeService, boolean removingRoutes) throws Exception {
routeService.setRemovingRoutes(removingRoutes);
stopRouteService(routeService);
}
@@ -3167,17 +3167,17 @@ public abstract class AbstractCamelContext extends ServiceSupport
}
}
- protected synchronized void stopRouteService(BaseRouteService routeService) throws Exception {
+ protected synchronized void stopRouteService(RouteService routeService) throws Exception {
routeService.stop();
logRouteState(routeService.getRoute(), "stopped");
}
- protected synchronized void shutdownRouteService(BaseRouteService routeService) throws Exception {
+ protected synchronized void shutdownRouteService(RouteService routeService) throws Exception {
routeService.shutdown();
logRouteState(routeService.getRoute(), "shutdown and removed");
}
- protected synchronized void suspendRouteService(BaseRouteService routeService) throws Exception {
+ protected synchronized void suspendRouteService(RouteService routeService) throws Exception {
routeService.setRemovingRoutes(false);
routeService.suspend();
logRouteState(routeService.getRoute(), "suspended");
@@ -3199,7 +3199,7 @@ public abstract class AbstractCamelContext extends ServiceSupport
* @throws Exception is thrown if error starting the routes
*/
protected synchronized void safelyStartRouteServices(boolean checkClash, boolean startConsumer, boolean resumeConsumer, boolean addingRoutes,
- Collection<BaseRouteService> routeServices)
+ Collection<RouteService> routeServices)
throws Exception {
// list of inputs to start when all the routes have been prepared for
// starting
@@ -3208,7 +3208,7 @@ public abstract class AbstractCamelContext extends ServiceSupport
Map<Integer, DefaultRouteStartupOrder> inputs = new TreeMap<>();
// figure out the order in which the routes should be started
- for (BaseRouteService routeService : routeServices) {
+ for (RouteService routeService : routeServices) {
DefaultRouteStartupOrder order = doPrepareRouteToBeStarted(routeService);
// check for clash before we add it as input
if (checkClash) {
@@ -3266,16 +3266,16 @@ public abstract class AbstractCamelContext extends ServiceSupport
* @see #safelyStartRouteServices(boolean,boolean,boolean,boolean,Collection)
*/
protected synchronized void safelyStartRouteServices(boolean forceAutoStart, boolean checkClash, boolean startConsumer, boolean resumeConsumer, boolean addingRoutes,
- BaseRouteService... routeServices)
+ RouteService... routeServices)
throws Exception {
safelyStartRouteServices(checkClash, startConsumer, resumeConsumer, addingRoutes, Arrays.asList(routeServices));
}
- private DefaultRouteStartupOrder doPrepareRouteToBeStarted(BaseRouteService routeService) {
+ private DefaultRouteStartupOrder doPrepareRouteToBeStarted(RouteService routeService) {
// add the inputs from this route service to the list to start
// afterwards
// should be ordered according to the startup number
- Integer startupOrder = routeService.getStartupOrder();
+ Integer startupOrder = routeService.getRoute().getStartupOrder();
if (startupOrder == null) {
// auto assign a default startup order
startupOrder = defaultRouteStartupOrder++;
@@ -3321,7 +3321,7 @@ public abstract class AbstractCamelContext extends ServiceSupport
// routes as all routes
// will then be prepared in time before we start inputs which will
// consume messages to be routed
- BaseRouteService routeService = entry.getValue().getRouteService();
+ RouteService routeService = entry.getValue().getRouteService();
try {
LOG.debug("Warming up route id: {} having autoStartup={}", routeService.getId(), autoStartup);
setupRoute.set(routeService.getRoute());
@@ -3346,7 +3346,7 @@ public abstract class AbstractCamelContext extends ServiceSupport
for (Map.Entry<Integer, DefaultRouteStartupOrder> entry : inputs.entrySet()) {
Integer order = entry.getKey();
Route route = entry.getValue().getRoute();
- BaseRouteService routeService = entry.getValue().getRouteService();
+ RouteService routeService = entry.getValue().getRouteService();
// if we are starting camel, then skip routes which are configured
// to not be auto started
@@ -4410,7 +4410,7 @@ public abstract class AbstractCamelContext extends ServiceSupport
this.beanProcessorFactory = doAddService(beanProcessorFactory);
}
- protected Map<String, BaseRouteService> getRouteServices() {
+ protected Map<String, RouteService> getRouteServices() {
return routeServices;
}
diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultRoute.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultRoute.java
index 47f6602..f7348e4 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultRoute.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultRoute.java
@@ -17,6 +17,7 @@
package org.apache.camel.impl.engine;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedHashSet;
@@ -63,6 +64,7 @@ public class DefaultRoute extends ServiceSupport implements Route {
private NamedNode route;
private String routeId;
+ private String routeDescription;
private final List<Processor> eventDrivenProcessors = new ArrayList<>();
private CamelContext camelContext;
private List<InterceptStrategy> interceptStrategies = new ArrayList<>();
@@ -96,10 +98,12 @@ public class DefaultRoute extends ServiceSupport implements Route {
private Processor processor;
private Consumer consumer;
- public DefaultRoute(CamelContext camelContext, NamedNode route, String routeId, Endpoint endpoint) {
+ public DefaultRoute(CamelContext camelContext, NamedNode route, String routeId,
+ String routeDescription, Endpoint endpoint) {
this.camelContext = camelContext;
this.route = route;
this.routeId = routeId;
+ this.routeDescription = routeDescription;
this.endpoint = endpoint;
}
@@ -110,7 +114,7 @@ public class DefaultRoute extends ServiceSupport implements Route {
@Override
public String getId() {
- return (String) properties.get(Route.ID_PROPERTY);
+ return routeId;
}
@Override
@@ -260,6 +264,11 @@ public class DefaultRoute extends ServiceSupport implements Route {
return routeId;
}
+ @Override
+ public String getRouteDescription() {
+ return routeDescription;
+ }
+
public List<Processor> getEventDrivenProcessors() {
return eventDrivenProcessors;
}
@@ -481,6 +490,11 @@ public class DefaultRoute extends ServiceSupport implements Route {
}
@Override
+ public Collection<Processor> getOnCompletions() {
+ return onCompletions.values();
+ }
+
+ @Override
public Processor getOnCompletion(String onCompletionId) {
return onCompletions.get(onCompletionId);
}
@@ -491,6 +505,11 @@ public class DefaultRoute extends ServiceSupport implements Route {
}
@Override
+ public Collection<Processor> getOnExceptions() {
+ return onExceptions.values();
+ }
+
+ @Override
public Processor getOnException(String onExceptionId) {
return onExceptions.get(onExceptionId);
}
@@ -553,6 +572,16 @@ public class DefaultRoute extends ServiceSupport implements Route {
if (processor instanceof Service) {
services.add((Service)processor);
}
+ for (Processor p : onCompletions.values()) {
+ if (processor instanceof Service) {
+ services.add((Service)p);
+ }
+ }
+ for (Processor p : onExceptions.values()) {
+ if (processor instanceof Service) {
+ services.add((Service)p);
+ }
+ }
}
@Override
@@ -613,6 +642,9 @@ public class DefaultRoute extends ServiceSupport implements Route {
}
public void initialized() {
+ }
+ public void cleanRouteReference() {
+ route = null;
}
}
diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultRouteStartupOrder.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultRouteStartupOrder.java
index af156c5..92b35d3 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultRouteStartupOrder.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultRouteStartupOrder.java
@@ -32,9 +32,9 @@ public class DefaultRouteStartupOrder implements RouteStartupOrder {
private final int startupOrder;
private final Route route;
- private final BaseRouteService routeService;
+ private final RouteService routeService;
- public DefaultRouteStartupOrder(int startupOrder, Route route, BaseRouteService routeService) {
+ public DefaultRouteStartupOrder(int startupOrder, Route route, RouteService routeService) {
this.startupOrder = startupOrder;
this.route = route;
this.routeService = routeService;
@@ -62,7 +62,7 @@ public class DefaultRouteStartupOrder implements RouteStartupOrder {
return new ArrayList<>(services);
}
- public BaseRouteService getRouteService() {
+ public RouteService getRouteService() {
return routeService;
}
diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/BaseRouteService.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/RouteService.java
similarity index 92%
rename from core/camel-base/src/main/java/org/apache/camel/impl/engine/BaseRouteService.java
rename to core/camel-base/src/main/java/org/apache/camel/impl/engine/RouteService.java
index 3e66d9d..d67ab7e 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/BaseRouteService.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/RouteService.java
@@ -54,7 +54,7 @@ import static org.apache.camel.spi.UnitOfWork.MDC_ROUTE_ID;
* Represents the runtime objects for a given route so that it can be stopped independently
* of other routes
*/
-public abstract class BaseRouteService extends ChildServiceSupport {
+public class RouteService extends ChildServiceSupport {
private final AbstractCamelContext camelContext;
private final Route route;
@@ -63,7 +63,7 @@ public abstract class BaseRouteService extends ChildServiceSupport {
private final AtomicBoolean warmUpDone = new AtomicBoolean(false);
private final AtomicBoolean endpointDone = new AtomicBoolean(false);
- public BaseRouteService(Route route) {
+ public RouteService(Route route) {
this.route = route;
this.camelContext = this.route.getCamelContext().adapt(AbstractCamelContext.class);
}
@@ -80,8 +80,6 @@ public abstract class BaseRouteService extends ChildServiceSupport {
return route;
}
- public abstract Integer getStartupOrder();
-
/**
* Gather all the endpoints this route service uses
* <p/>
@@ -123,13 +121,16 @@ public abstract class BaseRouteService extends ChildServiceSupport {
try {
doWarmUp();
} catch (Exception e) {
- throw new FailedToStartRouteException(getId(), getRouteDescription(), e);
+ throw new FailedToStartRouteException(getId(), route.getDescription(), e);
}
}
- protected abstract String getRouteDescription();
-
- public abstract boolean isAutoStartup() throws Exception;
+ public boolean isAutoStartup() {
+ if (!getCamelContext().isAutoStartup()) {
+ return false;
+ }
+ return getRoute().isAutoStartup();
+ }
protected synchronized void doWarmUp() throws Exception {
if (endpointDone.compareAndSet(false, true)) {
@@ -363,30 +364,27 @@ public abstract class BaseRouteService extends ChildServiceSupport {
// gather list of services to stop as we need to start child services as well
List<Service> services = new ArrayList<>(route.getServices());
// also get route scoped services
- doGetRouteScopedServices(services);
+ doGetRouteServices(services);
Set<Service> list = new LinkedHashSet<>();
for (Service service : services) {
list.addAll(ServiceHelper.getChildServices(service));
}
// also get route scoped error handler (which must be done last)
- doGetRouteScopedErrorHandler(list);
+ doGetErrorHandler(list);
return list;
}
/**
* Gather the route scoped error handler from the given route
*/
- private void doGetRouteScopedErrorHandler(Set<Service> services) {
+ private void doGetErrorHandler(Set<Service> services) {
// only include error handlers if they are route scoped
- boolean includeErrorHandler = !isContextScopedErrorHandler();
List<Service> extra = new ArrayList<>();
- if (includeErrorHandler) {
- for (Service service : services) {
- if (service instanceof Channel) {
- Processor eh = ((Channel) service).getErrorHandler();
- if (eh instanceof Service) {
- extra.add((Service) eh);
- }
+ for (Service service : services) {
+ if (service instanceof Channel) {
+ Processor eh = ((Channel) service).getErrorHandler();
+ if (eh instanceof Service) {
+ extra.add((Service) eh);
}
}
}
@@ -395,12 +393,23 @@ public abstract class BaseRouteService extends ChildServiceSupport {
}
}
- public abstract boolean isContextScopedErrorHandler();
-
/**
- * Gather all other kind of route scoped services from the given route, except error handler
+ * Gather all other kind of route services from the given route,
+ * except error handler
*/
- protected abstract void doGetRouteScopedServices(List<Service> services);
+ protected void doGetRouteServices(List<Service> services) {
+ for (Processor proc : getRoute().getOnExceptions()) {
+ if (proc instanceof Service) {
+ services.add((Service) proc);
+ }
+ }
+ for (Processor proc : getRoute().getOnCompletions()) {
+ if (proc instanceof Service) {
+ services.add((Service) proc);
+ }
+ }
+ }
+
class MDCHelper implements AutoCloseable {
final Map<String, String> originalContextMap;
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/SendProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/SendProcessor.java
index 0fbe798..48455e5 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/SendProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/SendProcessor.java
@@ -74,7 +74,7 @@ public class SendProcessor extends AsyncProcessorSupport implements Traceable, E
@Override
public String toString() {
- return id;
+ return destination != null ? destination.toString() : id;
}
@Override
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/channel/DefaultChannel.java b/core/camel-base/src/main/java/org/apache/camel/processor/channel/DefaultChannel.java
index 7b23d0e..9fdc506 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/channel/DefaultChannel.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/channel/DefaultChannel.java
@@ -68,7 +68,6 @@ public class DefaultChannel extends CamelInternalProcessor implements Channel {
private ManagementInterceptStrategy.InstrumentationProcessor<?> instrumentationProcessor;
private CamelContext camelContext;
private Route route;
- private boolean routeScoped = true;
public DefaultChannel(CamelContext camelContext) {
super(camelContext);
@@ -148,10 +147,8 @@ public class DefaultChannel extends CamelInternalProcessor implements Channel {
protected void doStop() throws Exception {
// do not call super as we want to be in control here of the lifecycle
- if (isRouteScoped()) {
- // only stop services if not context scoped (as context scoped is reused by others)
- ServiceHelper.stopService(output, errorHandler);
- }
+ // only stop services if not context scoped (as context scoped is reused by others)
+ ServiceHelper.stopService(output, errorHandler);
}
@Override
@@ -161,10 +158,6 @@ public class DefaultChannel extends CamelInternalProcessor implements Channel {
ServiceHelper.stopAndShutdownServices(output, errorHandler);
}
- public boolean isRouteScoped() {
- return routeScoped;
- }
-
/**
* Initializes the channel.
* If the initialized output definition contained outputs (children) then
@@ -181,13 +174,11 @@ public class DefaultChannel extends CamelInternalProcessor implements Channel {
List<InterceptStrategy> interceptors,
Processor nextProcessor,
NamedRoute routeDefinition,
- boolean first,
- boolean routeScoped) throws Exception {
+ boolean first) throws Exception {
this.route = route;
this.definition = definition;
this.camelContext = route.getCamelContext();
this.nextProcessor = nextProcessor;
- this.routeScoped = routeScoped;
// init CamelContextAware as early as possible on nextProcessor
if (nextProcessor instanceof CamelContextAware) {
diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/impl/DefaultCamelContext.java b/core/camel-core-engine/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
index 14a9cd7..da6b96b 100644
--- a/core/camel-core-engine/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
+++ b/core/camel-core-engine/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
@@ -23,7 +23,7 @@ import java.util.function.Function;
import org.apache.camel.CamelContext;
import org.apache.camel.health.HealthCheckRegistry;
-import org.apache.camel.impl.engine.BaseRouteService;
+import org.apache.camel.impl.engine.RouteService;
import org.apache.camel.impl.engine.SimpleCamelContext;
import org.apache.camel.model.DataFormatDefinition;
import org.apache.camel.model.HystrixConfigurationDefinition;
@@ -286,9 +286,10 @@ public class DefaultCamelContext extends SimpleCamelContext implements ModelCame
}
@Override
- protected synchronized void shutdownRouteService(BaseRouteService routeService) throws Exception {
- if (routeService instanceof RouteService) {
- model.getRouteDefinitions().remove(((RouteService)routeService).getRouteDefinition());
+ protected synchronized void shutdownRouteService(RouteService routeService) throws Exception {
+ RouteDefinition rd = model.getRouteDefinition(routeService.getId());
+ if (rd != null) {
+ model.getRouteDefinitions().remove(rd);
}
super.shutdownRouteService(routeService);
}
diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/impl/DefaultModel.java b/core/camel-core-engine/src/main/java/org/apache/camel/impl/DefaultModel.java
index f78b865..c0a786b 100644
--- a/core/camel-core-engine/src/main/java/org/apache/camel/impl/DefaultModel.java
+++ b/core/camel-core-engine/src/main/java/org/apache/camel/impl/DefaultModel.java
@@ -31,6 +31,7 @@ import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.FailedToStartRouteException;
import org.apache.camel.Route;
import org.apache.camel.impl.engine.AbstractCamelContext;
+import org.apache.camel.impl.engine.RouteService;
import org.apache.camel.model.DataFormatDefinition;
import org.apache.camel.model.HystrixConfigurationDefinition;
import org.apache.camel.model.Model;
diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/impl/DefaultModelRoute.java b/core/camel-core-engine/src/main/java/org/apache/camel/impl/DefaultModelRoute.java
index 19f0026..aae9095 100644
--- a/core/camel-core-engine/src/main/java/org/apache/camel/impl/DefaultModelRoute.java
+++ b/core/camel-core-engine/src/main/java/org/apache/camel/impl/DefaultModelRoute.java
@@ -25,8 +25,9 @@ import org.apache.camel.reifier.errorhandler.ErrorHandlerReifier;
public class DefaultModelRoute extends DefaultRoute {
- public DefaultModelRoute(CamelContext camelContext, RouteDefinition definition, String id, Endpoint endpoint) {
- super(camelContext, definition, id, endpoint);
+ public DefaultModelRoute(CamelContext camelContext, RouteDefinition definition, String id,
+ String description, Endpoint endpoint) {
+ super(camelContext, definition, id, description, endpoint);
}
@Override
diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/impl/RouteService.java b/core/camel-core-engine/src/main/java/org/apache/camel/impl/RouteService.java
deleted file mode 100644
index cbc1b7e..0000000
--- a/core/camel-core-engine/src/main/java/org/apache/camel/impl/RouteService.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.impl;
-
-import java.util.List;
-
-import org.apache.camel.ErrorHandlerFactory;
-import org.apache.camel.ExtendedCamelContext;
-import org.apache.camel.Processor;
-import org.apache.camel.Route;
-import org.apache.camel.Service;
-import org.apache.camel.impl.engine.BaseRouteService;
-import org.apache.camel.model.OnCompletionDefinition;
-import org.apache.camel.model.OnExceptionDefinition;
-import org.apache.camel.model.ProcessorDefinition;
-import org.apache.camel.model.RouteDefinition;
-import org.apache.camel.model.RouteDefinitionHelper;
-import org.apache.camel.support.CamelContextHelper;
-
-/**
- * Represents the runtime objects for a given {@link RouteDefinition} so that it
- * can be stopped independently of other routes
- */
-public class RouteService extends BaseRouteService {
-
- private final RouteDefinition routeDefinition;
-
- public RouteService(Route route) {
- super(route);
- this.routeDefinition = (RouteDefinition) route.getRoute();
- }
-
- public RouteDefinition getRouteDefinition() {
- return routeDefinition;
- }
-
- @Override
- public Integer getStartupOrder() {
- return routeDefinition.getStartupOrder();
- }
-
- @Override
- protected String getRouteDescription() {
- return RouteDefinitionHelper.getRouteMessage(routeDefinition.toString());
- }
-
- @Override
- public boolean isAutoStartup() throws Exception {
- if (!getCamelContext().isAutoStartup()) {
- return false;
- }
- if (!getRoute().isAutoStartup()) {
- return false;
- }
- if (routeDefinition.getAutoStartup() == null) {
- // should auto startup by default
- return true;
- }
- Boolean isAutoStartup = CamelContextHelper.parseBoolean(getCamelContext(), routeDefinition.getAutoStartup());
- return isAutoStartup != null && isAutoStartup;
- }
-
- @Override
- public boolean isContextScopedErrorHandler() {
- if (!routeDefinition.isContextScopedErrorHandler()) {
- return false;
- }
- // if error handler ref is configured it may refer to a context scoped,
- // so we need to check this first
- // the XML DSL will configure error handlers using refs, so we need this
- // additional test
- if (routeDefinition.getErrorHandlerRef() != null) {
- ErrorHandlerFactory routeScoped = getRoute().getErrorHandlerFactory();
- ErrorHandlerFactory contextScoped = getCamelContext().adapt(ExtendedCamelContext.class).getErrorHandlerFactory();
- return routeScoped != null && contextScoped != null && routeScoped == contextScoped;
- }
-
- return true;
- }
-
- /**
- * Gather all other kind of route scoped services from the given route,
- * except error handler
- */
- @Override
- protected void doGetRouteScopedServices(List<Service> services) {
-
- for (ProcessorDefinition<?> output : routeDefinition.getOutputs()) {
- if (output instanceof OnExceptionDefinition) {
- OnExceptionDefinition onExceptionDefinition = (OnExceptionDefinition)output;
- if (onExceptionDefinition.isRouteScoped()) {
- Processor errorHandler = getRoute().getOnException(onExceptionDefinition.getId());
- if (errorHandler instanceof Service) {
- services.add((Service)errorHandler);
- }
- }
- } else if (output instanceof OnCompletionDefinition) {
- OnCompletionDefinition onCompletionDefinition = (OnCompletionDefinition)output;
- if (onCompletionDefinition.isRouteScoped()) {
- Processor onCompletionProcessor = getRoute().getOnCompletion(onCompletionDefinition.getId());
- if (onCompletionProcessor instanceof Service) {
- services.add((Service)onCompletionProcessor);
- }
- }
- }
- }
- }
-
-}
diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/model/LoadBalancerDefinition.java b/core/camel-core-engine/src/main/java/org/apache/camel/model/LoadBalancerDefinition.java
index 428a5c3..70823c9 100644
--- a/core/camel-core-engine/src/main/java/org/apache/camel/model/LoadBalancerDefinition.java
+++ b/core/camel-core-engine/src/main/java/org/apache/camel/model/LoadBalancerDefinition.java
@@ -21,7 +21,6 @@ import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlTransient;
import javax.xml.bind.annotation.XmlType;
-import org.apache.camel.processor.loadbalancer.LoadBalancer;
import org.apache.camel.spi.Metadata;
/**
@@ -33,17 +32,11 @@ import org.apache.camel.spi.Metadata;
@SuppressWarnings("rawtypes")
public class LoadBalancerDefinition extends IdentifiedType {
@XmlTransient
- private LoadBalancer loadBalancer;
- @XmlTransient
private String loadBalancerTypeName;
public LoadBalancerDefinition() {
}
- public LoadBalancerDefinition(LoadBalancer loadBalancer) {
- this.loadBalancer = loadBalancer;
- }
-
protected LoadBalancerDefinition(String loadBalancerTypeName) {
this.loadBalancerTypeName = loadBalancerTypeName;
}
@@ -56,30 +49,12 @@ public class LoadBalancerDefinition extends IdentifiedType {
return Integer.MAX_VALUE;
}
- /**
- * Allows derived classes to customize the load balancer
- */
- public void configureLoadBalancer(LoadBalancer loadBalancer) {
- }
-
- public LoadBalancer getLoadBalancer() {
- return loadBalancer;
- }
-
- public void setLoadBalancer(LoadBalancer loadBalancer) {
- this.loadBalancer = loadBalancer;
- }
-
public String getLoadBalancerTypeName() {
return loadBalancerTypeName;
}
@Override
public String toString() {
- if (loadBalancer != null) {
- return loadBalancer.toString();
- } else {
- return loadBalancerTypeName;
- }
+ return loadBalancerTypeName;
}
}
diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/model/OnCompletionDefinition.java b/core/camel-core-engine/src/main/java/org/apache/camel/model/OnCompletionDefinition.java
index 42ab99a..c3ab937 100644
--- a/core/camel-core-engine/src/main/java/org/apache/camel/model/OnCompletionDefinition.java
+++ b/core/camel-core-engine/src/main/java/org/apache/camel/model/OnCompletionDefinition.java
@@ -60,21 +60,27 @@ public class OnCompletionDefinition extends OutputDefinition<OnCompletionDefinit
@XmlTransient
private ExecutorService executorService;
@XmlTransient
- private Boolean routeScoped;
+ private boolean routeScoped = true;
public OnCompletionDefinition() {
}
- public boolean isRouteScoped() {
- // is context scoped by default
- return routeScoped != null ? routeScoped : false;
+ public void setRouteScoped(boolean routeScoped) {
+ this.routeScoped = routeScoped;
}
- public Boolean getRouteScoped() {
+ public boolean isRouteScoped() {
return routeScoped;
}
@Override
+ public void setParent(ProcessorDefinition<?> parent) {
+ if (routeScoped) {
+ super.setParent(parent);
+ }
+ }
+
+ @Override
public String toString() {
return "onCompletion[" + getOutputs() + "]";
}
diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/model/OnExceptionDefinition.java b/core/camel-core-engine/src/main/java/org/apache/camel/model/OnExceptionDefinition.java
index f76262d..9d397ec 100644
--- a/core/camel-core-engine/src/main/java/org/apache/camel/model/OnExceptionDefinition.java
+++ b/core/camel-core-engine/src/main/java/org/apache/camel/model/OnExceptionDefinition.java
@@ -83,7 +83,7 @@ public class OnExceptionDefinition extends OutputDefinition<OnExceptionDefinitio
@XmlTransient
private Processor onExceptionOccurred;
@XmlTransient
- private Boolean routeScoped;
+ private boolean routeScoped = true;
public OnExceptionDefinition() {
}
@@ -101,12 +101,14 @@ public class OnExceptionDefinition extends OutputDefinition<OnExceptionDefinitio
}
public boolean isRouteScoped() {
- // is context scoped by default
- return routeScoped != null ? routeScoped : false;
+ return routeScoped;
}
- public Boolean getRouteScoped() {
- return routeScoped;
+ @Override
+ public void setParent(ProcessorDefinition<?> parent) {
+ if (routeScoped) {
+ super.setParent(parent);
+ }
}
@Override
diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/model/ProcessorDefinition.java b/core/camel-core-engine/src/main/java/org/apache/camel/model/ProcessorDefinition.java
index b5b46d1..09c4e91 100644
--- a/core/camel-core-engine/src/main/java/org/apache/camel/model/ProcessorDefinition.java
+++ b/core/camel-core-engine/src/main/java/org/apache/camel/model/ProcessorDefinition.java
@@ -2270,7 +2270,6 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
*/
public OnExceptionDefinition onException(Class<? extends Throwable> exceptionType) {
OnExceptionDefinition answer = new OnExceptionDefinition(exceptionType);
- answer.setRouteScoped(true);
addOutput(answer);
return answer;
}
@@ -2284,7 +2283,6 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
*/
public OnExceptionDefinition onException(Class<? extends Throwable>... exceptions) {
OnExceptionDefinition answer = new OnExceptionDefinition(Arrays.asList(exceptions));
- answer.setRouteScoped(true);
addOutput(answer);
return answer;
}
diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/model/RouteDefinitionHelper.java b/core/camel-core-engine/src/main/java/org/apache/camel/model/RouteDefinitionHelper.java
index 72b825b..67db827 100644
--- a/core/camel-core-engine/src/main/java/org/apache/camel/model/RouteDefinitionHelper.java
+++ b/core/camel-core-engine/src/main/java/org/apache/camel/model/RouteDefinitionHelper.java
@@ -421,7 +421,7 @@ public final class RouteDefinitionHelper {
for (ProcessorDefinition child : children) {
// validate that top-level is only added on the route (eg top level)
RouteDefinition route = ProcessorDefinitionHelper.getRoute(child);
- boolean parentIsRoute = route != null && child.getParent() == route;
+ boolean parentIsRoute = child.getParent() == route;
if (child.isTopLevelOnly() && !parentIsRoute) {
throw new IllegalArgumentException("The output must be added as top-level on the route. Try moving " + child + " to the top of route.");
}
diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/model/RoutesDefinition.java b/core/camel-core-engine/src/main/java/org/apache/camel/model/RoutesDefinition.java
index 9439792..95b3b83 100644
--- a/core/camel-core-engine/src/main/java/org/apache/camel/model/RoutesDefinition.java
+++ b/core/camel-core-engine/src/main/java/org/apache/camel/model/RoutesDefinition.java
@@ -273,6 +273,7 @@ public class RoutesDefinition extends OptionalIdentifiedDefinition<RoutesDefinit
*/
public OnCompletionDefinition onCompletion() {
OnCompletionDefinition answer = new OnCompletionDefinition();
+ answer.setRouteScoped(false);
getOnCompletions().add(answer);
return answer;
}
diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/LoadBalanceReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/LoadBalanceReifier.java
index b4f28b2..d5156a1 100644
--- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/LoadBalanceReifier.java
+++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/LoadBalanceReifier.java
@@ -33,34 +33,26 @@ public class LoadBalanceReifier extends ProcessorReifier<LoadBalanceDefinition>
@Override
public Processor createProcessor() throws Exception {
- // the load balancer is stateful so we should only create it once in
- // case its used from a context scoped error handler
+ LoadBalancer loadBalancer = LoadBalancerReifier.reifier(route, definition.getLoadBalancerType()).createLoadBalancer();
- LoadBalancer loadBalancer = definition.getLoadBalancerType().getLoadBalancer();
- if (loadBalancer == null) {
- // then create it and reuse it
- loadBalancer = LoadBalancerReifier.reifier(route, definition.getLoadBalancerType()).createLoadBalancer();
- definition.getLoadBalancerType().setLoadBalancer(loadBalancer);
-
- // some load balancer can only support a fixed number of outputs
- int max = definition.getLoadBalancerType().getMaximumNumberOfOutputs();
- int size = definition.getOutputs().size();
- if (size > max) {
- throw new IllegalArgumentException("To many outputs configured on " + definition.getLoadBalancerType() + ": " + size + " > " + max);
- }
+ // some load balancer can only support a fixed number of outputs
+ int max = definition.getLoadBalancerType().getMaximumNumberOfOutputs();
+ int size = definition.getOutputs().size();
+ if (size > max) {
+ throw new IllegalArgumentException("To many outputs configured on " + definition.getLoadBalancerType() + ": " + size + " > " + max);
+ }
- for (ProcessorDefinition<?> processorType : definition.getOutputs()) {
- // output must not be another load balancer
- // check for instanceof as the code below as there is
- // compilation errors on earlier versions of JDK6
- // on Windows boxes or with IBM JDKs etc.
- if (LoadBalanceDefinition.class.isInstance(processorType)) {
- throw new IllegalArgumentException("Loadbalancer already configured to: " + definition.getLoadBalancerType() + ". Cannot set it to: " + processorType);
- }
- Processor processor = createProcessor(processorType);
- Channel channel = wrapChannel(processor, processorType);
- loadBalancer.addProcessor(channel);
+ for (ProcessorDefinition<?> processorType : definition.getOutputs()) {
+ // output must not be another load balancer
+ // check for instanceof as the code below as there is
+ // compilation errors on earlier versions of JDK6
+ // on Windows boxes or with IBM JDKs etc.
+ if (LoadBalanceDefinition.class.isInstance(processorType)) {
+ throw new IllegalArgumentException("Loadbalancer already configured to: " + definition.getLoadBalancerType() + ". Cannot set it to: " + processorType);
}
+ Processor processor = createProcessor(processorType);
+ Channel channel = wrapChannel(processor, processorType);
+ loadBalancer.addProcessor(channel);
}
Boolean inherit = definition.isInheritErrorHandler();
diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/OnCompletionReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/OnCompletionReifier.java
index 0221712..30be398 100644
--- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/OnCompletionReifier.java
+++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/OnCompletionReifier.java
@@ -35,20 +35,6 @@ public class OnCompletionReifier extends ProcessorReifier<OnCompletionDefinition
@Override
public Processor createProcessor() throws Exception {
- // assign whether this was a route scoped onCompletion or not
- // we need to know this later when setting the parent, as only route
- // scoped should have parent
- // Note: this logic can possible be removed when the Camel routing
- // engine decides at runtime
- // to apply onCompletion in a more dynamic fashion than current code
- // base
- // and therefore is in a better position to decide among context/route
- // scoped OnCompletion at runtime
- Boolean routeScoped = definition.getRouteScoped();
- if (routeScoped == null) {
- routeScoped = definition.getParent() != null;
- }
-
boolean isOnCompleteOnly = parseBoolean(definition.getOnCompleteOnly(), false);
boolean isOnFailureOnly = parseBoolean(definition.getOnFailureOnly(), false);
boolean isParallelProcessing = parseBoolean(definition.getParallelProcessing(), false);
diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/OnExceptionReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/OnExceptionReifier.java
index 61652aa..2d35475 100644
--- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/OnExceptionReifier.java
+++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/OnExceptionReifier.java
@@ -37,18 +37,6 @@ public class OnExceptionReifier extends ProcessorReifier<OnExceptionDefinition>
@Override
public void addRoutes() throws Exception {
- // assign whether this was a route scoped onException or not
- // we need to know this later when setting the parent, as only route
- // scoped should have parent
- // Note: this logic can possible be removed when the Camel routing
- // engine decides at runtime
- // to apply onException in a more dynamic fashion than current code base
- // and therefore is in a better position to decide among context/route
- // scoped OnException at runtime
- if (definition.getRouteScoped() == null) {
- definition.setRouteScoped(definition.getParent() != null);
- }
-
// must validate configuration before creating processor
definition.validateConfiguration();
@@ -62,7 +50,7 @@ public class OnExceptionReifier extends ProcessorReifier<OnExceptionDefinition>
if (child != null) {
// wrap in our special safe fallback error handler if OnException
// have child output
- Processor errorHandler = new FatalFallbackErrorHandler(child);
+ Processor errorHandler = new FatalFallbackErrorHandler(child, false);
String id = getId(definition);
route.setOnException(id, errorHandler);
}
diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/ProcessorReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/ProcessorReifier.java
index f41b21b..7032398 100644
--- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/ProcessorReifier.java
+++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/ProcessorReifier.java
@@ -581,15 +581,8 @@ public abstract class ProcessorReifier<T extends ProcessorDefinition<?>> extends
if (route != null && !route.getOutputs().isEmpty()) {
first = route.getOutputs().get(0) == definition;
}
- // set scoping
- boolean routeScoped = true;
- if (definition instanceof OnExceptionDefinition) {
- routeScoped = ((OnExceptionDefinition)definition).isRouteScoped();
- } else if (this.definition instanceof OnCompletionDefinition) {
- routeScoped = ((OnCompletionDefinition)definition).isRouteScoped();
- }
// initialize the channel
- channel.initChannel(this.route, definition, child, interceptors, processor, route, first, routeScoped);
+ channel.initChannel(this.route, definition, child, interceptors, processor, route, first);
boolean wrap = false;
// set the error handler, must be done after init as we can set the
diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/RouteReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/RouteReifier.java
index 0e7cfb4..19220e4 100644
--- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/RouteReifier.java
+++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/RouteReifier.java
@@ -41,6 +41,7 @@ import org.apache.camel.model.Model;
import org.apache.camel.model.ProcessorDefinition;
import org.apache.camel.model.PropertyDefinition;
import org.apache.camel.model.RouteDefinition;
+import org.apache.camel.model.RouteDefinitionHelper;
import org.apache.camel.model.RoutesDefinition;
import org.apache.camel.processor.CamelInternalProcessor;
import org.apache.camel.processor.ContractAdvice;
@@ -257,7 +258,8 @@ public class RouteReifier extends ProcessorReifier<RouteDefinition> {
// create route
String id = definition.idOrCreate(camelContext.adapt(ExtendedCamelContext.class).getNodeIdFactory());
- DefaultRoute route = new DefaultModelRoute(camelContext, definition, id, endpoint);
+ String desc = RouteDefinitionHelper.getRouteMessage(definition.toString());
+ DefaultRoute route = new DefaultModelRoute(camelContext, definition, id, desc, endpoint);
// configure error handler
route.setErrorHandlerFactory(definition.getErrorHandlerFactory());
diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/errorhandler/ErrorHandlerReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/errorhandler/ErrorHandlerReifier.java
index 6aa50e9..25da846 100644
--- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/errorhandler/ErrorHandlerReifier.java
+++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/errorhandler/ErrorHandlerReifier.java
@@ -38,7 +38,6 @@ import org.apache.camel.builder.ErrorHandlerBuilderRef;
import org.apache.camel.builder.ErrorHandlerBuilderSupport;
import org.apache.camel.builder.NoErrorHandlerBuilder;
import org.apache.camel.model.OnExceptionDefinition;
-import org.apache.camel.model.ProcessorDefinitionHelper;
import org.apache.camel.model.RedeliveryPolicyDefinition;
import org.apache.camel.model.RouteDefinition;
import org.apache.camel.processor.ErrorHandler;
@@ -266,13 +265,9 @@ public abstract class ErrorHandlerReifier<T extends ErrorHandlerBuilderSupport>
list = createExceptionClasses(exceptionType);
for (Class<? extends Throwable> clazz : list) {
String routeId = null;
- // only get the route id, if the exception type is route
- // scoped
+ // only get the route id, if the exception type is route scoped
if (exceptionType.isRouteScoped()) {
- RouteDefinition route = ProcessorDefinitionHelper.getRoute(exceptionType);
- if (route != null) {
- routeId = route.getId();
- }
+ routeId = route.getRouteId();
}
Predicate when = exceptionType.getOnWhen() != null ? exceptionType.getOnWhen().getExpression() : null;
ExceptionPolicyKey key = new ExceptionPolicyKey(routeId, clazz, when);
diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/loadbalancer/LoadBalancerReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/loadbalancer/LoadBalancerReifier.java
index fe049c0..778898f 100644
--- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/loadbalancer/LoadBalancerReifier.java
+++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/loadbalancer/LoadBalancerReifier.java
@@ -84,7 +84,6 @@ public class LoadBalancerReifier<T extends LoadBalancerDefinition> extends Abstr
throw new IllegalArgumentException("Cannot find class: " + loadBalancerTypeName + " in the classpath");
}
answer = (LoadBalancer) camelContext.getInjector().newInstance(type, false);
- definition.configureLoadBalancer(answer);
}
return answer;
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/RandomLoadBalanceJavaDSLBuilderTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/RandomLoadBalanceJavaDSLBuilderTest.java
index bb82bf0..9d99a0b 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/RandomLoadBalanceJavaDSLBuilderTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/RandomLoadBalanceJavaDSLBuilderTest.java
@@ -25,9 +25,8 @@ import org.apache.camel.model.LoadBalanceDefinition;
import org.apache.camel.model.ProcessorDefinition;
import org.apache.camel.model.RouteDefinition;
import org.apache.camel.model.SendDefinition;
+import org.apache.camel.model.loadbalancer.RandomLoadBalancerDefinition;
import org.apache.camel.processor.channel.DefaultChannel;
-import org.apache.camel.processor.loadbalancer.LoadBalancer;
-import org.apache.camel.processor.loadbalancer.RandomLoadBalancer;
import org.junit.Test;
/**
@@ -99,8 +98,7 @@ public class RandomLoadBalanceJavaDSLBuilderTest extends RandomLoadBalanceTest {
sb.append(".loadBalance()");
LoadBalanceDefinition lbd = (LoadBalanceDefinition)defn;
- LoadBalancer balancer = lbd.getLoadBalancerType().getLoadBalancer();
- if (balancer instanceof RandomLoadBalancer) {
+ if (lbd.getLoadBalancerType() instanceof RandomLoadBalancerDefinition) {
sb.append(".random()");
}
}
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/exceptionpolicy/DefaultExceptionPolicyStrategyTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/exceptionpolicy/DefaultExceptionPolicyStrategyTest.java
index 8d7e0c7..f3c00bc 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/exceptionpolicy/DefaultExceptionPolicyStrategyTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/exceptionpolicy/DefaultExceptionPolicyStrategyTest.java
@@ -53,7 +53,7 @@ public class DefaultExceptionPolicyStrategyTest extends Assert {
private ExceptionPolicy exceptionPolicy(Class<? extends Throwable> exceptionClass) {
CamelContext cc = new DefaultCamelContext();
- Route context = new DefaultRoute(cc, null, null, null);
+ Route context = new DefaultRoute(cc, null, null, null, null);
return new DefaultErrorHandlerReifier<>(context, null)
.createExceptionPolicy(new OnExceptionDefinition(exceptionClass));
}
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionLoadBalancerDoubleIssueTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionLoadBalancerDoubleIssueTest.java
index cdeecf4..f81fda3 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionLoadBalancerDoubleIssueTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionLoadBalancerDoubleIssueTest.java
@@ -45,7 +45,8 @@ public class OnExceptionLoadBalancerDoubleIssueTest extends ContextTestSupport {
return new RouteBuilder() {
@Override
public void configure() throws Exception {
- onException(Exception.class).handled(true).loadBalance().roundRobin().id("round").to("mock:error", "mock:error2", "mock:error3").end();
+ onException(Exception.class).handled(true).to("direct:error");
+ from("direct:error").loadBalance().roundRobin().id("round").to("mock:error", "mock:error2", "mock:error3");
from("direct:foo").throwException(new IllegalArgumentException("Forced"));
diff --git a/core/camel-core/src/test/java/org/apache/camel/reifier/ProcessorReifierTest.java b/core/camel-core/src/test/java/org/apache/camel/reifier/ProcessorReifierTest.java
index 9102353..dd1ffc5 100644
--- a/core/camel-core/src/test/java/org/apache/camel/reifier/ProcessorReifierTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/reifier/ProcessorReifierTest.java
@@ -26,7 +26,7 @@ import static junit.framework.TestCase.fail;
public class ProcessorReifierTest {
@Test
public void testHandleCustomProcessorDefinition() {
- Route ctx = new DefaultRoute(null, null, null, null);
+ Route ctx = new DefaultRoute(null, null, null, null, null);
try {
ProcessorReifier.reifier(ctx, new MyProcessor());
diff --git a/core/camel-management/src/test/java/org/apache/camel/management/ManagedRouteRemoveContextScopedErrorHandlerTest.java b/core/camel-management/src/test/java/org/apache/camel/management/ManagedRouteRemoveContextScopedErrorHandlerTest.java
index a3c6049..f4b0ae7 100644
--- a/core/camel-management/src/test/java/org/apache/camel/management/ManagedRouteRemoveContextScopedErrorHandlerTest.java
+++ b/core/camel-management/src/test/java/org/apache/camel/management/ManagedRouteRemoveContextScopedErrorHandlerTest.java
@@ -70,9 +70,9 @@ public class ManagedRouteRemoveContextScopedErrorHandlerTest extends ManagementT
set = mbeanServer.queryNames(new ObjectName("*:type=routes,*"), null);
assertEquals(0, set.size());
- // but still 1 context scoped error handler
+ // no more error handlers
set = mbeanServer.queryNames(new ObjectName("*:type=errorhandlers,*"), null);
- assertEquals(1, set.size());
+ assertEquals(0, set.size());
}
static ObjectName getRouteObjectName(MBeanServer mbeanServer) throws Exception {
diff --git a/core/camel-management/src/test/java/org/apache/camel/management/ManagedRouteRemoveRouteAndContextScopedErrorHandlerTest.java b/core/camel-management/src/test/java/org/apache/camel/management/ManagedRouteRemoveRouteAndContextScopedErrorHandlerTest.java
index 8724dbe..de0f1d9 100644
--- a/core/camel-management/src/test/java/org/apache/camel/management/ManagedRouteRemoveRouteAndContextScopedErrorHandlerTest.java
+++ b/core/camel-management/src/test/java/org/apache/camel/management/ManagedRouteRemoveRouteAndContextScopedErrorHandlerTest.java
@@ -118,9 +118,9 @@ public class ManagedRouteRemoveRouteAndContextScopedErrorHandlerTest extends Man
set = mbeanServer.queryNames(new ObjectName("*:type=routes,*"), null);
assertEquals(1, set.size());
- // should still be the context scoped error handler as its not removed when removing a route
+ // should still be one error handler
set = mbeanServer.queryNames(new ObjectName("*:type=errorhandlers,*"), null);
- assertEquals(2, set.size());
+ assertEquals(1, set.size());
}
static ObjectName getRouteObjectName(MBeanServer mbeanServer, String name) throws Exception {