You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/01/23 11:12:21 UTC

[camel] branch master updated (8878b36 -> f29c681)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git.


    from 8878b36  Regen catalog
     new b454f0e  CAMEL-15844: Polished
     new bb9d1a6  CAMEL-15844: camel-core - Optimize Route to move its setup (init) logic to the init phase of CamelContext, so they are initialized together.
     new 11a78f5  CAMEL-15844: camel-core - Optimize Route to move its setup (init) logic to the init phase of CamelContext, so they are initialized together.
     new 6d39724  CAMEL-15844: Camel components creating consumer should not do init/start logic in their constructors.
     new cad5236  CAMEL-15844: Camel components creating consumer should not do init/start logic in their constructors.
     new da7b017  Regen
     new 2f3aaf3  CAMEL-16020: camel-pulsar - Remove synchronous option.
     new f7b457a  CAMEL-15844: Camel components creating consumer should not do init/start logic in their constructors.
     new f29c681  CAMEL-15844: Add javadoc about init/start logic not in constructors.

The 9 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../camel/catalog/docs/pulsar-component.adoc       |   3 +-
 .../org/apache/camel/catalog/others.properties     |   1 +
 .../camel/catalog/others/flight-recorder.json      |   8 +-
 .../camel/component/ehcache/EhcacheConsumer.java   |  24 ++-
 .../guava/eventbus/GuavaEventBusEndpoint.java      |   4 +-
 .../component/ignite/AbstractIgniteEndpoint.java   |   7 +-
 .../cache/IgniteCacheContinuousQueryConsumer.java  |   5 +-
 .../ignite/cache/IgniteCacheEndpoint.java          |   4 +-
 .../ignite/IgniteCacheContinuousQueryTest.java     |  11 +-
 .../camel/component/ironmq/IronMQConsumer.java     |  11 +-
 .../camel/component/ironmq/IronMQEndpoint.java     |   2 +-
 .../jgroups/raft/JGroupsRaftConsumer.java          |   9 +-
 .../jgroups/raft/JGroupsRaftEndpoint.java          |   6 +-
 .../jgroups/raft/JGroupsRaftProducer.java          |  11 +-
 .../camel/component/jgroups/JGroupsConsumer.java   |   9 +-
 .../camel/component/jgroups/JGroupsEndpoint.java   |   4 +-
 .../camel/component/jgroups/JGroupsProducer.java   |   8 +-
 .../jira/consumer/WatchUpdatesConsumer.java        |   5 +
 .../org/apache/camel/component/milo/Messages.java  |   5 +-
 .../component/milo/client/MiloClientComponent.java |  56 +-----
 .../component/milo/client/MiloClientConsumer.java  |  49 +++--
 .../component/milo/client/MiloClientEndpoint.java  |  28 +--
 .../component/milo/client/MiloClientProducer.java  |  29 ++-
 .../component/milo/server/MiloServerConsumer.java  |  39 ++--
 .../component/milo/server/MiloServerEndpoint.java  |  12 +-
 .../component/milo/server/MiloServerProducer.java  |  17 +-
 .../mllp/MllpTcpServerConsumerTransactionTest.java |   2 +
 .../component/pulsar/PulsarEndpointConfigurer.java |   3 -
 .../component/pulsar/PulsarEndpointUriFactory.java |   3 +-
 .../org/apache/camel/component/pulsar/pulsar.json  |   3 +-
 .../src/main/docs/pulsar-component.adoc            |   3 +-
 .../camel/component/pulsar/PulsarEndpoint.java     |  11 --
 .../component/pulsar/PulsarMessageListener.java    |  12 +-
 .../pulsar/PulsarConsumerInAsynchronousTest.java   |   8 +-
 .../pulsar/PulsarConsumerInSynchronousTest.java    | 217 ---------------------
 .../apache/camel/component/ref/RefEndpoint.java    |  19 +-
 components/camel-stomp/README.md                   |  20 --
 .../camel/component/stomp/StompConsumer.java       |   2 +-
 .../camel/component/weather/WeatherConsumer.java   |  11 +-
 .../camel/component/weather/WeatherEndpoint.java   |   2 +-
 .../camel/component/web3j/Web3jConsumer.java       |  22 +--
 .../src/main/java/org/apache/camel/Consumer.java   |   6 +-
 .../src/main/java/org/apache/camel/Endpoint.java   |  16 +-
 .../java/org/apache/camel/PollingConsumer.java     |   4 +
 .../src/main/java/org/apache/camel/Producer.java   |   6 +-
 .../src/main/java/org/apache/camel/Route.java      |   5 +-
 .../camel/impl/engine/AbstractCamelContext.java    |  27 ++-
 .../org/apache/camel/impl/engine/DefaultRoute.java |  26 ++-
 .../impl/engine/InternalRouteStartupManager.java   |  37 +++-
 .../org/apache/camel/impl/engine/RouteService.java | 117 ++++++-----
 .../endpoint/dsl/PulsarEndpointBuilderFactory.java |  97 ---------
 .../modules/ROOT/pages/pulsar-component.adoc       |   3 +-
 52 files changed, 375 insertions(+), 674 deletions(-)
 copy components/camel-jfr/src/generated/resources/jfr.json => catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/others/flight-recorder.json (55%)
 delete mode 100644 components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerInSynchronousTest.java
 delete mode 100644 components/camel-stomp/README.md


[camel] 08/09: CAMEL-15844: Camel components creating consumer should not do init/start logic in their constructors.

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit f7b457a959faf12049b1045c5db586877aff05d2
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Sat Jan 23 11:55:44 2021 +0100

    CAMEL-15844: Camel components creating consumer should not do init/start logic in their constructors.
---
 components/camel-stomp/README.md                   | 20 --------------------
 .../camel/component/stomp/StompConsumer.java       |  2 +-
 .../camel/component/weather/WeatherConsumer.java   | 11 ++++++++---
 .../camel/component/weather/WeatherEndpoint.java   |  2 +-
 .../camel/component/web3j/Web3jConsumer.java       | 22 ++++------------------
 5 files changed, 14 insertions(+), 43 deletions(-)

