You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/01/22 08:47:58 UTC
[camel] branch master updated: CAMEL-15844: camel-core - A route has only one input.
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new 4a7e50b CAMEL-15844: camel-core - A route has only one input.
4a7e50b is described below
commit 4a7e50be8095a1654fe7cc29393635683310bfec
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Fri Jan 22 09:25:44 2021 +0100
CAMEL-15844: camel-core - A route has only one input.
---
.../org/apache/camel/spi/RouteStartupOrder.java | 6 +-
.../impl/engine/DefaultRouteStartupOrder.java | 6 +-
.../camel/impl/engine/DefaultShutdownStrategy.java | 85 +++++++++++-----------
.../impl/engine/InternalRouteStartupManager.java | 11 ++-
.../org/apache/camel/impl/engine/RouteService.java | 16 ++--
5 files changed, 56 insertions(+), 68 deletions(-)
diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/RouteStartupOrder.java b/core/camel-api/src/main/java/org/apache/camel/spi/RouteStartupOrder.java
index b5cb41d..c2b48b8 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/RouteStartupOrder.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/RouteStartupOrder.java
@@ -46,11 +46,11 @@ public interface RouteStartupOrder {
Route getRoute();
/**
- * Gets the input to this route (often only one consumer)
+ * Gets the input to this route
*
- * @return the input consumers.
+ * @return the consumer.
*/
- List<Consumer> getInputs();
+ Consumer getInput();
/**
* Gets the services to this route.
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultRouteStartupOrder.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultRouteStartupOrder.java
index 92b35d3..f027ccf 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultRouteStartupOrder.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultRouteStartupOrder.java
@@ -18,7 +18,6 @@ package org.apache.camel.impl.engine;
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
import org.apache.camel.Consumer;
import org.apache.camel.Route;
@@ -51,9 +50,8 @@ public class DefaultRouteStartupOrder implements RouteStartupOrder {
}
@Override
- public List<Consumer> getInputs() {
- Map<Route, Consumer> inputs = routeService.getInputs();
- return new ArrayList<>(inputs.values());
+ public Consumer getInput() {
+ return routeService.getInput();
}
@Override
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultShutdownStrategy.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultShutdownStrategy.java
index b03e7b8..d0f82f2 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultShutdownStrategy.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultShutdownStrategy.java
@@ -375,9 +375,8 @@ public class DefaultShutdownStrategy extends ServiceSupport implements ShutdownS
order.getRoute().setShutdownRunningTask(ShutdownRunningTask.CompleteCurrentTaskOnly);
}
- for (Consumer consumer : order.getInputs()) {
- shutdownNow(order.getRoute().getId(), consumer);
- }
+ // shutdown the route consumer
+ shutdownNow(order.getRoute().getId(), order.getInput());
}
}
@@ -574,53 +573,51 @@ public class DefaultShutdownStrategy extends ServiceSupport implements ShutdownS
order.getRoute().getId(), shutdownRoute, shutdownRunningTask);
}
- for (Consumer consumer : order.getInputs()) {
-
- boolean suspend = false;
+ Consumer consumer = order.getInput();
+ boolean suspend = false;
- // assume we should shutdown if we are not deferred
- boolean shutdown = shutdownRoute != ShutdownRoute.Defer;
+ // assume we should shutdown if we are not deferred
+ boolean shutdown = shutdownRoute != ShutdownRoute.Defer;
- if (shutdown) {
- // if we are to shutdown then check whether we can suspend instead as its a more
- // gentle way to graceful shutdown
+ if (shutdown) {
+ // if we are to shutdown then check whether we can suspend instead as its a more
+ // gentle way to graceful shutdown
- // some consumers do not support shutting down so let them decide
- // if a consumer is suspendable then prefer to use that and then shutdown later
- if (consumer instanceof ShutdownAware) {
- shutdown = !((ShutdownAware) consumer).deferShutdown(shutdownRunningTask);
- }
- if (shutdown && consumer instanceof Suspendable) {
- // we prefer to suspend over shutdown
- suspend = true;
- }
+ // some consumers do not support shutting down so let them decide
+ // if a consumer is suspendable then prefer to use that and then shutdown later
+ if (consumer instanceof ShutdownAware) {
+ shutdown = !((ShutdownAware) consumer).deferShutdown(shutdownRunningTask);
}
-
- // log at info level when a route has been shutdown (otherwise log at debug level to not be too noisy)
- if (suspend) {
- // only suspend it and then later shutdown it
- suspendNow(order.getRoute().getId(), consumer);
- // add it to the deferred list so the route will be shutdown later
- deferredConsumers.add(new ShutdownDeferredConsumer(order.getRoute(), consumer));
- // use basic endpoint uri to not log verbose details or potential sensitive data
- String uri = order.getRoute().getEndpoint().getEndpointBaseUri();
- uri = URISupport.sanitizeUri(uri);
- LOG.debug("Route: {} suspended and shutdown deferred, was consuming from: {}", order.getRoute().getId(),
- uri);
- } else if (shutdown) {
- shutdownNow(order.getRoute().getId(), consumer);
- // use basic endpoint uri to not log verbose details or potential sensitive data
- String uri = order.getRoute().getEndpoint().getEndpointBaseUri();
- uri = URISupport.sanitizeUri(uri);
- LOG.info("Route: {} shutdown complete, was consuming from: {}", order.getRoute().getId(), uri);
- } else {
- // we will stop it later, but for now it must run to be able to help all inflight messages
- // be safely completed
- deferredConsumers.add(new ShutdownDeferredConsumer(order.getRoute(), consumer));
- LOG.debug("Route: " + order.getRoute().getId()
- + (suspendOnly ? " shutdown deferred." : " suspension deferred."));
+ if (shutdown && consumer instanceof Suspendable) {
+ // we prefer to suspend over shutdown
+ suspend = true;
}
}
+
+ // log at info level when a route has been shutdown (otherwise log at debug level to not be too noisy)
+ if (suspend) {
+ // only suspend it and then later shutdown it
+ suspendNow(order.getRoute().getId(), consumer);
+ // add it to the deferred list so the route will be shutdown later
+ deferredConsumers.add(new ShutdownDeferredConsumer(order.getRoute(), consumer));
+ // use basic endpoint uri to not log verbose details or potential sensitive data
+ String uri = order.getRoute().getEndpoint().getEndpointBaseUri();
+ uri = URISupport.sanitizeUri(uri);
+ LOG.debug("Route: {} suspended and shutdown deferred, was consuming from: {}", order.getRoute().getId(),
+ uri);
+ } else if (shutdown) {
+ shutdownNow(order.getRoute().getId(), consumer);
+ // use basic endpoint uri to not log verbose details or potential sensitive data
+ String uri = order.getRoute().getEndpoint().getEndpointBaseUri();
+ uri = URISupport.sanitizeUri(uri);
+ LOG.info("Route: {} shutdown complete, was consuming from: {}", order.getRoute().getId(), uri);
+ } else {
+ // we will stop it later, but for now it must run to be able to help all inflight messages
+ // be safely completed
+ deferredConsumers.add(new ShutdownDeferredConsumer(order.getRoute(), consumer));
+ LOG.debug("Route: " + order.getRoute().getId()
+ + (suspendOnly ? " shutdown deferred." : " suspension deferred."));
+ }
}
// notify the services we intend to shutdown
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/InternalRouteStartupManager.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/InternalRouteStartupManager.java
index 82b15ba..fecd08b1 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/InternalRouteStartupManager.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/InternalRouteStartupManager.java
@@ -313,20 +313,19 @@ class InternalRouteStartupManager {
StartupStep step = abstractCamelContext.getStartupStepRecorder().beginStep(Route.class, route.getRouteId(),
"Starting Route");
- // start the service
- for (Consumer consumer : routeService.getInputs().values()) {
+ // do some preparation before starting the consumer on the route
+ Consumer consumer = routeService.getInput();
+ if (consumer != null) {
Endpoint endpoint = consumer.getEndpoint();
- // check multiple consumer violation, with the other routes to
- // be started
+ // check multiple consumer violation, with the other routes to be started
if (!doCheckMultipleConsumerSupportClash(endpoint, routeInputs)) {
throw new FailedToStartRouteException(
routeService.getId(), "Multiple consumers for the same endpoint is not allowed: " + endpoint);
}
// check for multiple consumer violations with existing routes
- // which
- // have already been started, or is currently starting
+ // which have already been started, or is currently starting
List<Endpoint> existingEndpoints = new ArrayList<>();
for (Route existingRoute : abstractCamelContext.getRoutes()) {
if (route.getId().equals(existingRoute.getId())) {
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/RouteService.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/RouteService.java
index d9453c1..c55c272 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/RouteService.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/RouteService.java
@@ -18,7 +18,6 @@ package org.apache.camel.impl.engine;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
@@ -57,7 +56,7 @@ public class RouteService extends ChildServiceSupport {
private final CamelContext camelContext;
private final Route route;
private boolean removingRoutes;
- private final Map<Route, Consumer> inputs = new HashMap<>();
+ private Consumer input;
private final AtomicBoolean warmUpDone = new AtomicBoolean();
private final AtomicBoolean endpointDone = new AtomicBoolean();
@@ -98,13 +97,8 @@ public class RouteService extends ChildServiceSupport {
return answer;
}
- /**
- * Gets the inputs to the routes.
- *
- * @return list of {@link Consumer} as inputs for the routes
- */
- public Map<Route, Consumer> getInputs() {
- return inputs;
+ public Consumer getInput() {
+ return input;
}
public boolean isRemovingRoutes() {
@@ -173,7 +167,7 @@ public class RouteService extends ChildServiceSupport {
}
if (service instanceof Consumer) {
- inputs.put(route, (Consumer) service);
+ this.input = (Consumer) service;
} else {
childServices.add(service);
}
@@ -301,7 +295,7 @@ public class RouteService extends ChildServiceSupport {
camelContext.adapt(ExtendedCamelContext.class).removeRoute(route);
// clear inputs on shutdown
- inputs.clear();
+ input = null;
warmUpDone.set(false);
endpointDone.set(false);
}