You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2023/10/17 18:42:24 UTC

[camel] branch main updated: CAMEL-19998: reduce coupling between CamelContext and the InternalRouteStartupManager

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 73db0391a6f CAMEL-19998: reduce coupling between CamelContext and the InternalRouteStartupManager
73db0391a6f is described below

commit 73db0391a6f0b146c816b088424ef3998457ecc6
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Tue Oct 17 15:24:59 2023 +0200

    CAMEL-19998: reduce coupling between CamelContext and the InternalRouteStartupManager
---
 .../camel/impl/engine/AbstractCamelContext.java    |  20 ++--
 .../impl/engine/InternalRouteStartupManager.java   | 116 +++++++++++----------
 2 files changed, 72 insertions(+), 64 deletions(-)

diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
index 43015772e97..70e7c243dc4 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
@@ -211,7 +211,7 @@ public abstract class AbstractCamelContext extends BaseService
     private final ThreadLocal<Boolean> isLockModel = new ThreadLocal<>();
     private final Map<String, RouteService> routeServices = new LinkedHashMap<>();
     private final Map<String, RouteService> suspendedRouteServices = new LinkedHashMap<>();
-    private final InternalRouteStartupManager internalRouteStartupManager = new InternalRouteStartupManager(this);
+    private final InternalRouteStartupManager internalRouteStartupManager = new InternalRouteStartupManager();
     private final List<RouteStartupOrder> routeStartupOrder = new ArrayList<>();
     private final StopWatch stopWatch = new StopWatch(false);
     private final ThreadLocal<Set<String>> componentsInCreation = ThreadLocal.withInitial(() -> new HashSet<>());
@@ -1043,7 +1043,7 @@ public abstract class AbstractCamelContext extends BaseService
     }
 
     public void startAllRoutes() throws Exception {
-        internalRouteStartupManager.doStartOrResumeRoutes(routeServices, true, true, false, false);
+        internalRouteStartupManager.doStartOrResumeRoutes(this, routeServices, true, true, false, false);
     }
 
     private void doStopRoutes(RouteController controller, Comparator<RouteStartupOrder> comparator) throws Exception {
@@ -1922,7 +1922,7 @@ public abstract class AbstractCamelContext extends BaseService
 
             // start the suspended routes (do not check for route clashes, and
             // indicate)
-            internalRouteStartupManager.doStartOrResumeRoutes(suspendedRouteServices, false, true, true, false);
+            internalRouteStartupManager.doStartOrResumeRoutes(this, suspendedRouteServices, false, true, true, false);
 
             // mark the route services as resumed (will be marked as started) as
             // well
@@ -2298,7 +2298,7 @@ public abstract class AbstractCamelContext extends BaseService
         // the method is called start but at this point it will only initialize (as context is starting up)
         startRouteDefinitions();
         // this will init route definitions and populate as route services which we can then initialize now
-        internalRouteStartupManager.doInitRoutes(routeServices);
+        internalRouteStartupManager.doInitRoutes(this, routeServices);
         startupStepRecorder.endStep(subStep);
 
         if (!lifecycleStrategies.isEmpty()) {
@@ -2381,7 +2381,7 @@ public abstract class AbstractCamelContext extends BaseService
             // invoke this logic to warm up the routes and if possible also
             // start the routes
             try {
-                internalRouteStartupManager.doStartOrResumeRoutes(routeServices, true, true, false, true);
+                internalRouteStartupManager.doStartOrResumeRoutes(this, routeServices, true, true, false, true);
             } catch (Exception e) {
                 throw RuntimeCamelException.wrapRuntimeException(e);
             }
@@ -2710,7 +2710,8 @@ public abstract class AbstractCamelContext extends BaseService
             StartupStep subStep
                     = startupStepRecorder.beginStep(CamelContext.class, camelContextExtension.getName(), "Start Routes");
             EventHelper.notifyCamelContextRoutesStarting(this);
-            internalRouteStartupManager.doStartOrResumeRoutes(routeServices, true, !doNotStartRoutesOnFirstStart, false, true);
+            internalRouteStartupManager.doStartOrResumeRoutes(this, routeServices, true, !doNotStartRoutesOnFirstStart, false,
+                    true);
             EventHelper.notifyCamelContextRoutesStarted(this);
             startupStepRecorder.endStep(subStep);
         }
@@ -2782,7 +2783,7 @@ public abstract class AbstractCamelContext extends BaseService
             boolean found = routeStartupOrder.stream().anyMatch(o -> o.getRoute().getId().equals(routeService.getId()));
             if (!found) {
                 LOG.debug("Route: {} which failed to startup will be stopped", routeService.getId());
-                routeStartupOrder.add(internalRouteStartupManager.doPrepareRouteToBeStarted(routeService));
+                routeStartupOrder.add(internalRouteStartupManager.doPrepareRouteToBeStarted(this, routeService));
             }
         }
 