diff --git a/components/camel-stomp/README.md b/components/camel-stomp/README.md
deleted file mode 100644
index 8341d09..0000000
--- a/components/camel-stomp/README.md
+++ /dev/null
@@ -1,20 +0,0 @@
-camel-stomp
-===========
-
-Camel component used for communicating with [Stomp] (http://stomp.github.io/) compliant message brokers, like [Apache ActiveMQ](http://activemq.apache.org) or [ActiveMQ Apollo](http://activemq.apache.org/apollo/).
-
-URI format
-----------
-
-    stomp:destination
-
-Where destination is broker specific. With ActiveMQ you can use queues and topics in the form of
-
-    stomp:queue:test
-
-Samples
--------
-
-    from("direct:foo").to("stomp:queue:test")
-
-
diff --git a/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompConsumer.java b/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompConsumer.java
index dc2ed63..ff28a15 100644
--- a/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompConsumer.java
+++ b/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompConsumer.java
@@ -28,7 +28,6 @@ public class StompConsumer extends DefaultConsumer {
 
     public StompConsumer(Endpoint endpoint, Processor processor) {
         super(endpoint, processor);
-        id = getEndpoint().getNextId();
     }
 
     @Override
@@ -38,6 +37,7 @@ public class StompConsumer extends DefaultConsumer {
 
     @Override
     protected void doStart() throws Exception {
+        id = getEndpoint().getNextId();
         getEndpoint().addConsumer(this);
         super.doStart();
     }
diff --git a/components/camel-weather/src/main/java/org/apache/camel/component/weather/WeatherConsumer.java b/components/camel-weather/src/main/java/org/apache/camel/component/weather/WeatherConsumer.java
index ec98e10..850f3da 100644
--- a/components/camel-weather/src/main/java/org/apache/camel/component/weather/WeatherConsumer.java
+++ b/components/camel-weather/src/main/java/org/apache/camel/component/weather/WeatherConsumer.java
@@ -33,11 +33,10 @@ public class WeatherConsumer extends ScheduledPollConsumer {
 
     private static final Logger LOG = LoggerFactory.getLogger(WeatherConsumer.class);
 
-    private final String query;
+    private String query;
 
-    public WeatherConsumer(WeatherEndpoint endpoint, Processor processor, String query) {
+    public WeatherConsumer(WeatherEndpoint endpoint, Processor processor) {
         super(endpoint, processor);
-        this.query = query;
     }
 
     @Override
@@ -46,6 +45,12 @@ public class WeatherConsumer extends ScheduledPollConsumer {
     }
 
     @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+        this.query = this.getEndpoint().getWeatherQuery().getQuery();
+    }
+
+    @Override
     protected int poll() throws Exception {
         LOG.debug("Going to execute the Weather query {}", query);
         HttpClient httpClient = getEndpoint().getConfiguration().getHttpClient();
diff --git a/components/camel-weather/src/main/java/org/apache/camel/component/weather/WeatherEndpoint.java b/components/camel-weather/src/main/java/org/apache/camel/component/weather/WeatherEndpoint.java
index 24cdef7..70cf52d 100644
--- a/components/camel-weather/src/main/java/org/apache/camel/component/weather/WeatherEndpoint.java
+++ b/components/camel-weather/src/main/java/org/apache/camel/component/weather/WeatherEndpoint.java
@@ -45,7 +45,7 @@ public class WeatherEndpoint extends DefaultPollingEndpoint {
 
     @Override
     public Consumer createConsumer(Processor processor) throws Exception {
-        WeatherConsumer answer = new WeatherConsumer(this, processor, getWeatherQuery().getQuery());
+        WeatherConsumer answer = new WeatherConsumer(this, processor);
 
         // ScheduledPollConsumer default delay is 500 millis and that is too often for polling a feed, so we override
         // with a new default value. End user can override this value by providing a consumer.delay parameter
diff --git a/components/camel-web3j/src/main/java/org/apache/camel/component/web3j/Web3jConsumer.java b/components/camel-web3j/src/main/java/org/apache/camel/component/web3j/Web3jConsumer.java
index fed6c12..1cbc8ff 100644
--- a/components/camel-web3j/src/main/java/org/apache/camel/component/web3j/Web3jConsumer.java
+++ b/components/camel-web3j/src/main/java/org/apache/camel/component/web3j/Web3jConsumer.java
@@ -35,14 +35,13 @@ import static org.apache.camel.component.web3j.Web3jHelper.toDefaultBlockParamet
  */
 public class Web3jConsumer extends DefaultConsumer {
     private static final Logger LOG = LoggerFactory.getLogger(Web3jConsumer.class);
-    private final Web3j web3j;
     private final Web3jConfiguration configuration;
+    private Web3j web3j;
     private Subscription subscription;
     private Web3jEndpoint endpoint;
 
     public Web3jConsumer(Web3jEndpoint endpoint, Processor processor, Web3jConfiguration configuration) {
         super(endpoint, processor);
-        this.web3j = endpoint.getWeb3j();
         this.endpoint = endpoint;
         this.configuration = configuration;
     }
@@ -55,6 +54,9 @@ public class Web3jConsumer extends DefaultConsumer {
     @Override
     protected void doStart() throws Exception {
         super.doStart();
+
+        this.web3j = getEndpoint().getWeb3j();
+
         LOG.info("Subscribing to: {}", endpoint.getNodeAddress());
         switch (configuration.getOperation()) {
             case Web3jConstants.ETH_LOG_OBSERVABLE:
@@ -163,22 +165,6 @@ public class Web3jConsumer extends DefaultConsumer {
         LOG.info("Subscribed: {}", this.configuration);
     }
 
-    private EthFilter buildEthFilter() {
-        EthFilter ethFilter = new EthFilter(
-                toDefaultBlockParameter(configuration.getFromBlock()), toDefaultBlockParameter(configuration.getToBlock()),
-                configuration.getAddresses());
-        if (configuration.getTopics() != null) {
-            for (String topic : configuration.getTopics()) {
-                if (topic != null && topic.length() > 0) {
-                    ethFilter.addSingleTopic(topic);
-                } else {
-                    ethFilter.addNullTopic();
-                }
-            }
-        }
-        return ethFilter;
-    }
-
     private void ethBlockHashObservable(String x) {
         LOG.debug("processEthBlock {}", x);
         Exchange exchange = this.getEndpoint().createExchange();


[camel] 04/09: CAMEL-15844: Camel components creating consumer should not do init/start logic in their constructors.

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 6d39724f34f68da07ad4a705e45a189e3f32aeff
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Fri Jan 22 16:57:23 2021 +0100

    CAMEL-15844: Camel components creating consumer should not do init/start logic in their constructors.
---
 .../camel/component/ehcache/EhcacheConsumer.java   | 24 +++++++++++++---------
 .../guava/eventbus/GuavaEventBusEndpoint.java      |  4 ++--
 2 files changed, 16 insertions(+), 12 deletions(-)

diff --git a/components/camel-ehcache/src/main/java/org/apache/camel/component/ehcache/EhcacheConsumer.java b/components/camel-ehcache/src/main/java/org/apache/camel/component/ehcache/EhcacheConsumer.java
index b7551c5..7bf5f70 100644
--- a/components/camel-ehcache/src/main/java/org/apache/camel/component/ehcache/EhcacheConsumer.java
+++ b/components/camel-ehcache/src/main/java/org/apache/camel/component/ehcache/EhcacheConsumer.java
@@ -26,15 +26,24 @@ import org.ehcache.event.CacheEventListener;
 
 public class EhcacheConsumer extends DefaultConsumer implements CacheEventListener<Object, Object> {
     private final EhcacheConfiguration configuration;
-    private final EhcacheManager manager;
-    private final Cache cache;
+    private final String cacheName;
+    private Cache cache;
 
     public EhcacheConsumer(EhcacheEndpoint endpoint, String cacheName, EhcacheConfiguration configuration,
                            Processor processor) throws Exception {
         super(endpoint, processor);
-
         this.configuration = configuration;
-        this.manager = endpoint.getManager();
+        this.cacheName = cacheName;
+    }
+
+    @Override
+    public EhcacheEndpoint getEndpoint() {
+        return (EhcacheEndpoint) super.getEndpoint();
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
 
         Class<?> kt = null;
         if (configuration.getKeyType() != null) {
@@ -44,12 +53,7 @@ public class EhcacheConsumer extends DefaultConsumer implements CacheEventListen
         if (configuration.getValueType() != null) {
             vt = getEndpoint().getCamelContext().getClassResolver().resolveClass(configuration.getValueType());
         }
-        this.cache = manager.getCache(cacheName, kt, vt);
-    }
-
-    @Override
-    protected void doStart() throws Exception {
-        super.doStart();
+        this.cache = getEndpoint().getManager().getCache(cacheName, kt, vt);
 
         this.cache.getRuntimeConfiguration().registerCacheEventListener(
                 this,
diff --git a/components/camel-guava-eventbus/src/main/java/org/apache/camel/component/guava/eventbus/GuavaEventBusEndpoint.java b/components/camel-guava-eventbus/src/main/java/org/apache/camel/component/guava/eventbus/GuavaEventBusEndpoint.java
index f8b0619..d246eb6 100644
--- a/components/camel-guava-eventbus/src/main/java/org/apache/camel/component/guava/eventbus/GuavaEventBusEndpoint.java
+++ b/components/camel-guava-eventbus/src/main/java/org/apache/camel/component/guava/eventbus/GuavaEventBusEndpoint.java
@@ -125,8 +125,8 @@ public class GuavaEventBusEndpoint extends DefaultEndpoint implements MultipleCo
     }
 
     @Override
-    protected void doStart() throws Exception {
-        super.doStart();
+    protected void doInit() throws Exception {
+        super.doInit();
 
         if (eventBusRef != null && eventBus == null) {
             eventBus = CamelContextHelper.mandatoryLookup(getCamelContext(), eventBusRef, EventBus.class);


[camel] 07/09: CAMEL-16020: camel-pulsar - Remove synchronous option.

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 2f3aaf3562d470e45feed4311bb54db20414f7b0
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Sat Jan 23 11:02:16 2021 +0100

    CAMEL-16020: camel-pulsar - Remove synchronous option.
---
 .../camel/catalog/docs/pulsar-component.adoc       |   3 +-
 .../component/pulsar/PulsarEndpointConfigurer.java |   3 -
 .../component/pulsar/PulsarEndpointUriFactory.java |   3 +-
 .../org/apache/camel/component/pulsar/pulsar.json  |   3 +-
 .../src/main/docs/pulsar-component.adoc            |   3 +-
 .../camel/component/pulsar/PulsarEndpoint.java     |  11 --
 .../component/pulsar/PulsarMessageListener.java    |  12 +-
 .../pulsar/PulsarConsumerInAsynchronousTest.java   |   8 +-
 .../pulsar/PulsarConsumerInSynchronousTest.java    | 217 ---------------------
 .../endpoint/dsl/PulsarEndpointBuilderFactory.java |  97 ---------
 .../modules/ROOT/pages/pulsar-component.adoc       |   3 +-
 11 files changed, 9 insertions(+), 354 deletions(-)

diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/pulsar-component.adoc b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/pulsar-component.adoc
index 5f5e433..e956ee8 100644
--- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/pulsar-component.adoc
+++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/pulsar-component.adoc
@@ -108,7 +108,7 @@ with the following path and query parameters:
 |===
 
 
-=== Query Parameters (34 parameters):
+=== Query Parameters (33 parameters):
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
@@ -147,7 +147,6 @@ with the following path and query parameters:
 | *messageRoutingMode* (producer) | Message Routing Mode to use. There are 3 enums and the value can be one of: SinglePartition, RoundRobinPartition, CustomPartition | RoundRobinPartition | MessageRoutingMode
 | *producerName* (producer) | Name of the producer. If unset, lets Pulsar select a unique identifier. |  | String
 | *sendTimeoutMs* (producer) | Send timeout in milliseconds | 30000 | int
-| *synchronous* (advanced) | Sets whether synchronous processing should be strictly used | false | boolean
 |===
 // endpoint options: END
 
diff --git a/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarEndpointConfigurer.java b/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarEndpointConfigurer.java
index 67874e3..8855cbc 100644
--- a/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarEndpointConfigurer.java
+++ b/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarEndpointConfigurer.java
@@ -85,7 +85,6 @@ public class PulsarEndpointConfigurer extends PropertyConfigurerSupport implemen
         case "subscriptionTopicsMode": target.getPulsarConfiguration().setSubscriptionTopicsMode(property(camelContext, org.apache.pulsar.client.api.RegexSubscriptionMode.class, value)); return true;
         case "subscriptiontype":
         case "subscriptionType": target.getPulsarConfiguration().setSubscriptionType(property(camelContext, org.apache.camel.component.pulsar.utils.consumers.SubscriptionType.class, value)); return true;
-        case "synchronous": target.setSynchronous(property(camelContext, boolean.class, value)); return true;
         case "topicspattern":
         case "topicsPattern": target.getPulsarConfiguration().setTopicsPattern(property(camelContext, boolean.class, value)); return true;
         default: return false;
@@ -159,7 +158,6 @@ public class PulsarEndpointConfigurer extends PropertyConfigurerSupport implemen
         case "subscriptionTopicsMode": return org.apache.pulsar.client.api.RegexSubscriptionMode.class;
         case "subscriptiontype":
         case "subscriptionType": return org.apache.camel.component.pulsar.utils.consumers.SubscriptionType.class;
-        case "synchronous": return boolean.class;
         case "topicspattern":
         case "topicsPattern": return boolean.class;
         default: return null;
@@ -234,7 +232,6 @@ public class PulsarEndpointConfigurer extends PropertyConfigurerSupport implemen
         case "subscriptionTopicsMode": return target.getPulsarConfiguration().getSubscriptionTopicsMode();
         case "subscriptiontype":
         case "subscriptionType": return target.getPulsarConfiguration().getSubscriptionType();
-        case "synchronous": return target.isSynchronous();
         case "topicspattern":
         case "topicsPattern": return target.getPulsarConfiguration().isTopicsPattern();
         default: return null;
diff --git a/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarEndpointUriFactory.java b/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarEndpointUriFactory.java
index a71c126..611ad02 100644
--- a/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarEndpointUriFactory.java
+++ b/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarEndpointUriFactory.java
@@ -20,10 +20,9 @@ public class PulsarEndpointUriFactory extends org.apache.camel.support.component
     private static final Set<String> PROPERTY_NAMES;
     private static final Set<String> SECRET_PROPERTY_NAMES;
     static {
-        Set<String> props = new HashSet<>(38);
+        Set<String> props = new HashSet<>(37);
         props.add("initialSequenceId");
         props.add("maxRedeliverCount");
-        props.add("synchronous");
         props.add("messageRouter");
         props.add("batchingMaxMessages");
         props.add("compressionType");
diff --git a/components/camel-pulsar/src/generated/resources/org/apache/camel/component/pulsar/pulsar.json b/components/camel-pulsar/src/generated/resources/org/apache/camel/component/pulsar/pulsar.json
index 48d9691..ed1f7f6 100644
--- a/components/camel-pulsar/src/generated/resources/org/apache/camel/component/pulsar/pulsar.json
+++ b/components/camel-pulsar/src/generated/resources/org/apache/camel/component/pulsar/pulsar.json
@@ -96,7 +96,6 @@
     "messageRouter": { "kind": "parameter", "displayName": "Message Router", "group": "producer", "label": "producer", "required": false, "type": "object", "javaType": "org.apache.pulsar.client.api.MessageRouter", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "pulsarConfiguration", "description": "Custom Message Router to use" },
     "messageRoutingMode": { "kind": "parameter", "displayName": "Message Routing Mode", "group": "producer", "label": "producer", "required": false, "type": "object", "javaType": "org.apache.pulsar.client.api.MessageRoutingMode", "enum": [ "SinglePartition", "RoundRobinPartition", "CustomPartition" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "RoundRobinPartition", "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configuratio [...]
     "producerName": { "kind": "parameter", "displayName": "Producer Name", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "pulsarConfiguration", "description": "Name of the producer. If unset, lets Pulsar select a unique identifier." },
-    "sendTimeoutMs": { "kind": "parameter", "displayName": "Send Timeout Ms", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 30000, "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "pulsarConfiguration", "description": "Send timeout in milliseconds" },
-    "synchronous": { "kind": "parameter", "displayName": "Synchronous", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Sets whether synchronous processing should be strictly used" }
+    "sendTimeoutMs": { "kind": "parameter", "displayName": "Send Timeout Ms", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 30000, "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "pulsarConfiguration", "description": "Send timeout in milliseconds" }
   }
 }
diff --git a/components/camel-pulsar/src/main/docs/pulsar-component.adoc b/components/camel-pulsar/src/main/docs/pulsar-component.adoc
index 5f5e433..e956ee8 100644
--- a/components/camel-pulsar/src/main/docs/pulsar-component.adoc
+++ b/components/camel-pulsar/src/main/docs/pulsar-component.adoc
@@ -108,7 +108,7 @@ with the following path and query parameters:
 |===
 
 
-=== Query Parameters (34 parameters):
+=== Query Parameters (33 parameters):
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
@@ -147,7 +147,6 @@ with the following path and query parameters:
 | *messageRoutingMode* (producer) | Message Routing Mode to use. There are 3 enums and the value can be one of: SinglePartition, RoundRobinPartition, CustomPartition | RoundRobinPartition | MessageRoutingMode
 | *producerName* (producer) | Name of the producer. If unset, lets Pulsar select a unique identifier. |  | String
 | *sendTimeoutMs* (producer) | Send timeout in milliseconds | 30000 | int
-| *synchronous* (advanced) | Sets whether synchronous processing should be strictly used | false | boolean
 |===
 // endpoint options: END
 
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarEndpoint.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarEndpoint.java
index 8af7183..d89088e 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarEndpoint.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarEndpoint.java
@@ -50,9 +50,6 @@ public class PulsarEndpoint extends DefaultEndpoint {
     @UriPath
     @Metadata(required = true)
     private String topic;
-    @UriParam(defaultValue = "false", label = "advanced",
-              description = "Sets whether synchronous processing should be strictly used")
-    private boolean synchronous;
 
     @UriParam
     private PulsarConfiguration pulsarConfiguration;
@@ -128,14 +125,6 @@ public class PulsarEndpoint extends DefaultEndpoint {
         this.topic = topic;
     }
 
-    public boolean isSynchronous() {
-        return synchronous;
-    }
-
-    public void setSynchronous(boolean synchronous) {
-        this.synchronous = synchronous;
-    }
-
     public PulsarConfiguration getPulsarConfiguration() {
         return pulsarConfiguration;
     }
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageListener.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageListener.java
index f17b13d..0607bda 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageListener.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageListener.java
@@ -49,22 +49,12 @@ public class PulsarMessageListener implements MessageListener<byte[]> {
                         endpoint.getComponent().getPulsarMessageReceiptFactory()
                                 .newInstance(exchange, message, consumer));
             }
-            if (endpoint.isSynchronous()) {
-                process(exchange, consumer, message);
-            } else {
-                processAsync(exchange, consumer, message);
-            }
+            processAsync(exchange, consumer, message);
         } catch (Exception exception) {
             handleProcessorException(exchange, exception);
         }
     }
 
-    private void process(final Exchange exchange, final Consumer<byte[]> consumer, final Message<byte[]> message)
-            throws Exception {
-        pulsarConsumer.getProcessor().process(exchange);
-        acknowledge(consumer, message);
-    }
-
     private void processAsync(final Exchange exchange, final Consumer<byte[]> consumer, final Message<byte[]> message) {
         pulsarConsumer.getAsyncProcessor().process(exchange, new AsyncCallback() {
             @Override
diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerInAsynchronousTest.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerInAsynchronousTest.java
index 649fd34..7eb5886 100644
--- a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerInAsynchronousTest.java
+++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerInAsynchronousTest.java
@@ -55,18 +55,16 @@ public class PulsarConsumerInAsynchronousTest extends PulsarTestSupport {
     private static final String PRODUCER = "camel-producer-1";
 
     @EndpointInject("pulsar:" + TOPIC_URI_SYNCHRONOUS_FALSE + "?numberOfConsumers=1&subscriptionType=Exclusive"
-                    + "&subscriptionName=camel-subscription&consumerQueueSize=1&consumerName=camel-consumer"
-                    + "&synchronous=false")
+                    + "&subscriptionName=camel-subscription&consumerQueueSize=1&consumerName=camel-consumer")
     private Endpoint synchronousFalse;
 
     @EndpointInject("pulsar:" + TOPIC_URI_SYNCHRONOUS_FALSE_THROWS_EXCEPTION + "?numberOfConsumers=1&subscriptionType=Exclusive"
-                    + "&subscriptionName=camel-subscription&consumerQueueSize=1&consumerName=camel-consumer"
-                    + "&synchronous=false")
+                    + "&subscriptionName=camel-subscription&consumerQueueSize=1&consumerName=camel-consumer")
     private Endpoint synchronousFalseThrowsException;
 
     @EndpointInject("pulsar:" + TOPIC_URI_SYNCHRONOUS_FALSE_MANUAL_ACK + "?numberOfConsumers=1&subscriptionType=Exclusive"
                     + "&subscriptionName=camel-subscription&consumerQueueSize=1&consumerName=camel-consumer"
-                    + "&synchronous=false" + "&allowManualAcknowledgement=true" + "&ackTimeoutMillis=1000")
+                    + "&allowManualAcknowledgement=true" + "&ackTimeoutMillis=1000")
     private Endpoint synchronousFalseManualAck;
 
     @EndpointInject("mock:result")
diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerInSynchronousTest.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerInSynchronousTest.java
deleted file mode 100644
index f5ecf68..0000000
--- a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerInSynchronousTest.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.pulsar;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.camel.Endpoint;
-import org.apache.camel.EndpointInject;
-import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.component.pulsar.utils.AutoConfiguration;
-import org.apache.camel.component.pulsar.utils.message.PulsarMessageHeaders;
-import org.apache.camel.spi.Registry;
-import org.apache.camel.support.SimpleRegistry;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.impl.ClientBuilderImpl;
-import org.junit.jupiter.api.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.camel.test.junit5.TestSupport.body;
-
-public class PulsarConsumerInSynchronousTest extends PulsarTestSupport {
-
-    private static final Logger LOGGER = LoggerFactory.getLogger(PulsarConsumerInAsynchronousTest.class);
-
-    private static final String TOPIC_URI_SYNCHRONOUS_TRUE = "persistent://public/default/synchronousTrue";
-    private static final String TOPIC_URI_SYNCHRONOUS_DEFAULT = "persistent://public/default/synchronousDefault";
-
-    private static final String TOPIC_URI_SYNCHRONOUS_TRUE_THROWS_EXCEPTION
-            = "persistent://public/default/synchronousTrueThrowsException";
-
-    private static final String TOPIC_URI_SYNCHRONOUS_TRUE_MANUAL_ACK = "persistent://public/default/synchronousTrueManualAck";
-
-    private static final String PRODUCER = "camel-producer-1";
-
-    @EndpointInject("pulsar:" + TOPIC_URI_SYNCHRONOUS_TRUE + "?numberOfConsumers=1&subscriptionType=Exclusive"
-                    + "&subscriptionName=camel-subscription&consumerQueueSize=1&consumerName=camel-consumer"
-                    + "&synchronous=true")
-    private Endpoint synchronousTrue;
-
-    @EndpointInject("pulsar:" + TOPIC_URI_SYNCHRONOUS_DEFAULT + "?numberOfConsumers=1&subscriptionType=Exclusive"
-                    + "&subscriptionName=camel-subscription&consumerQueueSize=1&consumerName=camel-consumer")
-    private Endpoint synchronousDefault;
-
-    @EndpointInject("pulsar:" + TOPIC_URI_SYNCHRONOUS_TRUE_THROWS_EXCEPTION + "?numberOfConsumers=1&subscriptionType=Exclusive"
-                    + "&subscriptionName=camel-subscription&consumerQueueSize=1&consumerName=camel-consumer"
-                    + "&synchronous=true")
-    private Endpoint synchronousTrueThrowsException;
-
-    @EndpointInject("pulsar:" + TOPIC_URI_SYNCHRONOUS_TRUE_MANUAL_ACK + "?numberOfConsumers=1&subscriptionType=Exclusive"
-                    + "&subscriptionName=camel-subscription&consumerQueueSize=1&consumerName=camel-consumer"
-                    + "&synchronous=true" + "&allowManualAcknowledgement=true" + "&ackTimeoutMillis=1000")
-    private Endpoint synchronousTrueManualAck;
-
-    @EndpointInject("mock:result")
-    private MockEndpoint to;
-
-    private CountDownLatch countDownLatch;
-
-    @Override
-    protected RouteBuilder createRouteBuilder() {
-        return new RouteBuilder() {
-
-            Processor processor = new Processor() {
-                @Override
-                public void process(final Exchange exchange) throws InterruptedException {
-                    LOGGER.info("Processing message {}", exchange.getIn().getBody());
-                    countDownLatch.countDown();
-                    countDownLatch.await(20, TimeUnit.SECONDS);
-                }
-            };
-
-            Processor manualAckProcessor = new Processor() {
-                @Override
-                public void process(final Exchange exchange) throws PulsarClientException {
-                    LOGGER.info("Processing message {}", exchange.getIn().getBody());
-                    PulsarMessageReceipt receipt = (PulsarMessageReceipt) exchange.getIn().getHeader(
-                            PulsarMessageHeaders.MESSAGE_RECEIPT);
-                    receipt.acknowledge();
-                }
-            };
-
-            @Override
-            public void configure() {
-
-                from(synchronousTrue)
-                        .threads(2)
-                        .process(processor)
-                        .end()
-                        .to(to);
-
-                from(synchronousDefault)
-                        .threads(2)
-                        .process(processor)
-                        .end()
-                        .to(to);
-
-                from(synchronousTrueThrowsException)
-                        .threads(2)
-                        .throwException(new RuntimeException("Processor throws exception."))
-                        .end()
-                        .to(to);
-
-                from(synchronousTrueManualAck)
-                        .threads(2)
-                        .process(manualAckProcessor)
-                        .end()
-                        .to(to);
-            }
-        };
-    }
-
-    @Override
-    protected Registry createCamelRegistry() throws Exception {
-        SimpleRegistry registry = new SimpleRegistry();
-
-        registerPulsarBeans(registry);
-
-        return registry;
-    }
-
-    private void registerPulsarBeans(SimpleRegistry registry) throws PulsarClientException {
-        PulsarClient pulsarClient = givenPulsarClient();
-        AutoConfiguration autoConfiguration = new AutoConfiguration(null, null);
-
-        registry.bind("pulsarClient", pulsarClient);
-        PulsarComponent comp = new PulsarComponent(context);
-        comp.setAutoConfiguration(autoConfiguration);
-        comp.setPulsarClient(pulsarClient);
-        registry.bind("pulsar", comp);
-
-    }
-
-    private PulsarClient givenPulsarClient() throws PulsarClientException {
-        return new ClientBuilderImpl().serviceUrl(getPulsarBrokerUrl()).ioThreads(1).listenerThreads(1).build();
-    }
-
-    @Test
-    public void testMessagesProcessedSynchronously() throws Exception {
-        processSynchronously(TOPIC_URI_SYNCHRONOUS_TRUE);
-    }
-
-    @Test
-    public void testMessagesProcessedSynchronouslyByDefault() throws Exception {
-        processSynchronously(TOPIC_URI_SYNCHRONOUS_DEFAULT);
-    }
-
-    public void processSynchronously(String topic) throws Exception {
-
-        to.expectedMessageCount(2);
-
-        countDownLatch = new CountDownLatch(2);
-
-        Producer<String> producer = givenPulsarClient().newProducer(Schema.STRING).producerName(PRODUCER)
-                .topic(topic).create();
-
-        producer.send("One");
-        producer.send("Two");
-
-        to.assertIsNotSatisfied(2000L); // ms
-
-    }
-
-    @Test
-    public void testMessageProcessedSynchronouslyThrowsException() throws Exception {
-        throwsException(TOPIC_URI_SYNCHRONOUS_TRUE_THROWS_EXCEPTION);
-    }
-
-    public void throwsException(String topic) throws Exception {
-        to.expectedMessageCount(0);
-        Producer<String> producer = givenPulsarClient().newProducer(Schema.STRING).producerName(PRODUCER)
-                .topic(topic).create();
-
-        producer.send("One");
-
-        MockEndpoint.assertIsSatisfied(2, TimeUnit.SECONDS, to);
-    }
-
-    @Test
-    public void testMessagesProcessedSynchronouslyManualAcknowledge() throws Exception {
-        manualAcknowledgement(TOPIC_URI_SYNCHRONOUS_TRUE_MANUAL_ACK);
-    }
-
-    public void manualAcknowledgement(String topic) throws Exception {
-        to.expectsNoDuplicates(body());
-        to.expectedMessageCount(1);
-
-        Producer<String> producer = givenPulsarClient().newProducer(Schema.STRING).producerName(PRODUCER)
-                .topic(topic).create();
-
-        producer.send("Hello World!");
-
-        MockEndpoint.assertIsSatisfied(10, TimeUnit.SECONDS, to);
-    }
-
-}
diff --git a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/PulsarEndpointBuilderFactory.java b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/PulsarEndpointBuilderFactory.java
index 41ac7db..f072bfe 100644
--- a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/PulsarEndpointBuilderFactory.java
+++ b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/PulsarEndpointBuilderFactory.java
@@ -650,39 +650,6 @@ public interface PulsarEndpointBuilderFactory {
             doSetProperty("exchangePattern", exchangePattern);
             return this;
         }
-        /**
-         * Sets whether synchronous processing should be strictly used.
-         * 
-         * The option is a: &lt;code&gt;boolean&lt;/code&gt; type.
-         * 
-         * Default: false
-         * Group: advanced
-         * 
-         * @param synchronous the value to set
-         * @return the dsl builder
-         */
-        default AdvancedPulsarEndpointConsumerBuilder synchronous(
-                boolean synchronous) {
-            doSetProperty("synchronous", synchronous);
-            return this;
-        }
-        /**
-         * Sets whether synchronous processing should be strictly used.
-         * 
-         * The option will be converted to a &lt;code&gt;boolean&lt;/code&gt;
-         * type.
-         * 
-         * Default: false
-         * Group: advanced
-         * 
-         * @param synchronous the value to set
-         * @return the dsl builder
-         */
-        default AdvancedPulsarEndpointConsumerBuilder synchronous(
-                String synchronous) {
-            doSetProperty("synchronous", synchronous);
-            return this;
-        }
     }
 
     /**
@@ -1179,39 +1146,6 @@ public interface PulsarEndpointBuilderFactory {
         default PulsarEndpointProducerBuilder basic() {
             return (PulsarEndpointProducerBuilder) this;
         }
-        /**
-         * Sets whether synchronous processing should be strictly used.
-         * 
-         * The option is a: &lt;code&gt;boolean&lt;/code&gt; type.
-         * 
-         * Default: false
-         * Group: advanced
-         * 
-         * @param synchronous the value to set
-         * @return the dsl builder
-         */
-        default AdvancedPulsarEndpointProducerBuilder synchronous(
-                boolean synchronous) {
-            doSetProperty("synchronous", synchronous);
-            return this;
-        }
-        /**
-         * Sets whether synchronous processing should be strictly used.
-         * 
-         * The option will be converted to a &lt;code&gt;boolean&lt;/code&gt;
-         * type.
-         * 
-         * Default: false
-         * Group: advanced
-         * 
-         * @param synchronous the value to set
-         * @return the dsl builder
-         */
-        default AdvancedPulsarEndpointProducerBuilder synchronous(
-                String synchronous) {
-            doSetProperty("synchronous", synchronous);
-            return this;
-        }
     }
 
     /**
@@ -1236,37 +1170,6 @@ public interface PulsarEndpointBuilderFactory {
         default PulsarEndpointBuilder basic() {
             return (PulsarEndpointBuilder) this;
         }
-        /**
-         * Sets whether synchronous processing should be strictly used.
-         * 
-         * The option is a: &lt;code&gt;boolean&lt;/code&gt; type.
-         * 
-         * Default: false
-         * Group: advanced
-         * 
-         * @param synchronous the value to set
-         * @return the dsl builder
-         */
-        default AdvancedPulsarEndpointBuilder synchronous(boolean synchronous) {
-            doSetProperty("synchronous", synchronous);
-            return this;
-        }
-        /**
-         * Sets whether synchronous processing should be strictly used.
-         * 
-         * The option will be converted to a &lt;code&gt;boolean&lt;/code&gt;
-         * type.
-         * 
-         * Default: false
-         * Group: advanced
-         * 
-         * @param synchronous the value to set
-         * @return the dsl builder
-         */
-        default AdvancedPulsarEndpointBuilder synchronous(String synchronous) {
-            doSetProperty("synchronous", synchronous);
-            return this;
-        }
     }
 
     /**
diff --git a/docs/components/modules/ROOT/pages/pulsar-component.adoc b/docs/components/modules/ROOT/pages/pulsar-component.adoc
index 75506aa..aff7b33 100644
--- a/docs/components/modules/ROOT/pages/pulsar-component.adoc
+++ b/docs/components/modules/ROOT/pages/pulsar-component.adoc
@@ -110,7 +110,7 @@ with the following path and query parameters:
 |===
 
 
-=== Query Parameters (34 parameters):
+=== Query Parameters (33 parameters):
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
@@ -149,7 +149,6 @@ with the following path and query parameters:
 | *messageRoutingMode* (producer) | Message Routing Mode to use. There are 3 enums and the value can be one of: SinglePartition, RoundRobinPartition, CustomPartition | RoundRobinPartition | MessageRoutingMode
 | *producerName* (producer) | Name of the producer. If unset, lets Pulsar select a unique identifier. |  | String
 | *sendTimeoutMs* (producer) | Send timeout in milliseconds | 30000 | int
-| *synchronous* (advanced) | Sets whether synchronous processing should be strictly used | false | boolean
 |===
 // endpoint options: END
 


[camel] 06/09: Regen

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit da7b01795f89d7f28af321730bbdb4d9dc85f60b
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Sat Jan 23 10:36:52 2021 +0100

    Regen
---
 .../resources/org/apache/camel/catalog/others.properties  |  1 +
 .../org/apache/camel/catalog/others/flight-recorder.json  | 15 +++++++++++++++
 2 files changed, 16 insertions(+)

diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/others.properties b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/others.properties
index 6c64234..47e9091 100644
--- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/others.properties
+++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/others.properties
@@ -5,6 +5,7 @@ csimple-joor
 cxf-transport
 elytron
 etcd3
+flight-recorder
 headersmap
 hystrix
 jasypt
diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/others/flight-recorder.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/others/flight-recorder.json
new file mode 100644
index 0000000..8552d44
--- /dev/null
+++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/others/flight-recorder.json
@@ -0,0 +1,15 @@
+{
+  "other": {
+    "kind": "other",
+    "name": "flight-recorder",
+    "title": "Java Flight Recorder",
+    "description": "Diagnose Camel with Java Flight Recorder",
+    "deprecated": false,
+    "firstVersion": "3.8.0",
+    "label": "monitoring",
+    "supportLevel": "Preview",
+    "groupId": "org.apache.camel",
+    "artifactId": "camel-flight-recorder",
+    "version": "3.8.0-SNAPSHOT"
+  }
+}


[camel] 01/09: CAMEL-15844: Polished

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit b454f0e20015ab8b26de24e0a6d8dec6c5284bf8
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Fri Jan 22 10:02:32 2021 +0100

    CAMEL-15844: Polished
---
 .../main/java/org/apache/camel/impl/engine/AbstractCamelContext.java   | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
index 43068ae..3971cc5 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
@@ -2708,8 +2708,9 @@ public abstract class AbstractCamelContext extends BaseService
             }
         }
 
-        // start the route definitions before the routes is started
+        // init the route definitions before the routes is started
         StartupStep subStep = startupStepRecorder.beginStep(CamelContext.class, getName(), "Initializing routes");
+        // the method is called start but at this point it will only initialize (as context is starting up)
         startRouteDefinitions();
         startupStepRecorder.endStep(subStep);
 


[camel] 03/09: CAMEL-15844: camel-core - Optimize Route to move its setup (init) logic to the init phase of CamelContext, so they are initialized together.

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 11a78f573e3de939d417ade4f701a9639296ec51
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Fri Jan 22 13:52:31 2021 +0100

    CAMEL-15844: camel-core - Optimize Route to move its setup (init) logic to the init phase of CamelContext, so they are initialized together.
---
 .../camel/impl/engine/AbstractCamelContext.java    | 22 ++++++++++++++--------
 .../org/apache/camel/impl/engine/RouteService.java | 12 +++++-------
 2 files changed, 19 insertions(+), 15 deletions(-)

diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
index 036e98b..2549d74 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
@@ -314,8 +314,8 @@ public abstract class AbstractCamelContext extends BaseService
     private ShutdownRoute shutdownRoute = ShutdownRoute.Default;
     private ShutdownRunningTask shutdownRunningTask = ShutdownRunningTask.CompleteCurrentTaskOnly;
     private Debugger debugger;
+    private long initTaken;
     private long startDate;
-    private long bootDate;
 
     private SSLContextParameters sslContextParameters;
 
@@ -2535,8 +2535,6 @@ public abstract class AbstractCamelContext extends BaseService
 
     @Override
     public void doBuild() throws Exception {
-        bootDate = System.currentTimeMillis();
-
         // auto-detect step recorder from classpath if none has been explicit configured
         if (startupStepRecorder.getClass().getSimpleName().equals("DefaultStartupStepRecorder")) {
             StartupStepRecorder fr = getBootstrapFactoryFinder()
@@ -2593,6 +2591,8 @@ public abstract class AbstractCamelContext extends BaseService
 
     @Override
     public void doInit() throws Exception {
+        StopWatch watch = new StopWatch();
+
         StartupStep step = startupStepRecorder.beginStep(CamelContext.class, null, "Initializing CamelContext");
 
         // init the route controller
@@ -2738,6 +2738,9 @@ public abstract class AbstractCamelContext extends BaseService
         EventHelper.notifyCamelContextInitialized(this);
 
         startupStepRecorder.endStep(step);
+
+        initTaken = watch.taken();
+        LOG.info("Apache Camel {} ({}) initialized in {}", getVersion(), getName(), TimeUtils.printDuration(initTaken));
     }
 
     @Override
@@ -2765,7 +2768,9 @@ public abstract class AbstractCamelContext extends BaseService
     }
 
     protected void doStartContext() throws Exception {
-        LOG.info("Apache Camel {} ({}) is starting", getVersion(), getName());
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Apache Camel {} ({}) is starting", getVersion(), getName());
+        }
         vetoed = null;
         startDate = System.currentTimeMillis();
         stopWatch.restart();
@@ -2874,9 +2879,11 @@ public abstract class AbstractCamelContext extends BaseService
             }
         }
 
-        String start = TimeUtils.printDuration(stopWatch.taken());
-        String boot = TimeUtils.printDuration(new StopWatch(bootDate).taken());
-        LOG.info("Apache Camel {} ({}) started in {} (incl boot {})", getVersion(), getName(), start, boot);
+        long taken = stopWatch.taken();
+        long total = initTaken + taken;
+        String start = TimeUtils.printDuration(taken);
+        String boot = TimeUtils.printDuration(total);
+        LOG.info("Apache Camel {} ({}) started in {} (with init {})", getVersion(), getName(), start, boot);
     }
 
     protected void doStartCamel() throws Exception {
@@ -3176,7 +3183,6 @@ public abstract class AbstractCamelContext extends BaseService
 
         // and clear start date
         startDate = 0;
-        bootDate = 0;
 
         // Call all registered trackers with this context
         // Note, this may use a partially constructed object
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/RouteService.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/RouteService.java
index 2b9fc9f..2bf2261 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/RouteService.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/RouteService.java
@@ -168,7 +168,7 @@ public class RouteService extends ChildServiceSupport {
                     list.add(service);
                 }
             }
-            initChildServices(route, list);
+            initChildServices(list);
         }
     }
 
@@ -249,8 +249,7 @@ public class RouteService extends ChildServiceSupport {
         }
 
         try (MDCHelper mdcHelper = new MDCHelper(route.getId())) {
-            // TODO: childrenService + some more
-            // gather list of services to stop as we need to start child services as well
+            // gather list of services to stop
             Set<Service> services = gatherChildServices();
 
             // stop services
@@ -285,8 +284,7 @@ public class RouteService extends ChildServiceSupport {
     @Override
     protected void doShutdown() {
         try (MDCHelper mdcHelper = new MDCHelper(route.getId())) {
-            // TODO: childrenService + some more
-            // gather list of services to stop as we need to start child services as well
+            // gather list of services to shutdown
             Set<Service> services = gatherChildServices();
 
             // shutdown services
@@ -351,9 +349,10 @@ public class RouteService extends ChildServiceSupport {
         }
     }
 
-    protected void initChildServices(Route route, List<Service> services) {
+    protected void initChildServices(List<Service> services) {
         for (Service service : services) {
             ServiceHelper.initService(service);
+            // add and remember as child service
             addChildService(service);
         }
     }
@@ -385,7 +384,6 @@ public class RouteService extends ChildServiceSupport {
      * Gather all child services
      */
     private Set<Service> gatherChildServices() {
-        // gather list of services to stop as we need to start child services as well
         List<Service> services = new ArrayList<>(route.getServices());
         // also get route scoped services
         doGetRouteServices(services);


[camel] 09/09: CAMEL-15844: Add javadoc about init/start logic not in constructors.

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit f29c681eb9dda7c6beaf934fd040c9381a36aeaf
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Sat Jan 23 11:56:26 2021 +0100

    CAMEL-15844: Add javadoc about init/start logic not in constructors.
---
 .../src/main/java/org/apache/camel/Consumer.java         |  6 +++++-
 .../src/main/java/org/apache/camel/Endpoint.java         | 16 +++++++++++++++-
 .../src/main/java/org/apache/camel/PollingConsumer.java  |  4 ++++
 .../src/main/java/org/apache/camel/Producer.java         |  6 +++++-
 4 files changed, 29 insertions(+), 3 deletions(-)

diff --git a/core/camel-api/src/main/java/org/apache/camel/Consumer.java b/core/camel-api/src/main/java/org/apache/camel/Consumer.java
index 3d08aca..bd368e3e 100644
--- a/core/camel-api/src/main/java/org/apache/camel/Consumer.java
+++ b/core/camel-api/src/main/java/org/apache/camel/Consumer.java
@@ -17,7 +17,11 @@
 package org.apache.camel;
 
 /**
- * A consumer of message exchanges from an {@link Endpoint}
+ * A consumer of message exchanges from an {@link Endpoint}.
+ * <p/>
+ * Important: Do not do any initialization in the constructor.
+ * Instead use {@link org.apache.camel.support.service.ServiceSupport#doInit()} or
+ * {@link org.apache.camel.support.service.ServiceSupport#doStart()}.
  */
 public interface Consumer extends Service, EndpointAware {
 
diff --git a/core/camel-api/src/main/java/org/apache/camel/Endpoint.java b/core/camel-api/src/main/java/org/apache/camel/Endpoint.java
index 1ffc371..39be552 100644
--- a/core/camel-api/src/main/java/org/apache/camel/Endpoint.java
+++ b/core/camel-api/src/main/java/org/apache/camel/Endpoint.java
@@ -18,6 +18,8 @@ package org.apache.camel;
 
 import java.util.Map;
 
+import org.apache.camel.support.service.ServiceSupport;
+
 /**
  * An <a href="http://camel.apache.org/endpoint.html">endpoint</a> implements the
  * <a href="http://camel.apache.org/message-endpoint.html">Message Endpoint</a> pattern and represents an endpoint that
@@ -84,7 +86,10 @@ public interface Endpoint extends IsSingleton, Service {
     CamelContext getCamelContext();
 
     /**
-     * Creates a new producer which is used send messages into the endpoint
+     * Creates a new producer which is used send messages into the endpoint.
+     *
+     * Important: Do not do any initialization in the constructor of the {@link Producer}.
+     * Instead use {@link ServiceSupport#doInit()} or {@link ServiceSupport#doStart()}.
      *
      * @return           a newly created producer
      * @throws Exception can be thrown
@@ -101,6 +106,9 @@ public interface Endpoint extends IsSingleton, Service {
     /**
      * Creates a new producer which is used send messages into the endpoint
      *
+     * Important: Do not do any initialization in the constructor of the {@link Producer}.
+     * Instead use {@link ServiceSupport#doInit()} or {@link ServiceSupport#doStart()}.
+     *
      * @return           a newly created producer
      * @throws Exception can be thrown
      */
@@ -110,6 +118,9 @@ public interface Endpoint extends IsSingleton, Service {
      * Creates a new <a href="http://camel.apache.org/event-driven-consumer.html">Event Driven Consumer</a> which
      * consumes messages from the endpoint using the given processor
      *
+     * Important: Do not do any initialization in the constructor of the {@link Consumer}.
+     * Instead use {@link ServiceSupport#doInit()} or {@link ServiceSupport#doStart()}.
+     *
      * @param  processor the given processor
      * @return           a newly created consumer
      * @throws Exception can be thrown
@@ -123,6 +134,9 @@ public interface Endpoint extends IsSingleton, Service {
      * rather than using the <a href="http://camel.apache.org/event-driven-consumer.html">Event Based Consumer</a>
      * returned by {@link #createConsumer(Processor)}
      *
+     * Important: Do not do any initialization in the constructor of the {@link PollingConsumer}.
+     * Instead use {@link ServiceSupport#doInit()} or {@link ServiceSupport#doStart()}.
+     *
      * @return           a newly created pull consumer
      * @throws Exception if the pull consumer could not be created
      */
diff --git a/core/camel-api/src/main/java/org/apache/camel/PollingConsumer.java b/core/camel-api/src/main/java/org/apache/camel/PollingConsumer.java
index d9d2dc1..cf68364 100644
--- a/core/camel-api/src/main/java/org/apache/camel/PollingConsumer.java
+++ b/core/camel-api/src/main/java/org/apache/camel/PollingConsumer.java
@@ -25,6 +25,10 @@ package org.apache.camel;
  * <p/>
  * This is needed to ensure any {@link org.apache.camel.spi.Synchronization} works is being executed. For example if you
  * consumed from a file endpoint, then the consumed file is only moved/delete when you done the {@link Exchange}.
+ * <p/>
+ * Important: Do not do any initialization in the constructor.
+ * Instead use {@link org.apache.camel.support.service.ServiceSupport#doInit()} or
+ * {@link org.apache.camel.support.service.ServiceSupport#doStart()}.
  */
 public interface PollingConsumer extends Consumer {
 
diff --git a/core/camel-api/src/main/java/org/apache/camel/Producer.java b/core/camel-api/src/main/java/org/apache/camel/Producer.java
index 1bfb4ae..e9c7605 100644
--- a/core/camel-api/src/main/java/org/apache/camel/Producer.java
+++ b/core/camel-api/src/main/java/org/apache/camel/Producer.java
@@ -17,7 +17,11 @@
 package org.apache.camel;
 
 /**
- * Provides a channel on which clients can create and invoke message exchanges on an {@link Endpoint}
+ * Provides a channel on which clients can create and invoke message exchanges on an {@link Endpoint}.
+ * <p/>
+ * Important: Do not do any initialization in the constructor.
+ * Instead use {@link org.apache.camel.support.service.ServiceSupport#doInit()} or
+ * {@link org.apache.camel.support.service.ServiceSupport#doStart()}.
  */
 public interface Producer extends Processor, Service, IsSingleton, EndpointAware {
 


[camel] 02/09: CAMEL-15844: camel-core - Optimize Route to move its setup (init) logic to the init phase of CamelContext, so they are initialized together.

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit bb9d1a6fb2b86670ecdae7c23ad2128113b71319
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Fri Jan 22 13:29:31 2021 +0100

    CAMEL-15844: camel-core - Optimize Route to move its setup (init) logic to the init phase of CamelContext, so they are initialized together.
---
 .../apache/camel/component/ref/RefEndpoint.java    |  19 ++--
 .../src/main/java/org/apache/camel/Route.java      |   5 +-
 .../camel/impl/engine/AbstractCamelContext.java    |   2 +
 .../org/apache/camel/impl/engine/DefaultRoute.java |  26 ++++-
 .../impl/engine/InternalRouteStartupManager.java   |  37 ++++++-
 .../org/apache/camel/impl/engine/RouteService.java | 113 +++++++++++++--------
 6 files changed, 140 insertions(+), 62 deletions(-)

diff --git a/components/camel-ref/src/main/java/org/apache/camel/component/ref/RefEndpoint.java b/components/camel-ref/src/main/java/org/apache/camel/component/ref/RefEndpoint.java
index 912d65d..4a8645d 100644
--- a/components/camel-ref/src/main/java/org/apache/camel/component/ref/RefEndpoint.java
+++ b/components/camel-ref/src/main/java/org/apache/camel/component/ref/RefEndpoint.java
@@ -69,22 +69,17 @@ public class RefEndpoint extends DefaultEndpoint implements DelegateEndpoint {
 
     @Override
     public Endpoint getEndpoint() {
-        if (endpoint == null) {
-            endpoint = CamelContextHelper.mandatoryLookup(getCamelContext(), name, Endpoint.class);
-        }
         return endpoint;
     }
 
     @Override
-    protected void doStart() throws Exception {
-        // add the endpoint to the endpoint registry
-        getCamelContext().addEndpoint(getEndpoint().getEndpointUri(), getEndpoint());
-        super.doStart();
+    protected void doInit() throws Exception {
+        if (endpoint == null) {
+            // endpoint is mandatory
+            endpoint = CamelContextHelper.mandatoryLookup(getCamelContext(), name, Endpoint.class);
+            getCamelContext().addEndpoint(getEndpoint().getEndpointUri(), endpoint);
+        }
+        super.doInit();
     }
 
-    @Override
-    protected void doStop() throws Exception {
-        super.doStop();
-        // noop
-    }
 }
diff --git a/core/camel-api/src/main/java/org/apache/camel/Route.java b/core/camel-api/src/main/java/org/apache/camel/Route.java
index 9f14a1c..808dc35 100644
--- a/core/camel-api/src/main/java/org/apache/camel/Route.java
+++ b/core/camel-api/src/main/java/org/apache/camel/Route.java
@@ -130,12 +130,11 @@ public interface Route extends RuntimeConfiguration {
     Endpoint getEndpoint();
 
     /**
-     * A strategy callback allowing special initialization when services are starting.
+     * A strategy callback allowing special initialization when services are initializing.
      *
-     * @param  services  the service
      * @throws Exception is thrown in case of error
      */
-    void onStartingServices(List<Service> services) throws Exception;
+    void initializeServices() throws Exception;
 
     /**
      * Returns the services for this particular route
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
index 3971cc5..036e98b 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
@@ -2712,6 +2712,8 @@ public abstract class AbstractCamelContext extends BaseService
         StartupStep subStep = startupStepRecorder.beginStep(CamelContext.class, getName(), "Initializing routes");
         // the method is called start but at this point it will only initialize (as context is starting up)
         startRouteDefinitions();
+        // this will init route definitions and populate as route services which we can then initialize now
+        internalRouteStartupManager.doInitRoutes(routeServices);
         startupStepRecorder.endStep(subStep);
 
         if (!lifecycleStrategies.isEmpty()) {
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultRoute.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultRoute.java
index 9d55f0f..fd877b0 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultRoute.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultRoute.java
@@ -48,6 +48,7 @@ import org.apache.camel.spi.RouteError;
 import org.apache.camel.spi.RouteIdAware;
 import org.apache.camel.spi.RoutePolicy;
 import org.apache.camel.support.PatternHelper;
+import org.apache.camel.support.service.ServiceHelper;
 import org.apache.camel.support.service.ServiceSupport;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.TimeUtils;
@@ -156,8 +157,9 @@ public class DefaultRoute extends ServiceSupport implements Route {
     }
 
     @Override
-    public void onStartingServices(List<Service> services) throws Exception {
-        addServices(services);
+    public void initializeServices() throws Exception {
+        // gather all the services for this route
+        gatherServices(services);
     }
 
     @Override
@@ -174,7 +176,7 @@ public class DefaultRoute extends ServiceSupport implements Route {
 
     @Override
     public void warmUp() {
-        getServices().clear();
+        // noop
     }
 
     /**
@@ -570,7 +572,23 @@ public class DefaultRoute extends ServiceSupport implements Route {
      * Factory method to lazily create the complete list of services required for this route such as adding the
      * processor or consumer
      */
-    protected void addServices(List<Service> services) throws Exception {
+    protected void gatherServices(List<Service> services) throws Exception {
+        // first gather the root services
+        gatherRootServices(services);
+        // and then all the child services
+        List<Service> children = new ArrayList<>();
+        for (Service service : services) {
+            Set<Service> extra = ServiceHelper.getChildServices(service);
+            children.addAll(extra);
+        }
+        for (Service extra : children) {
+            if (!services.contains(extra)) {
+                services.add(extra);
+            }
+        }
+    }
+
+    protected void gatherRootServices(List<Service> services) throws Exception {
         Endpoint endpoint = getEndpoint();
         consumer = endpoint.createConsumer(processor);
         if (consumer != null) {
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/InternalRouteStartupManager.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/InternalRouteStartupManager.java
index fecd08b1..f3dbf02 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/InternalRouteStartupManager.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/InternalRouteStartupManager.java
@@ -47,7 +47,7 @@ import org.slf4j.LoggerFactory;
 /**
  * Internal route startup manager used by {@link AbstractCamelContext} to safely start internal route services during
  * starting routes.
- *
+ * <p>
  * This code has been refactored out of {@link AbstractCamelContext} to its own class.
  */
 class InternalRouteStartupManager {
@@ -70,6 +70,37 @@ class InternalRouteStartupManager {
     }
 
     /**
+     * Initializes the routes
+     *
+     * @param  routeServices the routes to initialize
+     * @throws Exception     is thrown if error initializing routes
+     */
+    protected void doInitRoutes(Map<String, RouteService> routeServices)
+            throws Exception {
+
+        abstractCamelContext.setStartingRoutes(true);
+        try {
+            for (RouteService routeService : routeServices.values()) {
+                StartupStep step = abstractCamelContext.getStartupStepRecorder().beginStep(Route.class, routeService.getId(),
+                        "Initializing Route");
+                try {
+                    LOG.debug("Initializing route id: {}", routeService.getId());
+                    setupRoute.set(routeService.getRoute());
+                    // initializing route is called doSetup as we do not want to change the service state on the RouteService
+                    // so it can remain as stopped, when Camel is booting as this was the previous behavior - otherwise its state
+                    // would be initialized
+                    routeService.setUp();
+                } finally {
+                    setupRoute.remove();
+                    abstractCamelContext.getStartupStepRecorder().endStep(step);
+                }
+            }
+        } finally {
+            abstractCamelContext.setStartingRoutes(false);
+        }
+    }
+
+    /**
      * Starts or resumes the routes
      *
      * @param  routeServices  the routes to start (will only start a route if its not already started)
@@ -200,7 +231,7 @@ class InternalRouteStartupManager {
     }
 
     /**
-     * @see #safelyStartRouteServices(boolean,boolean,boolean,boolean,Collection)
+     * @see #safelyStartRouteServices(boolean, boolean, boolean, boolean, Collection)
      */
     protected synchronized void safelyStartRouteServices(
             boolean forceAutoStart, boolean checkClash, boolean startConsumer, boolean resumeConsumer, boolean addingRoutes,
@@ -270,6 +301,8 @@ class InternalRouteStartupManager {
             try {
                 LOG.debug("Warming up route id: {} having autoStartup={}", routeService.getId(), autoStartup);
                 setupRoute.set(routeService.getRoute());
+                // ensure we setup before warmup
+                routeService.setUp();
                 routeService.warmUp();
             } finally {
                 setupRoute.remove();
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/RouteService.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/RouteService.java
index c55c272..2b9fc9f 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/RouteService.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/RouteService.java
@@ -57,6 +57,7 @@ public class RouteService extends ChildServiceSupport {
     private final Route route;
     private boolean removingRoutes;
     private Consumer input;
+    private final AtomicBoolean setUpDone = new AtomicBoolean();
     private final AtomicBoolean warmUpDone = new AtomicBoolean();
     private final AtomicBoolean endpointDone = new AtomicBoolean();
 
@@ -117,6 +118,16 @@ public class RouteService extends ChildServiceSupport {
         }
     }
 
+    public void setUp() throws FailedToStartRouteException {
+        if (setUpDone.compareAndSet(false, true)) {
+            try {
+                doSetup();
+            } catch (Exception e) {
+                throw new FailedToStartRouteException(getId(), route.getDescription(), e);
+            }
+        }
+    }
+
     public boolean isAutoStartup() {
         if (!getCamelContext().isAutoStartup()) {
             return false;
@@ -124,6 +135,43 @@ public class RouteService extends ChildServiceSupport {
         return getRoute().isAutoStartup();
     }
 
+    protected synchronized void doSetup() throws Exception {
+        // to setup we initialize the services
+        ServiceHelper.initService(route.getEndpoint());
+
+        try (MDCHelper mdcHelper = new MDCHelper(route.getId())) {
+
+            // ensure services are initialized first
+            route.initializeServices();
+            List<Service> services = route.getServices();
+
+            // split into consumers and child services as we need to start the consumers
+            // afterwards to avoid them being active while the others start
+            List<Service> list = new ArrayList<>();
+            for (Service service : services) {
+
+                // inject the route
+                if (service instanceof RouteAware) {
+                    ((RouteAware) service).setRoute(route);
+                }
+                if (service instanceof RouteIdAware) {
+                    ((RouteIdAware) service).setRouteId(route.getId());
+                }
+                // inject camel context
+                if (service instanceof CamelContextAware) {
+                    ((CamelContextAware) service).setCamelContext(camelContext);
+                }
+
+                if (service instanceof Consumer) {
+                    this.input = (Consumer) service;
+                } else {
+                    list.add(service);
+                }
+            }
+            initChildServices(route, list);
+        }
+    }
+
     protected synchronized void doWarmUp() throws Exception {
         if (endpointDone.compareAndSet(false, true)) {
             // endpoints should only be started once as they can be reused on other routes
@@ -138,41 +186,7 @@ public class RouteService extends ChildServiceSupport {
                 // warm up the route first
                 route.warmUp();
 
-                List<Service> services = route.getServices();
-
-                // callback that we are staring these services
-                route.onStartingServices(services);
-
-                // gather list of services to start as we need to start child services as well
-                Set<Service> list = new LinkedHashSet<>();
-                for (Service service : services) {
-                    list.addAll(ServiceHelper.getChildServices(service));
-                }
-
-                // split into consumers and child services as we need to start the consumers
-                // afterwards to avoid them being active while the others start
-                List<Service> childServices = new ArrayList<>();
-                for (Service service : list) {
-
-                    // inject the route
-                    if (service instanceof RouteAware) {
-                        ((RouteAware) service).setRoute(route);
-                    }
-                    if (service instanceof RouteIdAware) {
-                        ((RouteIdAware) service).setRouteId(route.getId());
-                    }
-                    // inject camel context
-                    if (service instanceof CamelContextAware) {
-                        ((CamelContextAware) service).setCamelContext(camelContext);
-                    }
-
-                    if (service instanceof Consumer) {
-                        this.input = (Consumer) service;
-                    } else {
-                        childServices.add(service);
-                    }
-                }
-                startChildService(route, childServices);
+                startChildServices(route, childServices);
 
                 // fire event
                 EventHelper.notifyRouteAdded(camelContext, route);
@@ -199,6 +213,7 @@ public class RouteService extends ChildServiceSupport {
         }
 
         try {
+            // ensure we are warmed up
             warmUp();
         } catch (FailedToStartRouteException e) {
             throw RuntimeCamelException.wrapRuntimeException(e);
@@ -234,11 +249,12 @@ public class RouteService extends ChildServiceSupport {
         }
 
         try (MDCHelper mdcHelper = new MDCHelper(route.getId())) {
+            // TODO: childrenService + some more
             // gather list of services to stop as we need to start child services as well
             Set<Service> services = gatherChildServices();
 
             // stop services
-            stopChildService(route, services, isShutdownCamelContext);
+            stopChildServices(route, services, isShutdownCamelContext);
 
             // stop the route itself
             if (isShutdownCamelContext) {
@@ -256,18 +272,25 @@ public class RouteService extends ChildServiceSupport {
         if (isRemovingRoutes()) {
             camelContext.adapt(ExtendedCamelContext.class).removeRoute(route);
         }
-        // need to warm up again
+        // need to redo if we start again after being stopped
+        input = null;
+        childServices = null;
+        warmUpDone.set(false);
+        setUpDone.set(false);
+        endpointDone.set(false);
+        setUpDone.set(false);
         warmUpDone.set(false);
     }
 
     @Override
     protected void doShutdown() {
         try (MDCHelper mdcHelper = new MDCHelper(route.getId())) {
+            // TODO: childrenService + some more
             // gather list of services to stop as we need to start child services as well
             Set<Service> services = gatherChildServices();
 
             // shutdown services
-            stopChildService(route, services, true);
+            stopChildServices(route, services, true);
 
             // shutdown the route itself
             ServiceHelper.stopAndShutdownServices(route);
@@ -296,7 +319,9 @@ public class RouteService extends ChildServiceSupport {
 
         // clear inputs on shutdown
         input = null;
+        childServices = null;
         warmUpDone.set(false);
+        setUpDone.set(false);
         endpointDone.set(false);
     }
 
@@ -326,17 +351,23 @@ public class RouteService extends ChildServiceSupport {
         }
     }
 
-    protected void startChildService(Route route, List<Service> services) {
+    protected void initChildServices(Route route, List<Service> services) {
+        for (Service service : services) {
+            ServiceHelper.initService(service);
+            addChildService(service);
+        }
+    }
+
+    protected void startChildServices(Route route, List<Service> services) {
         for (Service service : services) {
             for (LifecycleStrategy strategy : camelContext.getLifecycleStrategies()) {
                 strategy.onServiceAdd(camelContext, service, route);
             }
             ServiceHelper.startService(service);
-            addChildService(service);
         }
     }
 
-    protected void stopChildService(Route route, Set<Service> services, boolean shutdown) {
+    protected void stopChildServices(Route route, Set<Service> services, boolean shutdown) {
         for (Service service : services) {
             for (LifecycleStrategy strategy : camelContext.getLifecycleStrategies()) {
                 strategy.onServiceRemove(camelContext, service, route);


[camel] 05/09: CAMEL-15844: Camel components creating consumer should not do init/start logic in their constructors.

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit cad5236888f7fd9b8f5fc81022d2acc436bab10d
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Sat Jan 23 10:36:33 2021 +0100

    CAMEL-15844: Camel components creating consumer should not do init/start logic in their constructors.
---
 .../component/ignite/AbstractIgniteEndpoint.java   |  7 +--
 .../cache/IgniteCacheContinuousQueryConsumer.java  |  5 +-
 .../ignite/cache/IgniteCacheEndpoint.java          |  4 +-
 .../ignite/IgniteCacheContinuousQueryTest.java     | 11 +++--
 .../camel/component/ironmq/IronMQConsumer.java     | 11 +++--
 .../camel/component/ironmq/IronMQEndpoint.java     |  2 +-
 .../jgroups/raft/JGroupsRaftConsumer.java          |  9 ++--
 .../jgroups/raft/JGroupsRaftEndpoint.java          |  6 ++-
 .../jgroups/raft/JGroupsRaftProducer.java          | 11 ++---
 .../camel/component/jgroups/JGroupsConsumer.java   |  9 ++--
 .../camel/component/jgroups/JGroupsEndpoint.java   |  4 +-
 .../camel/component/jgroups/JGroupsProducer.java   |  8 +---
 .../jira/consumer/WatchUpdatesConsumer.java        |  5 ++
 .../org/apache/camel/component/milo/Messages.java  |  5 +-
 .../component/milo/client/MiloClientComponent.java | 56 +---------------------
 .../component/milo/client/MiloClientConsumer.java  | 49 +++++++++----------
 .../component/milo/client/MiloClientEndpoint.java  | 28 ++++-------
 .../component/milo/client/MiloClientProducer.java  | 29 +++++++++--
 .../component/milo/server/MiloServerConsumer.java  | 39 +++++++--------
 .../component/milo/server/MiloServerEndpoint.java  | 12 +++--
 .../component/milo/server/MiloServerProducer.java  | 17 +++++--
 .../mllp/MllpTcpServerConsumerTransactionTest.java |  2 +
 22 files changed, 145 insertions(+), 184 deletions(-)

diff --git a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/AbstractIgniteEndpoint.java b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/AbstractIgniteEndpoint.java
index 9976bf2..166508a 100644
--- a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/AbstractIgniteEndpoint.java
+++ b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/AbstractIgniteEndpoint.java
@@ -26,8 +26,6 @@ import org.apache.ignite.Ignite;
  */
 public abstract class AbstractIgniteEndpoint extends DefaultEndpoint {
 
-    protected AbstractIgniteComponent component;
-
     @UriParam(defaultValue = "true")
     private boolean propagateIncomingBodyIfNoReturnValue = true;
 
@@ -39,10 +37,7 @@ public abstract class AbstractIgniteEndpoint extends DefaultEndpoint {
     }
 
     protected AbstractIgniteComponent igniteComponent() {
-        if (component == null) {
-            component = (AbstractIgniteComponent) getComponent();
-        }
-        return component;
+        return (AbstractIgniteComponent) getComponent();
     }
 
     protected Ignite ignite() {
diff --git a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheContinuousQueryConsumer.java b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheContinuousQueryConsumer.java
index 91b4467..8f7e81f 100644
--- a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheContinuousQueryConsumer.java
+++ b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheContinuousQueryConsumer.java
@@ -47,16 +47,15 @@ public class IgniteCacheContinuousQueryConsumer extends DefaultConsumer {
 
     private QueryCursor<Entry<Object, Object>> cursor;
 
-    public IgniteCacheContinuousQueryConsumer(IgniteCacheEndpoint endpoint, Processor processor,
-                                              IgniteCache<Object, Object> cache) {
+    public IgniteCacheContinuousQueryConsumer(IgniteCacheEndpoint endpoint, Processor processor) {
         super(endpoint, processor);
         this.endpoint = endpoint;
-        this.cache = cache;
     }
 
     @Override
     protected void doStart() throws Exception {
         super.doStart();
+        cache = endpoint.obtainCache();
 
         launchContinuousQuery();
 
diff --git a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheEndpoint.java b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheEndpoint.java
index e6b0028..b3e7dc8 100644
--- a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheEndpoint.java
+++ b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheEndpoint.java
@@ -90,12 +90,12 @@ public class IgniteCacheEndpoint extends AbstractIgniteEndpoint {
 
     @Override
     public Consumer createConsumer(Processor processor) throws Exception {
-        Consumer consumer = new IgniteCacheContinuousQueryConsumer(this, processor, obtainCache());
+        Consumer consumer = new IgniteCacheContinuousQueryConsumer(this, processor);
         configureConsumer(consumer);
         return consumer;
     }
 
-    private IgniteCache<Object, Object> obtainCache() throws CamelException {
+    protected IgniteCache<Object, Object> obtainCache() throws CamelException {
         IgniteCache<Object, Object> cache = ignite().cache(cacheName);
         if (cache == null) {
             if (failIfInexistentCache) {
diff --git a/components/camel-ignite/src/test/java/org/apache/camel/component/ignite/IgniteCacheContinuousQueryTest.java b/components/camel-ignite/src/test/java/org/apache/camel/component/ignite/IgniteCacheContinuousQueryTest.java
index cbb6c25..d5bdc31 100644
--- a/components/camel-ignite/src/test/java/org/apache/camel/component/ignite/IgniteCacheContinuousQueryTest.java
+++ b/components/camel-ignite/src/test/java/org/apache/camel/component/ignite/IgniteCacheContinuousQueryTest.java
@@ -33,6 +33,7 @@ import org.apache.camel.Route;
 import org.apache.camel.ServiceStatus;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.ignite.cache.IgniteCacheComponent;
+import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
 import org.apache.ignite.cache.query.ScanQuery;
@@ -198,11 +199,13 @@ public class IgniteCacheContinuousQueryTest extends AbstractIgniteTest implement
     @AfterEach
     public void deleteCaches() {
         for (String cacheName : ImmutableSet.<String> of("testcontinuous1", "testcontinuous2", "testcontinuous3")) {
-            IgniteCache<?, ?> cache = ignite().cache(cacheName);
-            if (cache == null) {
-                continue;
+            Ignite ignite = ignite();
+            if (ignite != null) {
+                IgniteCache<?, ?> cache = ignite.cache(cacheName);
+                if (cache != null) {
+                    cache.clear();
+                }
             }
-            cache.clear();
         }
     }
 
diff --git a/components/camel-ironmq/src/main/java/org/apache/camel/component/ironmq/IronMQConsumer.java b/components/camel-ironmq/src/main/java/org/apache/camel/component/ironmq/IronMQConsumer.java
index 57e5e37..8993614 100644
--- a/components/camel-ironmq/src/main/java/org/apache/camel/component/ironmq/IronMQConsumer.java
+++ b/components/camel-ironmq/src/main/java/org/apache/camel/component/ironmq/IronMQConsumer.java
@@ -41,11 +41,16 @@ public class IronMQConsumer extends ScheduledBatchPollingConsumer {
 
     private static final Logger LOG = LoggerFactory.getLogger(IronMQConsumer.class);
 
-    private final io.iron.ironmq.Queue ironQueue;
+    private io.iron.ironmq.Queue ironQueue;
 
-    public IronMQConsumer(Endpoint endpoint, Processor processor, io.iron.ironmq.Queue ironQueue) {
+    public IronMQConsumer(Endpoint endpoint, Processor processor) {
         super(endpoint, processor);
-        this.ironQueue = ironQueue;
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+        ironQueue = getEndpoint().getClient().queue(getEndpoint().getConfiguration().getQueueName());
     }
 
     @Override
diff --git a/components/camel-ironmq/src/main/java/org/apache/camel/component/ironmq/IronMQEndpoint.java b/components/camel-ironmq/src/main/java/org/apache/camel/component/ironmq/IronMQEndpoint.java
index b957def..db0b244 100644
--- a/components/camel-ironmq/src/main/java/org/apache/camel/component/ironmq/IronMQEndpoint.java
+++ b/components/camel-ironmq/src/main/java/org/apache/camel/component/ironmq/IronMQEndpoint.java
@@ -61,7 +61,7 @@ public class IronMQEndpoint extends ScheduledPollEndpoint {
 
     @Override
     public Consumer createConsumer(Processor processor) throws Exception {
-        IronMQConsumer ironMQConsumer = new IronMQConsumer(this, processor, getClient().queue(configuration.getQueueName()));
+        IronMQConsumer ironMQConsumer = new IronMQConsumer(this, processor);
         configureConsumer(ironMQConsumer);
         ironMQConsumer.setMaxMessagesPerPoll(configuration.getMaxMessagesPerPoll());
         DefaultScheduledPollConsumerScheduler scheduler = new DefaultScheduledPollConsumerScheduler();
diff --git a/components/camel-jgroups-raft/src/main/java/org/apache/camel/component/jgroups/raft/JGroupsRaftConsumer.java b/components/camel-jgroups-raft/src/main/java/org/apache/camel/component/jgroups/raft/JGroupsRaftConsumer.java
index a6f9fa5..cf275c7 100644
--- a/components/camel-jgroups-raft/src/main/java/org/apache/camel/component/jgroups/raft/JGroupsRaftConsumer.java
+++ b/components/camel-jgroups-raft/src/main/java/org/apache/camel/component/jgroups/raft/JGroupsRaftConsumer.java
@@ -18,7 +18,6 @@ package org.apache.camel.component.jgroups.raft;
 
 import org.apache.camel.Processor;
 import org.apache.camel.support.DefaultConsumer;
-import org.jgroups.raft.RaftHandle;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -29,19 +28,17 @@ import org.slf4j.LoggerFactory;
 public class JGroupsRaftConsumer extends DefaultConsumer {
     private static final transient Logger LOG = LoggerFactory.getLogger(JGroupsRaftConsumer.class);
 
-    private final RaftHandle raftHandle;
     private final String clusterName;
     private boolean enableRoleChangeEvents;
 
     private final CamelRoleChangeListener roleListener;
     private final JGroupsRaftEndpoint endpoint;
 
-    public JGroupsRaftConsumer(JGroupsRaftEndpoint endpoint, Processor processor, RaftHandle raftHandle, String clusterName,
+    public JGroupsRaftConsumer(JGroupsRaftEndpoint endpoint, Processor processor, String clusterName,
                                boolean enableRoleChangeEvents) {
         super(endpoint, processor);
 
         this.endpoint = endpoint;
-        this.raftHandle = raftHandle;
         this.clusterName = clusterName;
         this.enableRoleChangeEvents = enableRoleChangeEvents;
 
@@ -53,7 +50,7 @@ public class JGroupsRaftConsumer extends DefaultConsumer {
         super.doStart();
         if (enableRoleChangeEvents) {
             LOG.debug("Connecting roleListener : {} to the cluster: {}.", roleListener, clusterName);
-            raftHandle.addRoleListener(roleListener);
+            endpoint.getResolvedRaftHandle().addRoleListener(roleListener);
         }
         endpoint.connect();
     }
@@ -62,7 +59,7 @@ public class JGroupsRaftConsumer extends DefaultConsumer {
     protected void doStop() throws Exception {
         if (enableRoleChangeEvents) {
             LOG.debug("Closing connection to cluster: {} from roleListener: {}.", clusterName, roleListener);
-            raftHandle.removeRoleListener(roleListener);
+            endpoint.getResolvedRaftHandle().removeRoleListener(roleListener);
         }
         endpoint.disconnect();
         super.doStop();
diff --git a/components/camel-jgroups-raft/src/main/java/org/apache/camel/component/jgroups/raft/JGroupsRaftEndpoint.java b/components/camel-jgroups-raft/src/main/java/org/apache/camel/component/jgroups/raft/JGroupsRaftEndpoint.java
index 71d86be..3e0dfac 100644
--- a/components/camel-jgroups-raft/src/main/java/org/apache/camel/component/jgroups/raft/JGroupsRaftEndpoint.java
+++ b/components/camel-jgroups-raft/src/main/java/org/apache/camel/component/jgroups/raft/JGroupsRaftEndpoint.java
@@ -72,12 +72,14 @@ public class JGroupsRaftEndpoint extends DefaultEndpoint {
 
     @Override
     public Producer createProducer() throws Exception {
-        return new JGroupsRaftProducer(this, resolvedRaftHandle, clusterName);
+        return new JGroupsRaftProducer(this, clusterName);
     }
 
     @Override
     public Consumer createConsumer(Processor processor) throws Exception {
-        return new JGroupsRaftConsumer(this, processor, resolvedRaftHandle, clusterName, enableRoleChangeEvents);
+        JGroupsRaftConsumer consumer = new JGroupsRaftConsumer(this, processor, clusterName, enableRoleChangeEvents);
+        configureConsumer(consumer);
+        return consumer;
     }
 
     @Override
diff --git a/components/camel-jgroups-raft/src/main/java/org/apache/camel/component/jgroups/raft/JGroupsRaftProducer.java b/components/camel-jgroups-raft/src/main/java/org/apache/camel/component/jgroups/raft/JGroupsRaftProducer.java
index a1ed146..9b40bb5 100644
--- a/components/camel-jgroups-raft/src/main/java/org/apache/camel/component/jgroups/raft/JGroupsRaftProducer.java
+++ b/components/camel-jgroups-raft/src/main/java/org/apache/camel/component/jgroups/raft/JGroupsRaftProducer.java
@@ -20,7 +20,6 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.support.DefaultProducer;
-import org.jgroups.raft.RaftHandle;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,15 +32,13 @@ public class JGroupsRaftProducer extends DefaultProducer {
 
     // Producer settings
     private final JGroupsRaftEndpoint endpoint;
-    private final RaftHandle raftHandle;
     private final String clusterName;
 
     // Constructor
-    public JGroupsRaftProducer(JGroupsRaftEndpoint endpoint, RaftHandle raftHandle, String clusterName) {
+    public JGroupsRaftProducer(JGroupsRaftEndpoint endpoint, String clusterName) {
         super(endpoint);
 
         this.endpoint = endpoint;
-        this.raftHandle = raftHandle;
         this.clusterName = clusterName;
     }
 
@@ -74,14 +71,14 @@ public class JGroupsRaftProducer extends DefaultProducer {
             if (setOffset != null && setLength != null && setTimeout != null && setTimeUnit != null) {
                 LOG.debug("Calling set(byte[] {}, int {}, int {}, long {}, TimeUnit {}) method on raftHandle.", body, setOffset,
                         setLength, setTimeout, setTimeUnit);
-                result = raftHandle.set(body, setOffset, setLength, setTimeout, setTimeUnit);
+                result = endpoint.getResolvedRaftHandle().set(body, setOffset, setLength, setTimeout, setTimeUnit);
             } else if (setOffset != null && setLength != null) {
                 LOG.debug("Calling set(byte[] {}, int {}, int {}) method on raftHandle.", body, setOffset, setLength);
-                result = raftHandle.set(body, setOffset, setLength);
+                result = endpoint.getResolvedRaftHandle().set(body, setOffset, setLength);
             } else {
                 LOG.debug("Calling set(byte[] {}, int {}, int {} (i.e. body.length)) method on raftHandle.", body, 0,
                         body.length);
-                result = raftHandle.set(body, 0, body.length);
+                result = endpoint.getResolvedRaftHandle().set(body, 0, body.length);
             }
             endpoint.populateJGroupsRaftHeaders(exchange);
             exchange.getIn().setBody(result);
diff --git a/components/camel-jgroups/src/main/java/org/apache/camel/component/jgroups/JGroupsConsumer.java b/components/camel-jgroups/src/main/java/org/apache/camel/component/jgroups/JGroupsConsumer.java
index 51ce437..7cb4e90 100644
--- a/components/camel-jgroups/src/main/java/org/apache/camel/component/jgroups/JGroupsConsumer.java
+++ b/components/camel-jgroups/src/main/java/org/apache/camel/component/jgroups/JGroupsConsumer.java
@@ -18,7 +18,6 @@ package org.apache.camel.component.jgroups;
 
 import org.apache.camel.Processor;
 import org.apache.camel.support.DefaultConsumer;
-import org.jgroups.JChannel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -30,17 +29,15 @@ public class JGroupsConsumer extends DefaultConsumer {
 
     private static final Logger LOG = LoggerFactory.getLogger(JGroupsConsumer.class);
 
-    private final JChannel channel;
     private final String clusterName;
 
     private final CamelJGroupsReceiver receiver;
     private final JGroupsEndpoint endpoint;
 
-    public JGroupsConsumer(JGroupsEndpoint endpoint, Processor processor, JChannel channel, String clusterName) {
+    public JGroupsConsumer(JGroupsEndpoint endpoint, Processor processor, String clusterName) {
         super(endpoint, processor);
 
         this.endpoint = endpoint;
-        this.channel = channel;
         this.clusterName = clusterName;
 
         this.receiver = new CamelJGroupsReceiver(endpoint, processor);
@@ -50,14 +47,14 @@ public class JGroupsConsumer extends DefaultConsumer {
     protected void doStart() throws Exception {
         super.doStart();
         LOG.debug("Connecting receiver: {} to the cluster: {}.", receiver, clusterName);
-        channel.setReceiver(receiver);
+        endpoint.getResolvedChannel().setReceiver(receiver);
         endpoint.connect();
     }
 
     @Override
     protected void doStop() throws Exception {
         LOG.debug("Closing connection to cluster: {} from receiver: {}.", clusterName, receiver);
-        channel.setReceiver(null);
+        endpoint.getResolvedChannel().setReceiver(null);
         endpoint.disconnect();
         super.doStop();
     }
diff --git a/components/camel-jgroups/src/main/java/org/apache/camel/component/jgroups/JGroupsEndpoint.java b/components/camel-jgroups/src/main/java/org/apache/camel/component/jgroups/JGroupsEndpoint.java
index c6ba406..6d6ab0a 100644
--- a/components/camel-jgroups/src/main/java/org/apache/camel/component/jgroups/JGroupsEndpoint.java
+++ b/components/camel-jgroups/src/main/java/org/apache/camel/component/jgroups/JGroupsEndpoint.java
@@ -72,12 +72,12 @@ public class JGroupsEndpoint extends DefaultEndpoint {
 
     @Override
     public Producer createProducer() throws Exception {
-        return new JGroupsProducer(this, resolvedChannel, clusterName);
+        return new JGroupsProducer(this, clusterName);
     }
 
     @Override
     public Consumer createConsumer(Processor processor) throws Exception {
-        JGroupsConsumer consumer = new JGroupsConsumer(this, processor, resolvedChannel, clusterName);
+        JGroupsConsumer consumer = new JGroupsConsumer(this, processor, clusterName);
         configureConsumer(consumer);
         return consumer;
     }
diff --git a/components/camel-jgroups/src/main/java/org/apache/camel/component/jgroups/JGroupsProducer.java b/components/camel-jgroups/src/main/java/org/apache/camel/component/jgroups/JGroupsProducer.java
index a305f54..7fe3736 100644
--- a/components/camel-jgroups/src/main/java/org/apache/camel/component/jgroups/JGroupsProducer.java
+++ b/components/camel-jgroups/src/main/java/org/apache/camel/component/jgroups/JGroupsProducer.java
@@ -19,7 +19,6 @@ package org.apache.camel.component.jgroups;
 import org.apache.camel.Exchange;
 import org.apache.camel.support.DefaultProducer;
 import org.jgroups.Address;
-import org.jgroups.JChannel;
 import org.jgroups.Message;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,17 +34,14 @@ public class JGroupsProducer extends DefaultProducer {
 
     private final JGroupsEndpoint endpoint;
 
-    private final JChannel channel;
-
     private final String clusterName;
 
     // Constructor
 
-    public JGroupsProducer(JGroupsEndpoint endpoint, JChannel channel, String clusterName) {
+    public JGroupsProducer(JGroupsEndpoint endpoint, String clusterName) {
         super(endpoint);
 
         this.endpoint = endpoint;
-        this.channel = channel;
         this.clusterName = clusterName;
     }
 
@@ -81,7 +77,7 @@ public class JGroupsProducer extends DefaultProducer {
             }
             Message message = new Message(destinationAddress, body);
             message.setSrc(sourceAddress);
-            channel.send(message);
+            endpoint.getResolvedChannel().send(message);
         } else {
             LOG.debug("Body is null, cannot post to channel.");
         }
diff --git a/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/WatchUpdatesConsumer.java b/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/WatchUpdatesConsumer.java
index 359b9e2..a97d5fb 100644
--- a/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/WatchUpdatesConsumer.java
+++ b/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/WatchUpdatesConsumer.java
@@ -44,6 +44,11 @@ public class WatchUpdatesConsumer extends AbstractJiraConsumer {
         super(endpoint, processor);
         this.watchedFieldsList = new ArrayList<>();
         this.watchedFieldsList = Arrays.asList(endpoint.getWatchedFields().split(","));
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
         initIssues();
     }
 
diff --git a/components/camel-milo/src/main/java/org/apache/camel/component/milo/Messages.java b/components/camel-milo/src/main/java/org/apache/camel/component/milo/Messages.java
index c4de621..a871204 100644
--- a/components/camel-milo/src/main/java/org/apache/camel/component/milo/Messages.java
+++ b/components/camel-milo/src/main/java/org/apache/camel/component/milo/Messages.java
@@ -16,10 +16,11 @@
  */
 package org.apache.camel.component.milo;
 
-import org.apache.camel.support.DefaultMessage;
+import org.apache.camel.Message;
 import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
 
 public final class Messages {
+
     private Messages() {
     }
 
@@ -29,7 +30,7 @@ public final class Messages {
      * @param value  the value to feed from
      * @param result the result to feed to
      */
-    public static void fillFromDataValue(final DataValue value, final DefaultMessage result) {
+    public static void fillFromDataValue(final DataValue value, final Message result) {
         result.setBody(value);
     }
 }
diff --git a/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientComponent.java b/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientComponent.java
index 45fcea4..cd5ffd7 100644
--- a/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientComponent.java
+++ b/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientComponent.java
@@ -16,57 +16,30 @@
  */
 package org.apache.camel.component.milo.client;
 
-import java.util.HashMap;
 import java.util.Map;
 
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
 import org.apache.camel.Endpoint;
 import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.annotations.Component;
 import org.apache.camel.support.DefaultComponent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 @Component("milo-client")
 public class MiloClientComponent extends DefaultComponent {
 
-    private static final Logger LOG = LoggerFactory.getLogger(MiloClientComponent.class);
-
-    private final Map<String, MiloClientConnection> cache = new HashMap<>();
-    private final Multimap<String, MiloClientEndpoint> connectionMap = HashMultimap.create();
-
     @Metadata
     private MiloClientConfiguration configuration = new MiloClientConfiguration();
 
     @Override
     protected Endpoint createEndpoint(final String uri, final String remaining, final Map<String, Object> parameters)
             throws Exception {
+
         final MiloClientConfiguration configuration = new MiloClientConfiguration(this.configuration);
         configuration.setEndpointUri(remaining);
 
-        Endpoint endpoint = doCreateEndpoint(uri, configuration, parameters);
-        return endpoint;
-    }
-
-    private synchronized MiloClientEndpoint doCreateEndpoint(
-            final String uri, final MiloClientConfiguration configuration, final Map<String, Object> parameters)
-            throws Exception {
         final MiloClientEndpoint endpoint = new MiloClientEndpoint(uri, this, configuration.getEndpointUri());
         endpoint.setConfiguration(configuration);
         setProperties(endpoint, parameters);
 
-        final String cacheId = configuration.toCacheId();
-        MiloClientConnection connection = this.cache.get(cacheId);
-        if (connection == null) {
-            LOG.debug("Cache miss - creating new connection instance: {}", cacheId);
-            connection = new MiloClientConnection(configuration, endpoint.getMonitorFilterConfiguration());
-            this.cache.put(cacheId, connection);
-        }
-
-        // register connection with endpoint
-        this.connectionMap.put(cacheId, endpoint);
-        endpoint.setConnection(connection);
         return endpoint;
     }
 
@@ -109,31 +82,4 @@ public class MiloClientComponent extends DefaultComponent {
         this.configuration.setRequestTimeout(reconnectTimeout);
     }
 
-    public synchronized void disposed(final MiloClientEndpoint endpoint) {
-
-        final MiloClientConnection connection = endpoint.getConnection();
-
-        // unregister usage of connection
-
-        this.connectionMap.remove(connection.getConnectionId(), endpoint);
-
-        // test if this was the last endpoint using this connection
-
-        if (!this.connectionMap.containsKey(connection.getConnectionId())) {
-
-            // this was the last endpoint using the connection ...
-
-            // ... remove from the cache
-
-            this.cache.remove(connection.getConnectionId());
-
-            // ... and close
-
-            try {
-                connection.close();
-            } catch (final Exception e) {
-                LOG.warn("Failed to close connection", e);
-            }
-        }
-    }
 }
diff --git a/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientConsumer.java b/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientConsumer.java
index 6ad6915..523c0c7 100644
--- a/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientConsumer.java
+++ b/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientConsumer.java
@@ -22,41 +22,35 @@ import org.apache.camel.Processor;
 import org.apache.camel.component.milo.Messages;
 import org.apache.camel.component.milo.client.MiloClientConnection.MonitorHandle;
 import org.apache.camel.support.DefaultConsumer;
-import org.apache.camel.support.DefaultMessage;
 import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
 import org.eclipse.milo.opcua.stack.core.types.builtin.ExpandedNodeId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static java.util.Objects.requireNonNull;
-
 public class MiloClientConsumer extends DefaultConsumer {
 
     private static final Logger LOG = LoggerFactory.getLogger(MiloClientConsumer.class);
 
-    private final MiloClientConnection connection;
-
+    private MiloClientConnection connection;
     private MonitorHandle handle;
-
     private ExpandedNodeId node;
-
     private Double samplingInterval;
 
-    public MiloClientConsumer(final MiloClientEndpoint endpoint, final Processor processor,
-                              final MiloClientConnection connection) {
+    public MiloClientConsumer(final MiloClientEndpoint endpoint, final Processor processor) {
         super(endpoint, processor);
-
-        requireNonNull(connection);
-
-        this.connection = connection;
         this.node = endpoint.getNodeId();
         this.samplingInterval = endpoint.getSamplingInterval();
     }
 
     @Override
+    public MiloClientEndpoint getEndpoint() {
+        return (MiloClientEndpoint) super.getEndpoint();
+    }
+
+    @Override
     protected void doStart() throws Exception {
         super.doStart();
-
+        this.connection = getEndpoint().createConnection();
         this.handle = this.connection.monitorValue(this.node, this.samplingInterval, this::handleValueUpdate);
     }
 
@@ -66,7 +60,13 @@ public class MiloClientConsumer extends DefaultConsumer {
             this.handle.unregister();
             this.handle = null;
         }
-
+        if (this.connection != null) {
+            try {
+                this.connection.close();
+            } catch (Exception e) {
+                // ignore
+            }
+        }
         super.doStop();
     }
 
@@ -74,24 +74,19 @@ public class MiloClientConsumer extends DefaultConsumer {
         LOG.debug("Handle item update - {} = {}", node, value);
 
         final Exchange exchange = getEndpoint().createExchange();
-        exchange.setIn(mapMessage(value));
+        mapToMessage(value, exchange.getMessage());
+
         try {
-            getAsyncProcessor().process(exchange);
+            getProcessor().process(exchange);
         } catch (final Exception e) {
-            LOG.debug("Failed to process message", e);
+            getExceptionHandler().handleException("Error processing exchange", e);
         }
     }
 
-    private Message mapMessage(final DataValue value) {
-        if (value == null) {
-            return null;
+    private void mapToMessage(final DataValue value, final Message message) {
+        if (value != null) {
+            Messages.fillFromDataValue(value, message);
         }
-
-        final DefaultMessage result = new DefaultMessage(getEndpoint().getCamelContext());
-
-        Messages.fillFromDataValue(value, result);
-
-        return result;
     }
 
 }
diff --git a/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientEndpoint.java b/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientEndpoint.java
index 7841f71..f689e62 100644
--- a/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientEndpoint.java
+++ b/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientEndpoint.java
@@ -80,7 +80,6 @@ public class MiloClientEndpoint extends DefaultEndpoint {
     private MonitorFilterType monitorFilterType;
 
     private final MiloClientComponent component;
-    private MiloClientConnection connection;
 
     public MiloClientEndpoint(final String uri, final MiloClientComponent component, final String endpointUri) {
         super(uri, component);
@@ -110,30 +109,19 @@ public class MiloClientEndpoint extends DefaultEndpoint {
     }
 
     @Override
-    protected void doStart() throws Exception {
-        super.doStart();
-    }
-
-    @Override
-    protected void doStop() throws Exception {
-        this.component.disposed(this);
-        super.doStop();
-    }
-
-    @Override
     public Producer createProducer() throws Exception {
-        return new MiloClientProducer(this, this.connection, this.defaultAwaitWrites);
+        return new MiloClientProducer(this, this.defaultAwaitWrites);
     }
 
     @Override
     public Consumer createConsumer(final Processor processor) throws Exception {
-        MiloClientConsumer consumer = new MiloClientConsumer(this, processor, this.connection);
+        MiloClientConsumer consumer = new MiloClientConsumer(this, processor);
         configureConsumer(consumer);
         return consumer;
     }
 
-    public MiloClientConnection getConnection() {
-        return this.connection;
+    public MiloClientConnection createConnection() {
+        return new MiloClientConnection(configuration, monitorFilterConfiguration);
     }
 
     // item configuration
@@ -186,7 +174,11 @@ public class MiloClientEndpoint extends DefaultEndpoint {
         this.defaultAwaitWrites = defaultAwaitWrites;
     }
 
-    public void setConnection(MiloClientConnection connection) {
-        this.connection = connection;
+    public MonitorFilterType getMonitorFilterType() {
+        return monitorFilterType;
+    }
+
+    public void setMonitorFilterType(MonitorFilterType monitorFilterType) {
+        this.monitorFilterType = monitorFilterType;
     }
 }
diff --git a/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientProducer.java b/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientProducer.java
index 04a7619..d244236 100644
--- a/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientProducer.java
+++ b/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientProducer.java
@@ -28,25 +28,46 @@ import static java.lang.Boolean.TRUE;
 
 public class MiloClientProducer extends DefaultAsyncProducer {
 
-    private final MiloClientConnection connection;
+    private MiloClientConnection connection;
 
     private final ExpandedNodeId nodeId;
     private final ExpandedNodeId methodId;
 
     private final boolean defaultAwaitWrites;
 
-    public MiloClientProducer(final MiloClientEndpoint endpoint, final MiloClientConnection connection,
+    public MiloClientProducer(final MiloClientEndpoint endpoint,
                               final boolean defaultAwaitWrites) {
         super(endpoint);
 
-        this.connection = connection;
         this.defaultAwaitWrites = defaultAwaitWrites;
-
         this.nodeId = endpoint.getNodeId();
         this.methodId = endpoint.getMethodId();
     }
 
     @Override
+    public MiloClientEndpoint getEndpoint() {
+        return (MiloClientEndpoint) super.getEndpoint();
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+        this.connection = getEndpoint().createConnection();
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        if (this.connection != null) {
+            try {
+                this.connection.close();
+            } catch (Exception e) {
+                // ignore
+            }
+        }
+        super.doStop();
+    }
+
+    @Override
     public boolean process(Exchange exchange, AsyncCallback async) {
         final Message msg = exchange.getIn();
         final Object value = msg.getBody();
diff --git a/components/camel-milo/src/main/java/org/apache/camel/component/milo/server/MiloServerConsumer.java b/components/camel-milo/src/main/java/org/apache/camel/component/milo/server/MiloServerConsumer.java
index 451b59a..f958974 100644
--- a/components/camel-milo/src/main/java/org/apache/camel/component/milo/server/MiloServerConsumer.java
+++ b/components/camel-milo/src/main/java/org/apache/camel/component/milo/server/MiloServerConsumer.java
@@ -18,61 +18,56 @@ package org.apache.camel.component.milo.server;
 
 import java.util.function.Consumer;
 
-import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
+import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.component.milo.Messages;
 import org.apache.camel.component.milo.server.internal.CamelServerItem;
 import org.apache.camel.support.DefaultConsumer;
-import org.apache.camel.support.DefaultMessage;
 import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
 
 public class MiloServerConsumer extends DefaultConsumer {
 
-    private final CamelServerItem item;
     private final Consumer<DataValue> writeHandler = this::performWrite;
+    private CamelServerItem item;
 
-    public MiloServerConsumer(final Endpoint endpoint, final Processor processor, final CamelServerItem item) {
+    public MiloServerConsumer(final MiloServerEndpoint endpoint, final Processor processor) {
         super(endpoint, processor);
-        this.item = item;
+    }
+
+    @Override
+    public MiloServerEndpoint getEndpoint() {
+        return (MiloServerEndpoint) super.getEndpoint();
     }
 
     @Override
     protected void doStart() throws Exception {
         super.doStart();
-
+        this.item = getEndpoint().getItem();
         this.item.addWriteListener(this.writeHandler);
     }
 
     @Override
     protected void doStop() throws Exception {
         this.item.removeWriteListener(this.writeHandler);
-
         super.doStop();
     }
 
     protected void performWrite(final DataValue value) {
-
-        final Exchange exchange = getEndpoint().createExchange();
-        exchange.setIn(mapToMessage(value));
+        Exchange exchange = getEndpoint().createExchange();
+        mapToMessage(value, exchange.getMessage());
 
         try {
-            getAsyncProcessor().process(exchange);
-        } catch (final Exception e) {
-            throw new RuntimeException(e);
+            getProcessor().process(exchange);
+        } catch (Exception e) {
+            getExceptionHandler().handleException("Error processing exchange", e);
         }
     }
 
-    private DefaultMessage mapToMessage(final DataValue value) {
-        if (value == null) {
-            return null;
+    private void mapToMessage(final DataValue value, final Message message) {
+        if (value != null) {
+            Messages.fillFromDataValue(value, message);
         }
-
-        final DefaultMessage result = new DefaultMessage(getEndpoint().getCamelContext());
-
-        Messages.fillFromDataValue(value, result);
-
-        return result;
     }
 
 }
diff --git a/components/camel-milo/src/main/java/org/apache/camel/component/milo/server/MiloServerEndpoint.java b/components/camel-milo/src/main/java/org/apache/camel/component/milo/server/MiloServerEndpoint.java
index 99fd31d..027eb58 100644
--- a/components/camel-milo/src/main/java/org/apache/camel/component/milo/server/MiloServerEndpoint.java
+++ b/components/camel-milo/src/main/java/org/apache/camel/component/milo/server/MiloServerEndpoint.java
@@ -34,12 +34,12 @@ import org.apache.camel.support.DefaultEndpoint;
              category = { Category.IOT })
 public class MiloServerEndpoint extends DefaultEndpoint {
 
+    private volatile CamelServerItem item;
+
     @UriPath
     @Metadata(required = true)
     private String itemId;
 
-    private CamelServerItem item;
-
     public MiloServerEndpoint(final String uri, final String itemId, final Component component) {
         super(uri, component);
         this.itemId = itemId;
@@ -67,16 +67,20 @@ public class MiloServerEndpoint extends DefaultEndpoint {
 
     @Override
     public Producer createProducer() throws Exception {
-        return new MiloServerProducer(this, this.item);
+        return new MiloServerProducer(this);
     }
 
     @Override
     public Consumer createConsumer(final Processor processor) throws Exception {
-        MiloServerConsumer consumer = new MiloServerConsumer(this, processor, this.item);
+        MiloServerConsumer consumer = new MiloServerConsumer(this, processor);
         configureConsumer(consumer);
         return consumer;
     }
 
+    CamelServerItem getItem() {
+        return item;
+    }
+
     /**
      * ID of the item
      */
diff --git a/components/camel-milo/src/main/java/org/apache/camel/component/milo/server/MiloServerProducer.java b/components/camel-milo/src/main/java/org/apache/camel/component/milo/server/MiloServerProducer.java
index 357e5f5..03a073f 100644
--- a/components/camel-milo/src/main/java/org/apache/camel/component/milo/server/MiloServerProducer.java
+++ b/components/camel-milo/src/main/java/org/apache/camel/component/milo/server/MiloServerProducer.java
@@ -16,7 +16,6 @@
  */
 package org.apache.camel.component.milo.server;
 
-import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.component.milo.server.internal.CamelServerItem;
 import org.apache.camel.support.DefaultProducer;
@@ -27,11 +26,21 @@ public class MiloServerProducer extends DefaultProducer {
 
     private static final Logger LOG = LoggerFactory.getLogger(MiloServerProducer.class);
 
-    private final CamelServerItem item;
+    private CamelServerItem item;
 
-    public MiloServerProducer(final Endpoint endpoint, final CamelServerItem item) {
+    public MiloServerProducer(final MiloServerEndpoint endpoint) {
         super(endpoint);
-        this.item = item;
+    }
+
+    @Override
+    public MiloServerEndpoint getEndpoint() {
+        return (MiloServerEndpoint) super.getEndpoint();
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+        this.item = getEndpoint().getItem();
     }
 
     @Override
diff --git a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerTransactionTest.java b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerTransactionTest.java
index fa5b840..1bd97a2 100644
--- a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerTransactionTest.java
+++ b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerTransactionTest.java
@@ -32,9 +32,11 @@ import org.apache.camel.test.AvailablePortFinder;
 import org.apache.camel.test.junit.rule.mllp.MllpClientResource;
 import org.apache.camel.test.junit5.CamelTestSupport;
 import org.apache.camel.test.mllp.Hl7TestMessageGenerator;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
+@Disabled("This test hangs")
 public class MllpTcpServerConsumerTransactionTest extends CamelTestSupport {
 
     @RegisterExtension