You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2023/10/17 18:42:24 UTC
[camel] branch main updated: CAMEL-19998: reduce coupling between CamelContext and the InternalRouteStartupManager
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 73db0391a6f CAMEL-19998: reduce coupling between CamelContext and the InternalRouteStartupManager
73db0391a6f is described below
commit 73db0391a6f0b146c816b088424ef3998457ecc6
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Tue Oct 17 15:24:59 2023 +0200
CAMEL-19998: reduce coupling between CamelContext and the InternalRouteStartupManager
---
.../camel/impl/engine/AbstractCamelContext.java | 20 ++--
.../impl/engine/InternalRouteStartupManager.java | 116 +++++++++++----------
2 files changed, 72 insertions(+), 64 deletions(-)
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
index 43015772e97..70e7c243dc4 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
@@ -211,7 +211,7 @@ public abstract class AbstractCamelContext extends BaseService
private final ThreadLocal<Boolean> isLockModel = new ThreadLocal<>();
private final Map<String, RouteService> routeServices = new LinkedHashMap<>();
private final Map<String, RouteService> suspendedRouteServices = new LinkedHashMap<>();
- private final InternalRouteStartupManager internalRouteStartupManager = new InternalRouteStartupManager(this);
+ private final InternalRouteStartupManager internalRouteStartupManager = new InternalRouteStartupManager();
private final List<RouteStartupOrder> routeStartupOrder = new ArrayList<>();
private final StopWatch stopWatch = new StopWatch(false);
private final ThreadLocal<Set<String>> componentsInCreation = ThreadLocal.withInitial(() -> new HashSet<>());
@@ -1043,7 +1043,7 @@ public abstract class AbstractCamelContext extends BaseService
}
public void startAllRoutes() throws Exception {
- internalRouteStartupManager.doStartOrResumeRoutes(routeServices, true, true, false, false);
+ internalRouteStartupManager.doStartOrResumeRoutes(this, routeServices, true, true, false, false);
}
private void doStopRoutes(RouteController controller, Comparator<RouteStartupOrder> comparator) throws Exception {
@@ -1922,7 +1922,7 @@ public abstract class AbstractCamelContext extends BaseService
// start the suspended routes (do not check for route clashes, and
// indicate)
- internalRouteStartupManager.doStartOrResumeRoutes(suspendedRouteServices, false, true, true, false);
+ internalRouteStartupManager.doStartOrResumeRoutes(this, suspendedRouteServices, false, true, true, false);
// mark the route services as resumed (will be marked as started) as
// well
@@ -2298,7 +2298,7 @@ public abstract class AbstractCamelContext extends BaseService
// the method is called start but at this point it will only initialize (as context is starting up)
startRouteDefinitions();
// this will init route definitions and populate as route services which we can then initialize now
- internalRouteStartupManager.doInitRoutes(routeServices);
+ internalRouteStartupManager.doInitRoutes(this, routeServices);
startupStepRecorder.endStep(subStep);
if (!lifecycleStrategies.isEmpty()) {
@@ -2381,7 +2381,7 @@ public abstract class AbstractCamelContext extends BaseService
// invoke this logic to warm up the routes and if possible also
// start the routes
try {
- internalRouteStartupManager.doStartOrResumeRoutes(routeServices, true, true, false, true);
+ internalRouteStartupManager.doStartOrResumeRoutes(this, routeServices, true, true, false, true);
} catch (Exception e) {
throw RuntimeCamelException.wrapRuntimeException(e);
}
@@ -2710,7 +2710,8 @@ public abstract class AbstractCamelContext extends BaseService
StartupStep subStep
= startupStepRecorder.beginStep(CamelContext.class, camelContextExtension.getName(), "Start Routes");
EventHelper.notifyCamelContextRoutesStarting(this);
- internalRouteStartupManager.doStartOrResumeRoutes(routeServices, true, !doNotStartRoutesOnFirstStart, false, true);
+ internalRouteStartupManager.doStartOrResumeRoutes(this, routeServices, true, !doNotStartRoutesOnFirstStart, false,
+ true);
EventHelper.notifyCamelContextRoutesStarted(this);
startupStepRecorder.endStep(subStep);
}
@@ -2782,7 +2783,7 @@ public abstract class AbstractCamelContext extends BaseService
boolean found = routeStartupOrder.stream().anyMatch(o -> o.getRoute().getId().equals(routeService.getId()));
if (!found) {
LOG.debug("Route: {} which failed to startup will be stopped", routeService.getId());
- routeStartupOrder.add(internalRouteStartupManager.doPrepareRouteToBeStarted(routeService));
+ routeStartupOrder.add(internalRouteStartupManager.doPrepareRouteToBeStarted(this, routeService));
}
}
@@ -3032,7 +3033,8 @@ public abstract class AbstractCamelContext extends BaseService
StartupStep step
= startupStepRecorder.beginStep(Route.class, routeService.getId(), "Start Route Services");
// this method will log the routes being started
- internalRouteStartupManager.safelyStartRouteServices(true, true, true, false, addingRoutes, routeService);
+ internalRouteStartupManager.safelyStartRouteServices(this, true, true, true, false, addingRoutes,
+ routeService);
// start route services if it was configured to auto startup
// and we are not adding routes
boolean isAutoStartup = routeService.isAutoStartup();
@@ -3063,7 +3065,7 @@ public abstract class AbstractCamelContext extends BaseService
// resume the route service
if (shouldStartRoutes()) {
// this method will log the routes being started
- internalRouteStartupManager.safelyStartRouteServices(true, false, true, true, false, routeService);
+ internalRouteStartupManager.safelyStartRouteServices(this, true, false, true, true, false, routeService);
// must resume route service as well
routeService.resume();
}
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/InternalRouteStartupManager.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/InternalRouteStartupManager.java
index 16cc6f38466..2a8e9839f3d 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/InternalRouteStartupManager.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/InternalRouteStartupManager.java
@@ -50,19 +50,14 @@ import org.slf4j.LoggerFactory;
* <p>
* This code has been refactored out of {@link AbstractCamelContext} to its own class.
*/
-class InternalRouteStartupManager {
+final class InternalRouteStartupManager {
private static final Logger LOG = LoggerFactory.getLogger(InternalRouteStartupManager.class);
private final ThreadLocal<Route> setupRoute = new ThreadLocal<>();
- private final AbstractCamelContext abstractCamelContext;
private final CamelLogger routeLogger = new CamelLogger(LOG);
private int defaultRouteStartupOrder = 1000;
- public InternalRouteStartupManager(AbstractCamelContext abstractCamelContext) {
- this.abstractCamelContext = abstractCamelContext;
- }
-
/**
* If Camel is currently starting up a route then this returns the route.
*/
@@ -76,13 +71,13 @@ class InternalRouteStartupManager {
* @param routeServices the routes to initialize
* @throws Exception is thrown if error initializing routes
*/
- protected void doInitRoutes(Map<String, RouteService> routeServices)
+ public void doInitRoutes(AbstractCamelContext camelContext, Map<String, RouteService> routeServices)
throws Exception {
- abstractCamelContext.setStartingRoutes(true);
+ camelContext.setStartingRoutes(true);
try {
for (RouteService routeService : routeServices.values()) {
- StartupStep step = abstractCamelContext.getStartupStepRecorder().beginStep(Route.class, routeService.getId(),
+ StartupStep step = camelContext.getStartupStepRecorder().beginStep(Route.class, routeService.getId(),
"Init Route");
try {
LOG.debug("Initializing route id: {}", routeService.getId());
@@ -93,11 +88,11 @@ class InternalRouteStartupManager {
routeService.setUp();
} finally {
setupRoute.remove();
- abstractCamelContext.getStartupStepRecorder().endStep(step);
+ camelContext.getStartupStepRecorder().endStep(step);
}
}
} finally {
- abstractCamelContext.setStartingRoutes(false);
+ camelContext.setStartingRoutes(false);
}
}
@@ -113,10 +108,11 @@ class InternalRouteStartupManager {
* @throws Exception is thrown if error starting routes
*/
protected void doStartOrResumeRoutes(
+ AbstractCamelContext camelContext,
Map<String, RouteService> routeServices, boolean checkClash, boolean startConsumer, boolean resumeConsumer,
boolean addingRoutes)
throws Exception {
- abstractCamelContext.setStartingRoutes(true);
+ camelContext.setStartingRoutes(true);
try {
// filter out already started routes
Map<String, RouteService> filtered = new LinkedHashMap<>();
@@ -145,10 +141,10 @@ class InternalRouteStartupManager {
}
// the context is in last phase of staring, so lets start the routes
- safelyStartRouteServices(checkClash, startConsumer, resumeConsumer, addingRoutes, filtered.values());
+ safelyStartRouteServices(camelContext, checkClash, startConsumer, resumeConsumer, addingRoutes, filtered.values());
} finally {
- abstractCamelContext.setStartingRoutes(false);
+ camelContext.setStartingRoutes(false);
}
}
@@ -167,6 +163,7 @@ class InternalRouteStartupManager {
* @throws Exception is thrown if error starting the routes
*/
protected synchronized void safelyStartRouteServices(
+ AbstractCamelContext camelContext,
boolean checkClash, boolean startConsumer, boolean resumeConsumer, boolean addingRoutes,
Collection<RouteService> routeServices)
throws Exception {
@@ -178,70 +175,72 @@ class InternalRouteStartupManager {
// figure out the order in which the routes should be started
for (RouteService routeService : routeServices) {
- DefaultRouteStartupOrder order = doPrepareRouteToBeStarted(routeService);
+ DefaultRouteStartupOrder order = doPrepareRouteToBeStarted(camelContext, routeService);
// check for clash before we add it as input
if (checkClash) {
- doCheckStartupOrderClash(order, inputs);
+ doCheckStartupOrderClash(camelContext, order, inputs);
}
inputs.put(order.getStartupOrder(), order);
}
// warm up routes before we start them
- doWarmUpRoutes(inputs, startConsumer);
+ doWarmUpRoutes(camelContext, inputs, startConsumer);
// sort the startup listeners so they are started in the right order
- abstractCamelContext.getStartupListeners().sort(OrderedComparator.get());
+ camelContext.getStartupListeners().sort(OrderedComparator.get());
// now call the startup listeners where the routes has been warmed up
// (only the actual route consumer has not yet been started)
- for (StartupListener startup : abstractCamelContext.getStartupListeners()) {
- startup.onCamelContextStarted(abstractCamelContext.getCamelContextReference(), abstractCamelContext.isStarted());
+ for (StartupListener startup : camelContext.getStartupListeners()) {
+ startup.onCamelContextStarted(camelContext.getCamelContextReference(), camelContext.isStarted());
}
// because the consumers may also register startup listeners we need to
// reset
// the already started listeners
- List<StartupListener> backup = new ArrayList<>(abstractCamelContext.getStartupListeners());
- abstractCamelContext.getStartupListeners().clear();
+ List<StartupListener> backup = new ArrayList<>(camelContext.getStartupListeners());
+ camelContext.getStartupListeners().clear();
// now start the consumers
if (startConsumer) {
if (resumeConsumer) {
// and now resume the routes
- doResumeRouteConsumers(inputs, addingRoutes);
+ doResumeRouteConsumers(camelContext, inputs, addingRoutes);
} else {
// and now start the routes
// and check for clash with multiple consumers of the same
// endpoints which is not allowed
- doStartRouteConsumers(inputs, addingRoutes);
+ doStartRouteConsumers(camelContext, inputs, addingRoutes);
}
}
// sort the startup listeners so they are started in the right order
- abstractCamelContext.getStartupListeners().sort(OrderedComparator.get());
+ camelContext.getStartupListeners().sort(OrderedComparator.get());
// now the consumers that was just started may also add new
// StartupListeners (such as timer)
// so we need to ensure they get started as well
- for (StartupListener startup : abstractCamelContext.getStartupListeners()) {
- startup.onCamelContextStarted(abstractCamelContext.getCamelContextReference(), abstractCamelContext.isStarted());
+ for (StartupListener startup : camelContext.getStartupListeners()) {
+ startup.onCamelContextStarted(camelContext.getCamelContextReference(), camelContext.isStarted());
}
// and add the previous started startup listeners to the list so we have
// them all
- abstractCamelContext.getStartupListeners().addAll(0, backup);
+ camelContext.getStartupListeners().addAll(0, backup);
// inputs no longer needed
inputs.clear();
}
/**
- * @see #safelyStartRouteServices(boolean, boolean, boolean, boolean, Collection)
+ * @see #safelyStartRouteServices(AbstractCamelContext, boolean, boolean, boolean, boolean, Collection)
*/
- protected synchronized void safelyStartRouteServices(
+ public synchronized void safelyStartRouteServices(
+ AbstractCamelContext camelContext,
boolean forceAutoStart, boolean checkClash, boolean startConsumer, boolean resumeConsumer, boolean addingRoutes,
RouteService... routeServices)
throws Exception {
- safelyStartRouteServices(checkClash, startConsumer, resumeConsumer, addingRoutes, Arrays.asList(routeServices));
+ safelyStartRouteServices(camelContext, checkClash, startConsumer, resumeConsumer, addingRoutes,
+ Arrays.asList(routeServices));
}
- DefaultRouteStartupOrder doPrepareRouteToBeStarted(RouteService routeService) {
+ DefaultRouteStartupOrder doPrepareRouteToBeStarted(AbstractCamelContext camelContext, RouteService routeService) {
// add the inputs from this route service to the list to start
// afterwards
// should be ordered according to the startup number
@@ -257,7 +256,8 @@ class InternalRouteStartupManager {
return new DefaultRouteStartupOrder(startupOrder, route, routeService);
}
- boolean doCheckStartupOrderClash(DefaultRouteStartupOrder answer, Map<Integer, DefaultRouteStartupOrder> inputs)
+ boolean doCheckStartupOrderClash(
+ AbstractCamelContext camelContext, DefaultRouteStartupOrder answer, Map<Integer, DefaultRouteStartupOrder> inputs)
throws FailedToStartRouteException {
// check for clash by startupOrder id
DefaultRouteStartupOrder other = inputs.get(answer.getStartupOrder());
@@ -268,7 +268,7 @@ class InternalRouteStartupManager {
.getStartupOrder() + " configured which this route have as well. Please correct startupOrder to be unique among all your routes.");
}
// check in existing already started as well
- for (RouteStartupOrder order : abstractCamelContext.getCamelContextExtension().getRouteStartupOrder()) {
+ for (RouteStartupOrder order : camelContext.getCamelContextExtension().getRouteStartupOrder()) {
String otherId = order.getRoute().getId();
if (answer.getRoute().getId().equals(otherId)) {
// its the same route id so skip clash check as its the same
@@ -284,7 +284,8 @@ class InternalRouteStartupManager {
return true;
}
- void doWarmUpRoutes(Map<Integer, DefaultRouteStartupOrder> inputs, boolean autoStartup) throws FailedToStartRouteException {
+ void doWarmUpRoutes(AbstractCamelContext camelContext, Map<Integer, DefaultRouteStartupOrder> inputs, boolean autoStartup)
+ throws FailedToStartRouteException {
// now prepare the routes by starting its services before we start the
// input
for (Map.Entry<Integer, DefaultRouteStartupOrder> entry : inputs.entrySet()) {
@@ -297,7 +298,7 @@ class InternalRouteStartupManager {
// will then be prepared in time before we start inputs which will
// consume messages to be routed
RouteService routeService = entry.getValue().getRouteService();
- StartupStep step = abstractCamelContext.getStartupStepRecorder().beginStep(Route.class, routeService.getId(),
+ StartupStep step = camelContext.getStartupStepRecorder().beginStep(Route.class, routeService.getId(),
"Warump Route");
try {
LOG.debug("Warming up route id: {} having autoStartup={}", routeService.getId(), autoStartup);
@@ -307,24 +308,29 @@ class InternalRouteStartupManager {
routeService.warmUp();
} finally {
setupRoute.remove();
- abstractCamelContext.getStartupStepRecorder().endStep(step);
+ camelContext.getStartupStepRecorder().endStep(step);
}
}
}
- void doResumeRouteConsumers(Map<Integer, DefaultRouteStartupOrder> inputs, boolean addingRoutes) throws Exception {
- doStartOrResumeRouteConsumers(inputs, true, addingRoutes);
+ void doResumeRouteConsumers(
+ AbstractCamelContext camelContext, Map<Integer, DefaultRouteStartupOrder> inputs, boolean addingRoutes)
+ throws Exception {
+ doStartOrResumeRouteConsumers(camelContext, inputs, true, addingRoutes);
}
- void doStartRouteConsumers(Map<Integer, DefaultRouteStartupOrder> inputs, boolean addingRoutes) throws Exception {
- doStartOrResumeRouteConsumers(inputs, false, addingRoutes);
+ void doStartRouteConsumers(
+ AbstractCamelContext camelContext, Map<Integer, DefaultRouteStartupOrder> inputs, boolean addingRoutes)
+ throws Exception {
+ doStartOrResumeRouteConsumers(camelContext, inputs, false, addingRoutes);
}
- private LoggingLevel getRouteLoggerLogLevel() {
- return abstractCamelContext.getRouteController().getLoggingLevel();
+ private LoggingLevel getRouteLoggerLogLevel(AbstractCamelContext camelContext) {
+ return camelContext.getRouteController().getLoggingLevel();
}
private void doStartOrResumeRouteConsumers(
+ AbstractCamelContext camelContext,
Map<Integer, DefaultRouteStartupOrder> inputs, boolean resumeOnly, boolean addingRoute)
throws Exception {
List<Endpoint> routeInputs = new ArrayList<>();
@@ -340,11 +346,11 @@ class InternalRouteStartupManager {
if (addingRoute && !autoStartup) {
routeLogger.log(
"Skipping starting of route " + routeService.getId() + " as it's configured with autoStartup=false",
- getRouteLoggerLogLevel());
+ getRouteLoggerLogLevel(camelContext));
continue;
}
- StartupStep step = abstractCamelContext.getStartupStepRecorder().beginStep(Route.class, route.getRouteId(),
+ StartupStep step = camelContext.getStartupStepRecorder().beginStep(Route.class, route.getRouteId(),
"Start Route");
// do some preparation before starting the consumer on the route
@@ -361,13 +367,13 @@ class InternalRouteStartupManager {
// check for multiple consumer violations with existing routes
// which have already been started, or is currently starting
List<Endpoint> existingEndpoints = new ArrayList<>();
- for (Route existingRoute : abstractCamelContext.getRoutes()) {
+ for (Route existingRoute : camelContext.getRoutes()) {
if (route.getId().equals(existingRoute.getId())) {
// skip ourselves
continue;
}
Endpoint existing = existingRoute.getEndpoint();
- ServiceStatus status = abstractCamelContext.getRouteStatus(existingRoute.getId());
+ ServiceStatus status = camelContext.getRouteStatus(existingRoute.getId());
if (status != null && (status.isStarted() || status.isStarting())) {
existingEndpoints.add(existing);
}
@@ -392,14 +398,14 @@ class InternalRouteStartupManager {
String uri = endpoint.getEndpointBaseUri();
uri = URISupport.sanitizeUri(uri);
routeLogger.log("Route: " + route.getId() + " resumed and consuming from: " + uri,
- getRouteLoggerLogLevel());
+ getRouteLoggerLogLevel(camelContext));
} else {
// when starting we should invoke the lifecycle strategies
- for (LifecycleStrategy strategy : abstractCamelContext.getLifecycleStrategies()) {
- strategy.onServiceAdd(abstractCamelContext.getCamelContextReference(), consumer, route);
+ for (LifecycleStrategy strategy : camelContext.getLifecycleStrategies()) {
+ strategy.onServiceAdd(camelContext.getCamelContextReference(), consumer, route);
}
try {
- abstractCamelContext.startService(consumer);
+ camelContext.startService(consumer);
route.getProperties().remove("route.start.exception");
} catch (Exception e) {
route.getProperties().put("route.start.exception", e);
@@ -410,7 +416,7 @@ class InternalRouteStartupManager {
String uri = endpoint.getEndpointBaseUri();
uri = URISupport.sanitizeUri(uri);
routeLogger.log("Route: " + route.getId() + " started and consuming from: " + uri,
- getRouteLoggerLogLevel());
+ getRouteLoggerLogLevel(camelContext));
}
routeInputs.add(endpoint);
@@ -420,14 +426,14 @@ class InternalRouteStartupManager {
// but only add if we haven't already registered it before (we
// dont want to double add when restarting)
boolean found = false;
- for (RouteStartupOrder other : abstractCamelContext.getCamelContextExtension().getRouteStartupOrder()) {
+ for (RouteStartupOrder other : camelContext.getCamelContextExtension().getRouteStartupOrder()) {
if (other.getRoute().getId().equals(route.getId())) {
found = true;
break;
}
}
if (!found) {
- abstractCamelContext.getCamelContextExtension().getRouteStartupOrder().add(entry.getValue());
+ camelContext.getCamelContextExtension().getRouteStartupOrder().add(entry.getValue());
}
}
@@ -445,7 +451,7 @@ class InternalRouteStartupManager {
}
}
- abstractCamelContext.getStartupStepRecorder().endStep(step);
+ camelContext.getStartupStepRecorder().endStep(step);
}
}