@@ -3032,7 +3033,8 @@ public abstract class AbstractCamelContext extends BaseService
                     StartupStep step
                             = startupStepRecorder.beginStep(Route.class, routeService.getId(), "Start Route Services");
                     // this method will log the routes being started
-                    internalRouteStartupManager.safelyStartRouteServices(true, true, true, false, addingRoutes, routeService);
+                    internalRouteStartupManager.safelyStartRouteServices(this, true, true, true, false, addingRoutes,
+                            routeService);
                     // start route services if it was configured to auto startup
                     // and we are not adding routes
                     boolean isAutoStartup = routeService.isAutoStartup();
@@ -3063,7 +3065,7 @@ public abstract class AbstractCamelContext extends BaseService
             // resume the route service
             if (shouldStartRoutes()) {
                 // this method will log the routes being started
-                internalRouteStartupManager.safelyStartRouteServices(true, false, true, true, false, routeService);
+                internalRouteStartupManager.safelyStartRouteServices(this, true, false, true, true, false, routeService);
                 // must resume route service as well
                 routeService.resume();
             }
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/InternalRouteStartupManager.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/InternalRouteStartupManager.java
index 16cc6f38466..2a8e9839f3d 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/InternalRouteStartupManager.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/InternalRouteStartupManager.java
@@ -50,19 +50,14 @@ import org.slf4j.LoggerFactory;
  * <p>
  * This code has been refactored out of {@link AbstractCamelContext} to its own class.
  */
