You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/01/22 08:47:58 UTC

[camel] branch master updated: CAMEL-15844: camel-core - A route has only one input.

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 4a7e50b  CAMEL-15844: camel-core - A route has only one input.
4a7e50b is described below

commit 4a7e50be8095a1654fe7cc29393635683310bfec
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Fri Jan 22 09:25:44 2021 +0100

    CAMEL-15844: camel-core - A route has only one input.
---
 .../org/apache/camel/spi/RouteStartupOrder.java    |  6 +-
 .../impl/engine/DefaultRouteStartupOrder.java      |  6 +-
 .../camel/impl/engine/DefaultShutdownStrategy.java | 85 +++++++++++-----------
 .../impl/engine/InternalRouteStartupManager.java   | 11 ++-
 .../org/apache/camel/impl/engine/RouteService.java | 16 ++--
 5 files changed, 56 insertions(+), 68 deletions(-)

diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/RouteStartupOrder.java b/core/camel-api/src/main/java/org/apache/camel/spi/RouteStartupOrder.java
index b5cb41d..c2b48b8 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/RouteStartupOrder.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/RouteStartupOrder.java
@@ -46,11 +46,11 @@ public interface RouteStartupOrder {
     Route getRoute();
 
     /**
-     * Gets the input to this route (often only one consumer)
+     * Gets the input to this route
      *
-     * @return the input consumers.
+     * @return the consumer.
      */
-    List<Consumer> getInputs();
+    Consumer getInput();
 
     /**
      * Gets the services to this route.
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultRouteStartupOrder.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultRouteStartupOrder.java
index 92b35d3..f027ccf 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultRouteStartupOrder.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultRouteStartupOrder.java
@@ -18,7 +18,6 @@ package org.apache.camel.impl.engine;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.camel.Consumer;
 import org.apache.camel.Route;
@@ -51,9 +50,8 @@ public class DefaultRouteStartupOrder implements RouteStartupOrder {
     }
 
     @Override
-    public List<Consumer> getInputs() {
-        Map<Route, Consumer> inputs = routeService.getInputs();
-        return new ArrayList<>(inputs.values());
+    public Consumer getInput() {
+        return routeService.getInput();
     }
 
     @Override
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultShutdownStrategy.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultShutdownStrategy.java
index b03e7b8..d0f82f2 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultShutdownStrategy.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultShutdownStrategy.java
@@ -375,9 +375,8 @@ public class DefaultShutdownStrategy extends ServiceSupport implements ShutdownS
                 order.getRoute().setShutdownRunningTask(ShutdownRunningTask.CompleteCurrentTaskOnly);
             }
 
-            for (Consumer consumer : order.getInputs()) {
-                shutdownNow(order.getRoute().getId(), consumer);
-            }
+            // shutdown the route consumer
+            shutdownNow(order.getRoute().getId(), order.getInput());
         }
     }
 
@@ -574,53 +573,51 @@ public class DefaultShutdownStrategy extends ServiceSupport implements ShutdownS
                             order.getRoute().getId(), shutdownRoute, shutdownRunningTask);
                 }
 
-                for (Consumer consumer : order.getInputs()) {
-
-                    boolean suspend = false;
+                Consumer consumer = order.getInput();
+                boolean suspend = false;
 
-                    // assume we should shutdown if we are not deferred
-                    boolean shutdown = shutdownRoute != ShutdownRoute.Defer;
+                // assume we should shutdown if we are not deferred
+                boolean shutdown = shutdownRoute != ShutdownRoute.Defer;
 
-                    if (shutdown) {
-                        // if we are to shutdown then check whether we can suspend instead as its a more
-                        // gentle way to graceful shutdown
+                if (shutdown) {
+                    // if we are to shutdown then check whether we can suspend instead as its a more
+                    // gentle way to graceful shutdown
 
-                        // some consumers do not support shutting down so let them decide
-                        // if a consumer is suspendable then prefer to use that and then shutdown later
-                        if (consumer instanceof ShutdownAware) {
-                            shutdown = !((ShutdownAware) consumer).deferShutdown(shutdownRunningTask);
-                        }
-                        if (shutdown && consumer instanceof Suspendable) {
-                            // we prefer to suspend over shutdown
-                            suspend = true;
-                        }
+                    // some consumers do not support shutting down so let them decide
+                    // if a consumer is suspendable then prefer to use that and then shutdown later
+                    if (consumer instanceof ShutdownAware) {
+                        shutdown = !((ShutdownAware) consumer).deferShutdown(shutdownRunningTask);
                     }
-
-                    // log at info level when a route has been shutdown (otherwise log at debug level to not be too noisy)
-                    if (suspend) {
-                        // only suspend it and then later shutdown it
-                        suspendNow(order.getRoute().getId(), consumer);
-                        // add it to the deferred list so the route will be shutdown later
-                        deferredConsumers.add(new ShutdownDeferredConsumer(order.getRoute(), consumer));
-                        // use basic endpoint uri to not log verbose details or potential sensitive data
-                        String uri = order.getRoute().getEndpoint().getEndpointBaseUri();
-                        uri = URISupport.sanitizeUri(uri);
-                        LOG.debug("Route: {} suspended and shutdown deferred, was consuming from: {}", order.getRoute().getId(),
-                                uri);
-                    } else if (shutdown) {
-                        shutdownNow(order.getRoute().getId(), consumer);
-                        // use basic endpoint uri to not log verbose details or potential sensitive data
-                        String uri = order.getRoute().getEndpoint().getEndpointBaseUri();
-                        uri = URISupport.sanitizeUri(uri);
-                        LOG.info("Route: {} shutdown complete, was consuming from: {}", order.getRoute().getId(), uri);
-                    } else {
-                        // we will stop it later, but for now it must run to be able to help all inflight messages
-                        // be safely completed
-                        deferredConsumers.add(new ShutdownDeferredConsumer(order.getRoute(), consumer));
-                        LOG.debug("Route: " + order.getRoute().getId()
-                                  + (suspendOnly ? " shutdown deferred." : " suspension deferred."));
+                    if (shutdown && consumer instanceof Suspendable) {
+                        // we prefer to suspend over shutdown
+                        suspend = true;
                     }
                 }
+
+                // log at info level when a route has been shutdown (otherwise log at debug level to not be too noisy)
+                if (suspend) {
+                    // only suspend it and then later shutdown it
+                    suspendNow(order.getRoute().getId(), consumer);
+                    // add it to the deferred list so the route will be shutdown later
+                    deferredConsumers.add(new ShutdownDeferredConsumer(order.getRoute(), consumer));
+                    // use basic endpoint uri to not log verbose details or potential sensitive data
+                    String uri = order.getRoute().getEndpoint().getEndpointBaseUri();
+                    uri = URISupport.sanitizeUri(uri);
+                    LOG.debug("Route: {} suspended and shutdown deferred, was consuming from: {}", order.getRoute().getId(),
+                            uri);
+                } else if (shutdown) {
+                    shutdownNow(order.getRoute().getId(), consumer);
+                    // use basic endpoint uri to not log verbose details or potential sensitive data
+                    String uri = order.getRoute().getEndpoint().getEndpointBaseUri();
+                    uri = URISupport.sanitizeUri(uri);
+                    LOG.info("Route: {} shutdown complete, was consuming from: {}", order.getRoute().getId(), uri);
+                } else {
+                    // we will stop it later, but for now it must run to be able to help all inflight messages
+                    // be safely completed
+                    deferredConsumers.add(new ShutdownDeferredConsumer(order.getRoute(), consumer));
+                    LOG.debug("Route: " + order.getRoute().getId()
+                              + (suspendOnly ? " shutdown deferred." : " suspension deferred."));
+                }
             }
 
             // notify the services we intend to shutdown
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/InternalRouteStartupManager.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/InternalRouteStartupManager.java
index 82b15ba..fecd08b1 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/InternalRouteStartupManager.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/InternalRouteStartupManager.java
@@ -313,20 +313,19 @@ class InternalRouteStartupManager {
             StartupStep step = abstractCamelContext.getStartupStepRecorder().beginStep(Route.class, route.getRouteId(),
                     "Starting Route");
 
-            // start the service
-            for (Consumer consumer : routeService.getInputs().values()) {
+            // do some preparation before starting the consumer on the route
+            Consumer consumer = routeService.getInput();
+            if (consumer != null) {
                 Endpoint endpoint = consumer.getEndpoint();
 
-                // check multiple consumer violation, with the other routes to
-                // be started
+                // check multiple consumer violation, with the other routes to be started
                 if (!doCheckMultipleConsumerSupportClash(endpoint, routeInputs)) {
                     throw new FailedToStartRouteException(
                             routeService.getId(), "Multiple consumers for the same endpoint is not allowed: " + endpoint);
                 }
 
                 // check for multiple consumer violations with existing routes
-                // which
-                // have already been started, or is currently starting
+                // which have already been started, or is currently starting
                 List<Endpoint> existingEndpoints = new ArrayList<>();
                 for (Route existingRoute : abstractCamelContext.getRoutes()) {
                     if (route.getId().equals(existingRoute.getId())) {
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/RouteService.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/RouteService.java
index d9453c1..c55c272 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/RouteService.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/RouteService.java
@@ -18,7 +18,6 @@ package org.apache.camel.impl.engine;
 
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
@@ -57,7 +56,7 @@ public class RouteService extends ChildServiceSupport {
     private final CamelContext camelContext;
     private final Route route;
     private boolean removingRoutes;
-    private final Map<Route, Consumer> inputs = new HashMap<>();
+    private Consumer input;
     private final AtomicBoolean warmUpDone = new AtomicBoolean();
     private final AtomicBoolean endpointDone = new AtomicBoolean();
 
@@ -98,13 +97,8 @@ public class RouteService extends ChildServiceSupport {
         return answer;
     }
 
-    /**
-     * Gets the inputs to the routes.
-     *
-     * @return list of {@link Consumer} as inputs for the routes
-     */
-    public Map<Route, Consumer> getInputs() {
-        return inputs;
+    public Consumer getInput() {
+        return input;
     }
 
     public boolean isRemovingRoutes() {
@@ -173,7 +167,7 @@ public class RouteService extends ChildServiceSupport {
                     }
 
                     if (service instanceof Consumer) {
-                        inputs.put(route, (Consumer) service);
+                        this.input = (Consumer) service;
                     } else {
                         childServices.add(service);
                     }
@@ -301,7 +295,7 @@ public class RouteService extends ChildServiceSupport {
         camelContext.adapt(ExtendedCamelContext.class).removeRoute(route);
 
         // clear inputs on shutdown
-        inputs.clear();
+        input = null;
         warmUpDone.set(false);
         endpointDone.set(false);
     }