-class InternalRouteStartupManager {
+final class InternalRouteStartupManager {
 
     private static final Logger LOG = LoggerFactory.getLogger(InternalRouteStartupManager.class);
 
     private final ThreadLocal<Route> setupRoute = new ThreadLocal<>();
-    private final AbstractCamelContext abstractCamelContext;
     private final CamelLogger routeLogger = new CamelLogger(LOG);
     private int defaultRouteStartupOrder = 1000;
 
-    public InternalRouteStartupManager(AbstractCamelContext abstractCamelContext) {
-        this.abstractCamelContext = abstractCamelContext;
-    }
-
     /**
      * If Camel is currently starting up a route then this returns the route.
      */
@@ -76,13 +71,13 @@ class InternalRouteStartupManager {
      * @param  routeServices the routes to initialize
      * @throws Exception     is thrown if error initializing routes
      */
-    protected void doInitRoutes(Map<String, RouteService> routeServices)
+    public void doInitRoutes(AbstractCamelContext camelContext, Map<String, RouteService> routeServices)
             throws Exception {
 
-        abstractCamelContext.setStartingRoutes(true);
+        camelContext.setStartingRoutes(true);
         try {
             for (RouteService routeService : routeServices.values()) {
-                StartupStep step = abstractCamelContext.getStartupStepRecorder().beginStep(Route.class, routeService.getId(),
+                StartupStep step = camelContext.getStartupStepRecorder().beginStep(Route.class, routeService.getId(),
                         "Init Route");
                 try {
                     LOG.debug("Initializing route id: {}", routeService.getId());
@@ -93,11 +88,11 @@ class InternalRouteStartupManager {
                     routeService.setUp();
                 } finally {
                     setupRoute.remove();
-                    abstractCamelContext.getStartupStepRecorder().endStep(step);
+                    camelContext.getStartupStepRecorder().endStep(step);
                 }
             }
         } finally {
-            abstractCamelContext.setStartingRoutes(false);
+            camelContext.setStartingRoutes(false);
         }
     }
 
@@ -113,10 +108,11 @@ class InternalRouteStartupManager {
      * @throws Exception      is thrown if error starting routes
      */
     protected void doStartOrResumeRoutes(
+            AbstractCamelContext camelContext,
             Map<String, RouteService> routeServices, boolean checkClash, boolean startConsumer, boolean resumeConsumer,
             boolean addingRoutes)
             throws Exception {
-        abstractCamelContext.setStartingRoutes(true);
+        camelContext.setStartingRoutes(true);
         try {
             // filter out already started routes
             Map<String, RouteService> filtered = new LinkedHashMap<>();
@@ -145,10 +141,10 @@ class InternalRouteStartupManager {
             }
 
             // the context is in last phase of staring, so lets start the routes
-            safelyStartRouteServices(checkClash, startConsumer, resumeConsumer, addingRoutes, filtered.values());
+            safelyStartRouteServices(camelContext, checkClash, startConsumer, resumeConsumer, addingRoutes, filtered.values());
 
         } finally {
-            abstractCamelContext.setStartingRoutes(false);
+            camelContext.setStartingRoutes(false);
         }
     }
 
@@ -167,6 +163,7 @@ class InternalRouteStartupManager {
      * @throws Exception      is thrown if error starting the routes
      */
     protected synchronized void safelyStartRouteServices(
+            AbstractCamelContext camelContext,
             boolean checkClash, boolean startConsumer, boolean resumeConsumer, boolean addingRoutes,
             Collection<RouteService> routeServices)
             throws Exception {
@@ -178,70 +175,72 @@ class InternalRouteStartupManager {
 
         // figure out the order in which the routes should be started
         for (RouteService routeService : routeServices) {
-            DefaultRouteStartupOrder order = doPrepareRouteToBeStarted(routeService);
+            DefaultRouteStartupOrder order = doPrepareRouteToBeStarted(camelContext, routeService);
             // check for clash before we add it as input
             if (checkClash) {
-                doCheckStartupOrderClash(order, inputs);
+                doCheckStartupOrderClash(camelContext, order, inputs);
             }
             inputs.put(order.getStartupOrder(), order);
         }
 
         // warm up routes before we start them
-        doWarmUpRoutes(inputs, startConsumer);
+        doWarmUpRoutes(camelContext, inputs, startConsumer);
 
         // sort the startup listeners so they are started in the right order
-        abstractCamelContext.getStartupListeners().sort(OrderedComparator.get());
+        camelContext.getStartupListeners().sort(OrderedComparator.get());
         // now call the startup listeners where the routes has been warmed up
         // (only the actual route consumer has not yet been started)
-        for (StartupListener startup : abstractCamelContext.getStartupListeners()) {
-            startup.onCamelContextStarted(abstractCamelContext.getCamelContextReference(), abstractCamelContext.isStarted());
+        for (StartupListener startup : camelContext.getStartupListeners()) {
+            startup.onCamelContextStarted(camelContext.getCamelContextReference(), camelContext.isStarted());
         }
         // because the consumers may also register startup listeners we need to
         // reset
         // the already started listeners
-        List<StartupListener> backup = new ArrayList<>(abstractCamelContext.getStartupListeners());
-        abstractCamelContext.getStartupListeners().clear();
+        List<StartupListener> backup = new ArrayList<>(camelContext.getStartupListeners());
+        camelContext.getStartupListeners().clear();
 
         // now start the consumers
         if (startConsumer) {
             if (resumeConsumer) {
                 // and now resume the routes
-                doResumeRouteConsumers(inputs, addingRoutes);
+                doResumeRouteConsumers(camelContext, inputs, addingRoutes);
             } else {
                 // and now start the routes
                 // and check for clash with multiple consumers of the same
                 // endpoints which is not allowed
-                doStartRouteConsumers(inputs, addingRoutes);
+                doStartRouteConsumers(camelContext, inputs, addingRoutes);
             }
         }
 
         // sort the startup listeners so they are started in the right order
-        abstractCamelContext.getStartupListeners().sort(OrderedComparator.get());
+        camelContext.getStartupListeners().sort(OrderedComparator.get());
         // now the consumers that was just started may also add new
         // StartupListeners (such as timer)
         // so we need to ensure they get started as well
-        for (StartupListener startup : abstractCamelContext.getStartupListeners()) {
-            startup.onCamelContextStarted(abstractCamelContext.getCamelContextReference(), abstractCamelContext.isStarted());
+        for (StartupListener startup : camelContext.getStartupListeners()) {
+            startup.onCamelContextStarted(camelContext.getCamelContextReference(), camelContext.isStarted());
         }
         // and add the previous started startup listeners to the list so we have
         // them all
-        abstractCamelContext.getStartupListeners().addAll(0, backup);
+        camelContext.getStartupListeners().addAll(0, backup);
 
         // inputs no longer needed
         inputs.clear();
     }
 
     /**
-     * @see #safelyStartRouteServices(boolean, boolean, boolean, boolean, Collection)
+     * @see #safelyStartRouteServices(AbstractCamelContext, boolean, boolean, boolean, boolean, Collection)
      */
-    protected synchronized void safelyStartRouteServices(
+    public synchronized void safelyStartRouteServices(
+            AbstractCamelContext camelContext,
             boolean forceAutoStart, boolean checkClash, boolean startConsumer, boolean resumeConsumer, boolean addingRoutes,
             RouteService... routeServices)
             throws Exception {
-        safelyStartRouteServices(checkClash, startConsumer, resumeConsumer, addingRoutes, Arrays.asList(routeServices));
+        safelyStartRouteServices(camelContext, checkClash, startConsumer, resumeConsumer, addingRoutes,
+                Arrays.asList(routeServices));
     }
 
-    DefaultRouteStartupOrder doPrepareRouteToBeStarted(RouteService routeService) {
+    DefaultRouteStartupOrder doPrepareRouteToBeStarted(AbstractCamelContext camelContext, RouteService routeService) {
         // add the inputs from this route service to the list to start
         // afterwards
         // should be ordered according to the startup number
@@ -257,7 +256,8 @@ class InternalRouteStartupManager {
         return new DefaultRouteStartupOrder(startupOrder, route, routeService);
     }
 
-    boolean doCheckStartupOrderClash(DefaultRouteStartupOrder answer, Map<Integer, DefaultRouteStartupOrder> inputs)
+    boolean doCheckStartupOrderClash(
+            AbstractCamelContext camelContext, DefaultRouteStartupOrder answer, Map<Integer, DefaultRouteStartupOrder> inputs)
             throws FailedToStartRouteException {
         // check for clash by startupOrder id
         DefaultRouteStartupOrder other = inputs.get(answer.getStartupOrder());
@@ -268,7 +268,7 @@ class InternalRouteStartupManager {
                             .getStartupOrder() + " configured which this route have as well. Please correct startupOrder to be unique among all your routes.");
         }
         // check in existing already started as well
-        for (RouteStartupOrder order : abstractCamelContext.getCamelContextExtension().getRouteStartupOrder()) {
+        for (RouteStartupOrder order : camelContext.getCamelContextExtension().getRouteStartupOrder()) {
             String otherId = order.getRoute().getId();
             if (answer.getRoute().getId().equals(otherId)) {
                 // its the same route id so skip clash check as its the same
@@ -284,7 +284,8 @@ class InternalRouteStartupManager {
         return true;
     }
 
-    void doWarmUpRoutes(Map<Integer, DefaultRouteStartupOrder> inputs, boolean autoStartup) throws FailedToStartRouteException {
+    void doWarmUpRoutes(AbstractCamelContext camelContext, Map<Integer, DefaultRouteStartupOrder> inputs, boolean autoStartup)
+            throws FailedToStartRouteException {
         // now prepare the routes by starting its services before we start the
         // input
         for (Map.Entry<Integer, DefaultRouteStartupOrder> entry : inputs.entrySet()) {
@@ -297,7 +298,7 @@ class InternalRouteStartupManager {
             // will then be prepared in time before we start inputs which will
             // consume messages to be routed
             RouteService routeService = entry.getValue().getRouteService();
-            StartupStep step = abstractCamelContext.getStartupStepRecorder().beginStep(Route.class, routeService.getId(),
+            StartupStep step = camelContext.getStartupStepRecorder().beginStep(Route.class, routeService.getId(),
                     "Warump Route");
             try {
                 LOG.debug("Warming up route id: {} having autoStartup={}", routeService.getId(), autoStartup);
@@ -307,24 +308,29 @@ class InternalRouteStartupManager {
                 routeService.warmUp();
             } finally {
                 setupRoute.remove();
-                abstractCamelContext.getStartupStepRecorder().endStep(step);
+                camelContext.getStartupStepRecorder().endStep(step);
             }
         }
     }
 
-    void doResumeRouteConsumers(Map<Integer, DefaultRouteStartupOrder> inputs, boolean addingRoutes) throws Exception {
-        doStartOrResumeRouteConsumers(inputs, true, addingRoutes);
+    void doResumeRouteConsumers(
+            AbstractCamelContext camelContext, Map<Integer, DefaultRouteStartupOrder> inputs, boolean addingRoutes)
+            throws Exception {
+        doStartOrResumeRouteConsumers(camelContext, inputs, true, addingRoutes);
     }
 
-    void doStartRouteConsumers(Map<Integer, DefaultRouteStartupOrder> inputs, boolean addingRoutes) throws Exception {
-        doStartOrResumeRouteConsumers(inputs, false, addingRoutes);
+    void doStartRouteConsumers(
+            AbstractCamelContext camelContext, Map<Integer, DefaultRouteStartupOrder> inputs, boolean addingRoutes)
+            throws Exception {
+        doStartOrResumeRouteConsumers(camelContext, inputs, false, addingRoutes);
     }
 
-    private LoggingLevel getRouteLoggerLogLevel() {
-        return abstractCamelContext.getRouteController().getLoggingLevel();
+    private LoggingLevel getRouteLoggerLogLevel(AbstractCamelContext camelContext) {
+        return camelContext.getRouteController().getLoggingLevel();
     }
 
     private void doStartOrResumeRouteConsumers(
+            AbstractCamelContext camelContext,
             Map<Integer, DefaultRouteStartupOrder> inputs, boolean resumeOnly, boolean addingRoute)
             throws Exception {
         List<Endpoint> routeInputs = new ArrayList<>();
@@ -340,11 +346,11 @@ class InternalRouteStartupManager {
             if (addingRoute && !autoStartup) {
                 routeLogger.log(
                         "Skipping starting of route " + routeService.getId() + " as it's configured with autoStartup=false",
-                        getRouteLoggerLogLevel());
+                        getRouteLoggerLogLevel(camelContext));
                 continue;
             }
 
-            StartupStep step = abstractCamelContext.getStartupStepRecorder().beginStep(Route.class, route.getRouteId(),
+            StartupStep step = camelContext.getStartupStepRecorder().beginStep(Route.class, route.getRouteId(),
                     "Start Route");
 
             // do some preparation before starting the consumer on the route
@@ -361,13 +367,13 @@ class InternalRouteStartupManager {
                 // check for multiple consumer violations with existing routes
                 // which have already been started, or is currently starting
                 List<Endpoint> existingEndpoints = new ArrayList<>();
-                for (Route existingRoute : abstractCamelContext.getRoutes()) {
+                for (Route existingRoute : camelContext.getRoutes()) {
                     if (route.getId().equals(existingRoute.getId())) {
                         // skip ourselves
                         continue;
                     }
                     Endpoint existing = existingRoute.getEndpoint();
-                    ServiceStatus status = abstractCamelContext.getRouteStatus(existingRoute.getId());
+                    ServiceStatus status = camelContext.getRouteStatus(existingRoute.getId());
                     if (status != null && (status.isStarted() || status.isStarting())) {
                         existingEndpoints.add(existing);
                     }
@@ -392,14 +398,14 @@ class InternalRouteStartupManager {
                     String uri = endpoint.getEndpointBaseUri();
                     uri = URISupport.sanitizeUri(uri);
                     routeLogger.log("Route: " + route.getId() + " resumed and consuming from: " + uri,
-                            getRouteLoggerLogLevel());
+                            getRouteLoggerLogLevel(camelContext));
                 } else {
                     // when starting we should invoke the lifecycle strategies
-                    for (LifecycleStrategy strategy : abstractCamelContext.getLifecycleStrategies()) {
-                        strategy.onServiceAdd(abstractCamelContext.getCamelContextReference(), consumer, route);
+                    for (LifecycleStrategy strategy : camelContext.getLifecycleStrategies()) {
+                        strategy.onServiceAdd(camelContext.getCamelContextReference(), consumer, route);
                     }
                     try {
-                        abstractCamelContext.startService(consumer);
+                        camelContext.startService(consumer);
                         route.getProperties().remove("route.start.exception");
                     } catch (Exception e) {
                         route.getProperties().put("route.start.exception", e);
@@ -410,7 +416,7 @@ class InternalRouteStartupManager {
                     String uri = endpoint.getEndpointBaseUri();
                     uri = URISupport.sanitizeUri(uri);
                     routeLogger.log("Route: " + route.getId() + " started and consuming from: " + uri,
-                            getRouteLoggerLogLevel());
+                            getRouteLoggerLogLevel(camelContext));
                 }
 
                 routeInputs.add(endpoint);
@@ -420,14 +426,14 @@ class InternalRouteStartupManager {
                 // but only add if we haven't already registered it before (we
                 // dont want to double add when restarting)
                 boolean found = false;
-                for (RouteStartupOrder other : abstractCamelContext.getCamelContextExtension().getRouteStartupOrder()) {
+                for (RouteStartupOrder other : camelContext.getCamelContextExtension().getRouteStartupOrder()) {
                     if (other.getRoute().getId().equals(route.getId())) {
                         found = true;
                         break;
                     }
                 }
                 if (!found) {
-                    abstractCamelContext.getCamelContextExtension().getRouteStartupOrder().add(entry.getValue());
+                    camelContext.getCamelContextExtension().getRouteStartupOrder().add(entry.getValue());
                 }
             }
 
@@ -445,7 +451,7 @@ class InternalRouteStartupManager {
                 }
             }
 
-            abstractCamelContext.getStartupStepRecorder().endStep(step);
+            camelContext.getStartupStepRecorder().endStep(step);
         }
     }