You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2022/04/28 08:44:52 UTC

[camel-spring-boot] branch main updated: [CAMEL-18005]add tests in camel-paho-starter (#543)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-spring-boot.git


The following commit(s) were added to refs/heads/main by this push:
     new 5b027d2446f [CAMEL-18005]add tests in camel-paho-starter (#543)
5b027d2446f is described below

commit 5b027d2446f3c22574f22e9163263749aa1cc817
Author: Freeman(Yue) Fang <fr...@gmail.com>
AuthorDate: Thu Apr 28 04:44:48 2022 -0400

    [CAMEL-18005]add tests in camel-paho-starter (#543)
---
 components-starter/camel-paho-starter/pom.xml      |   7 +
 .../paho/springboot/PahoComponentTest.java         | 228 +++++++++++++++++++++
 .../PahoComponentVerifierExtensionTest.java        | 105 ++++++++++
 .../paho/springboot/PahoOverrideTopicTest.java     | 112 ++++++++++
 .../springboot/PahoReconnectAfterFailureTest.java  | 203 ++++++++++++++++++
 .../paho/springboot/PahoToDSendDynamicTest.java    | 120 +++++++++++
 .../component/paho/springboot/PahoToDTest.java     | 116 +++++++++++
 7 files changed, 891 insertions(+)

diff --git a/components-starter/camel-paho-starter/pom.xml b/components-starter/camel-paho-starter/pom.xml
index 5f25e12247a..c9982874704 100644
--- a/components-starter/camel-paho-starter/pom.xml
+++ b/components-starter/camel-paho-starter/pom.xml
@@ -39,6 +39,13 @@
       <artifactId>camel-paho</artifactId>
       <version>${camel-version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.camel</groupId>
+      <artifactId>camel-test-infra-activemq</artifactId>
+      <version>${camel-version}</version>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
     <!--START OF GENERATED CODE-->
     <dependency>
       <groupId>org.apache.camel.springboot</groupId>
diff --git a/components-starter/camel-paho-starter/src/test/java/org/apache/camel/component/paho/springboot/PahoComponentTest.java b/components-starter/camel-paho-starter/src/test/java/org/apache/camel/component/paho/springboot/PahoComponentTest.java
new file mode 100644
index 00000000000..86131bcd09b
--- /dev/null
+++ b/components-starter/camel-paho-starter/src/test/java/org/apache/camel/component/paho/springboot/PahoComponentTest.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.paho.springboot;
+
+
+
+import java.io.UnsupportedEncodingException;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.paho.PahoComponent;
+import org.apache.camel.component.paho.PahoConstants;
+import org.apache.camel.component.paho.PahoEndpoint;
+import org.apache.camel.component.paho.PahoMessage;
+import org.apache.camel.component.paho.PahoPersistence;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
+import org.apache.camel.test.AvailablePortFinder;
+import org.apache.camel.test.infra.activemq.services.ActiveMQEmbeddedService;
+import org.apache.camel.test.infra.activemq.services.ActiveMQEmbeddedServiceBuilder;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+
+
+@DirtiesContext
+@CamelSpringBootTest
+@SpringBootTest(
+    classes = {
+        CamelAutoConfiguration.class,
+        PahoComponentTest.class,
+        PahoComponentTest.TestConfiguration.class
+    }
+)
+public class PahoComponentTest {
+    
+    static int mqttPort = AvailablePortFinder.getNextAvailable();
+
+    @RegisterExtension
+    public static ActiveMQEmbeddedService service = ActiveMQEmbeddedServiceBuilder
+            .bare()
+            .withPersistent(false)
+            .withMqttTransport(mqttPort)
+            .build();
+
+
+    
+    @Autowired
+    ProducerTemplate template;
+    
+    @Autowired
+    CamelContext context;
+    
+    @EndpointInject("mock:test")
+    MockEndpoint mock;
+    
+    @EndpointInject("mock:testCustomizedPaho")
+    MockEndpoint testCustomizedPahoMock;
+    
+    @Test
+    public void checkOptions() {
+        String uri = "paho:/test/topic" + "?clientId=sampleClient" + "&brokerUrl=tcp://localhost:" + mqttPort + "&qos=2"
+                     + "&persistence=file";
+
+        PahoEndpoint endpoint = context.getEndpoint(uri, PahoEndpoint.class);
+
+        // Then
+        assertEquals("/test/topic", endpoint.getTopic());
+        assertEquals("sampleClient", endpoint.getConfiguration().getClientId());
+        assertEquals("tcp://localhost:" + mqttPort, endpoint.getConfiguration().getBrokerUrl());
+        assertEquals(2, endpoint.getConfiguration().getQos());
+        assertEquals(PahoPersistence.FILE, endpoint.getConfiguration().getPersistence());
+    }
+
+    @Test
+    public void shouldReadMessageFromMqtt() throws InterruptedException {
+        mock.reset();
+        // Given
+        String msg = "msg";
+        mock.expectedBodiesReceived(msg);
+
+        // When
+        template.sendBody("direct:test", msg);
+
+        // Then
+        mock.assertIsSatisfied();
+    }
+
+    @Test
+    public void shouldNotReadMessageFromUnregisteredTopic() throws InterruptedException {
+        mock.reset();
+        // Given
+        mock.expectedMessageCount(0);
+
+        // When
+        template.sendBody("paho:someRandomQueue?brokerUrl=tcp://localhost:" + mqttPort, "msg");
+
+        // Then
+        mock.assertIsSatisfied();
+    }
+
+    @Test
+    public void shouldKeepDefaultMessageInHeader() throws InterruptedException, UnsupportedEncodingException {
+        mock.reset();
+        // Given
+        final String msg = "msg";
+        mock.expectedBodiesReceived(msg);
+
+        // When
+        template.sendBody("direct:test", msg);
+
+        // Then
+        mock.assertIsSatisfied();
+
+        Exchange exchange = mock.getExchanges().get(0);
+        String payload = new String((byte[]) exchange.getIn().getBody(), "utf-8");
+
+        assertEquals("queue", exchange.getIn().getHeader(PahoConstants.MQTT_TOPIC));
+        assertEquals(msg, payload);
+    }
+
+    @Test
+    public void shouldKeepOriginalMessageInHeader() throws InterruptedException {
+        mock.reset();
+        // Given
+        final String msg = "msg";
+        mock.expectedBodiesReceived(msg);
+
+        // When
+        template.sendBody("direct:test2", msg);
+
+        // Then
+        mock.assertIsSatisfied();
+        Exchange exchange = mock.getExchanges().get(0);
+
+        MqttMessage message = exchange.getIn(PahoMessage.class).getMqttMessage();
+        assertNotNull(message);
+        assertEquals(msg, new String(message.getPayload()));
+    }
+
+    @Test
+    public void shouldReadMessageFromCustomizedComponent() throws InterruptedException {
+        testCustomizedPahoMock.reset();
+        // Given
+        String msg = "msg";
+        testCustomizedPahoMock.expectedBodiesReceived(msg);
+
+        // When
+        template.sendBody("direct:testCustomizedPaho", msg);
+
+        // Then
+        testCustomizedPahoMock.assertIsSatisfied();
+    }
+
+    @Test
+    public void shouldNotSendMessageAuthIsNotValid() throws InterruptedException {
+        mock.reset();
+        // Given
+        mock.expectedMessageCount(0);
+
+        // When
+        template.sendBody("paho:someRandomQueue?brokerUrl=tcp://localhost:" + mqttPort + "&userName=test&password=test", "msg");
+
+        // Then
+        mock.assertIsSatisfied();
+    }
+
+    
+    // *************************************
+    // Config
+    // *************************************
+
+    @Configuration
+    public class TestConfiguration {
+
+        @Bean
+        public RouteBuilder routeBuilder() {
+            return new RouteBuilder() {
+                @Override
+                public void configure() {
+                    PahoComponent customizedPaho = new PahoComponent();
+                    context.addComponent("customizedPaho", customizedPaho);
+
+                    from("direct:test").to("paho:queue?brokerUrl=tcp://localhost:" + mqttPort);
+                    from("paho:queue?brokerUrl=tcp://localhost:" + mqttPort).to("mock:test");
+
+                    from("direct:test2").to("paho:queue?brokerUrl=tcp://localhost:" + mqttPort);
+
+                    from("paho:persistenceTest?persistence=FILE&brokerUrl=tcp://localhost:" + mqttPort).to("mock:persistenceTest");
+
+                    from("direct:testCustomizedPaho").to("customizedPaho:testCustomizedPaho?brokerUrl=tcp://localhost:" + mqttPort);
+                    from("paho:testCustomizedPaho?brokerUrl=tcp://localhost:" + mqttPort).to("mock:testCustomizedPaho");
+                }
+            };
+        }
+    }
+    
+   
+
+}
diff --git a/components-starter/camel-paho-starter/src/test/java/org/apache/camel/component/paho/springboot/PahoComponentVerifierExtensionTest.java b/components-starter/camel-paho-starter/src/test/java/org/apache/camel/component/paho/springboot/PahoComponentVerifierExtensionTest.java
new file mode 100644
index 00000000000..922ddceb366
--- /dev/null
+++ b/components-starter/camel-paho-starter/src/test/java/org/apache/camel/component/paho/springboot/PahoComponentVerifierExtensionTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.paho.springboot;
+
+
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Component;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.component.extension.ComponentVerifierExtension;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.annotation.DirtiesContext;
+import org.apache.camel.test.AvailablePortFinder;
+import org.apache.camel.test.infra.activemq.services.ActiveMQEmbeddedService;
+import org.apache.camel.test.infra.activemq.services.ActiveMQEmbeddedServiceBuilder;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+
+
+@DirtiesContext
+@CamelSpringBootTest
+@SpringBootTest(
+    classes = {
+        CamelAutoConfiguration.class,
+        PahoComponentVerifierExtensionTest.class
+    }
+)
+public class PahoComponentVerifierExtensionTest {
+    
+    static int mqttPort = AvailablePortFinder.getNextAvailable();
+
+    @RegisterExtension
+    public static ActiveMQEmbeddedService service = ActiveMQEmbeddedServiceBuilder
+            .bare()
+            .withPersistent(false)
+            .withMqttTransport(mqttPort)
+            .build();
+
+
+    
+    @Autowired
+    ProducerTemplate template;
+    
+    @Autowired
+    CamelContext context;
+    
+    @EndpointInject("mock:test")
+    MockEndpoint mock;
+    
+    @Test
+    public void testParameters() {
+        Component component = context.getComponent("paho");
+
+        ComponentVerifierExtension verifier
+                = component.getExtension(ComponentVerifierExtension.class).orElseThrow(IllegalStateException::new);
+
+        Map<String, Object> parameters = new HashMap<>();
+        parameters.put("brokerUrl", "l");
+
+        ComponentVerifierExtension.Result result = verifier.verify(ComponentVerifierExtension.Scope.PARAMETERS, parameters);
+
+        assertEquals(ComponentVerifierExtension.Result.Status.OK, result.getStatus());
+    }
+
+    @Test
+    public void testConnectivity() {
+        Component component = context.getComponent("paho");
+        ComponentVerifierExtension verifier
+                = component.getExtension(ComponentVerifierExtension.class).orElseThrow(IllegalStateException::new);
+
+        Map<String, Object> parameters = new HashMap<>();
+        parameters.put("brokerUrl", "l");
+
+        ComponentVerifierExtension.Result result = verifier.verify(ComponentVerifierExtension.Scope.CONNECTIVITY, parameters);
+
+        assertEquals(ComponentVerifierExtension.Result.Status.ERROR, result.getStatus());
+    }
+
+}
diff --git a/components-starter/camel-paho-starter/src/test/java/org/apache/camel/component/paho/springboot/PahoOverrideTopicTest.java b/components-starter/camel-paho-starter/src/test/java/org/apache/camel/component/paho/springboot/PahoOverrideTopicTest.java
new file mode 100644
index 00000000000..319f82dd1e8
--- /dev/null
+++ b/components-starter/camel-paho-starter/src/test/java/org/apache/camel/component/paho/springboot/PahoOverrideTopicTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.paho.springboot;
+
+
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.paho.PahoComponent;
+import org.apache.camel.component.paho.PahoConstants;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
+import org.apache.camel.test.AvailablePortFinder;
+import org.apache.camel.test.infra.activemq.services.ActiveMQEmbeddedService;
+import org.apache.camel.test.infra.activemq.services.ActiveMQEmbeddedServiceBuilder;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+
+
+@DirtiesContext
+@CamelSpringBootTest
+@SpringBootTest(
+    classes = {
+        CamelAutoConfiguration.class,
+        PahoOverrideTopicTest.class,
+        PahoOverrideTopicTest.TestConfiguration.class
+    }
+)
+public class PahoOverrideTopicTest {
+    
+    static int mqttPort = AvailablePortFinder.getNextAvailable();
+
+    @RegisterExtension
+    public static ActiveMQEmbeddedService service = ActiveMQEmbeddedServiceBuilder
+            .bare()
+            .withPersistent(false)
+            .withMqttTransport(mqttPort)
+            .build();
+
+
+    
+    @Autowired
+    ProducerTemplate template;
+    
+    @Autowired
+    CamelContext context;
+    
+    @EndpointInject("mock:test")
+    MockEndpoint mock;
+    
+    @Test
+    public void shouldOverride() throws InterruptedException {
+        // Given
+        mock.expectedMessageCount(1);
+
+        // When
+        template.sendBodyAndHeader("direct:test", "Hello World", PahoConstants.CAMEL_PAHO_OVERRIDE_TOPIC, "myoverride");
+
+        // Then
+        mock.assertIsSatisfied();
+    }
+    
+    // *************************************
+    // Config
+    // *************************************
+
+    @Configuration
+    public class TestConfiguration {
+
+        @Bean
+        public RouteBuilder routeBuilder() {
+            return new RouteBuilder() {
+                @Override
+                public void configure() {
+                    PahoComponent paho = context.getComponent("paho", PahoComponent.class);
+                    paho.getConfiguration().setBrokerUrl("tcp://localhost:" + mqttPort);
+
+                    from("direct:test").to("paho:queue").log("Message sent");
+
+                    from("paho:myoverride").log("Message received").to("mock:test");
+                }
+            };
+        }
+    }
+    
+   
+
+}
diff --git a/components-starter/camel-paho-starter/src/test/java/org/apache/camel/component/paho/springboot/PahoReconnectAfterFailureTest.java b/components-starter/camel-paho-starter/src/test/java/org/apache/camel/component/paho/springboot/PahoReconnectAfterFailureTest.java
new file mode 100644
index 00000000000..d479816dad4
--- /dev/null
+++ b/components-starter/camel-paho-starter/src/test/java/org/apache/camel/component/paho/springboot/PahoReconnectAfterFailureTest.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.paho.springboot;
+
+
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.camel.CamelContext;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.Route;
+import org.apache.camel.ServiceStatus;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.spi.RouteController;
+import org.apache.camel.spi.SupervisingRouteController;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.spring.boot.CamelContextConfiguration;
+import org.apache.camel.support.RoutePolicySupport;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
+import org.apache.camel.test.AvailablePortFinder;
+import org.apache.camel.test.infra.activemq.services.ActiveMQEmbeddedServiceBuilder;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+
+
+@DirtiesContext
+@CamelSpringBootTest
+@SpringBootTest(
+    classes = {
+        CamelAutoConfiguration.class,
+        PahoReconnectAfterFailureTest.class,
+        PahoReconnectAfterFailureTest.TestConfiguration.class
+    }
+)
+public class PahoReconnectAfterFailureTest {
+    
+    public static final String TESTING_ROUTE_ID = "testingRoute";
+    BrokerService broker;
+
+    static int mqttPort = AvailablePortFinder.getNextAvailable();
+    CountDownLatch routeStartedLatch = new CountDownLatch(1);
+
+    
+    @Autowired
+    ProducerTemplate template;
+    
+    @Autowired
+    CamelContext context;
+    
+    @EndpointInject("mock:test")
+    MockEndpoint mock;
+    
+       
+    @BeforeEach
+    public void doPreSetup() throws Exception {
+        broker = ActiveMQEmbeddedServiceBuilder
+                .bare()
+                .withPersistent(false)
+                .build().getBrokerService();
+
+        // Broker will be started later, after camel context is started,
+        // to ensure first consumer connection fails
+    }
+    
+    @AfterEach
+    public void doCleanUp() throws Exception {
+        broker.stop();
+    }
+    
+    @Bean
+    CamelContextConfiguration contextConfiguration() {
+        return new CamelContextConfiguration() {
+            @Override
+            public void beforeApplicationStart(CamelContext context) {
+                // Setup supervisor to restart routes because paho consumer 
+                // is not able to recover automatically on startup
+                SupervisingRouteController supervising = context.getRouteController().supervising();
+                supervising.setBackOffDelay(500);
+                supervising.setIncludeRoutes("paho:*");
+            }
+
+            @Override
+            public void afterApplicationStart(CamelContext camelContext) {
+                // TODO Auto-generated method stub
+                
+            }
+        };
+    }
+    
+    @Test
+    public void startConsumerShouldReconnectMqttClientAfterFailures() throws Exception {
+        mock.reset();
+        RouteController routeController = context.getRouteController();
+
+        assertNotEquals(ServiceStatus.Started, routeController.getRouteStatus(TESTING_ROUTE_ID),
+                "Broker down, expecting  route not to be started");
+
+        // Start broker and wait for supervisor to restart route
+        // consumer should now connect
+        startBroker();
+        routeStartedLatch.await(5, TimeUnit.SECONDS);
+        assertEquals(ServiceStatus.Started, routeController.getRouteStatus(TESTING_ROUTE_ID),
+                "Expecting consumer connected to broker and route started");
+
+        // Given
+        String msg = "msg";
+        mock.expectedBodiesReceived(msg);
+
+        // When
+        template.sendBody("paho:queue?lazyStartProducer=true&brokerUrl=tcp://localhost:" + mqttPort, msg);
+
+        // Then
+        mock.assertIsSatisfied();
+
+    }
+    
+    @Test
+    public void startProducerShouldReconnectMqttClientAfterFailures() throws Exception {
+        mock.reset();
+        String msg = "msg";
+        mock.expectedBodiesReceived(msg);
+
+        try {
+            template.sendBody("direct:test", "notSentMessage");
+            fail("Broker is down, paho producer should fail");
+        } catch (Exception e) {
+            // ignore
+        }
+
+        startBroker();
+        routeStartedLatch.await(5, TimeUnit.SECONDS);
+
+        template.sendBody("direct:test", msg);
+
+        mock.assertIsSatisfied(20000);
+    }
+
+    private void startBroker() throws Exception {
+        broker.addConnector("mqtt://localhost:" + mqttPort);
+        broker.start();
+    }
+    
+    // *************************************
+    // Config
+    // *************************************
+
+    @Configuration
+    public class TestConfiguration {
+
+        @Bean
+        public RouteBuilder routeBuilder() {
+            return new RouteBuilder() {
+                @Override
+                public void configure() {
+
+                    from("direct:test").to("paho:queue?lazyStartProducer=true&brokerUrl=tcp://localhost:" + mqttPort);
+                    from("paho:queue?brokerUrl=tcp://localhost:" + mqttPort)
+                            .id(TESTING_ROUTE_ID)
+                            .routePolicy(new RoutePolicySupport() {
+                                @Override
+                                public void onStart(Route route) {
+                                    routeStartedLatch.countDown();
+                                }
+                            })
+                            .to("mock:test");
+                }
+            };
+        }
+    }
+    
+   
+
+}
diff --git a/components-starter/camel-paho-starter/src/test/java/org/apache/camel/component/paho/springboot/PahoToDSendDynamicTest.java b/components-starter/camel-paho-starter/src/test/java/org/apache/camel/component/paho/springboot/PahoToDSendDynamicTest.java
new file mode 100644
index 00000000000..d7f20dbadd6
--- /dev/null
+++ b/components-starter/camel-paho-starter/src/test/java/org/apache/camel/component/paho/springboot/PahoToDSendDynamicTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.paho.springboot;
+
+
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.ConsumerTemplate;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.paho.PahoComponent;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
+import org.apache.camel.test.AvailablePortFinder;
+import org.apache.camel.test.infra.activemq.services.ActiveMQEmbeddedService;
+import org.apache.camel.test.infra.activemq.services.ActiveMQEmbeddedServiceBuilder;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+
+
+@DirtiesContext
+@CamelSpringBootTest
+@SpringBootTest(
+    classes = {
+        CamelAutoConfiguration.class,
+        PahoToDSendDynamicTest.class,
+        PahoToDSendDynamicTest.TestConfiguration.class
+    }
+)
+public class PahoToDSendDynamicTest {
+    
+    static int mqttPort = AvailablePortFinder.getNextAvailable();
+
+    @RegisterExtension
+    public static ActiveMQEmbeddedService service = ActiveMQEmbeddedServiceBuilder
+            .bare()
+            .withPersistent(false)
+            .withMqttTransport(mqttPort)
+            .build();
+
+
+    
+    @Autowired
+    ProducerTemplate template;
+    
+    @Autowired
+    ConsumerTemplate consumer;
+    
+    @Autowired
+    CamelContext context;
+    
+    @EndpointInject("mock:test")
+    MockEndpoint mock;
+    
+    @Test
+    public void testToD() {
+        template.sendBodyAndHeader("direct:start", "Hello bar", "where", "bar");
+        template.sendBodyAndHeader("direct:start", "Hello beer", "where", "beer");
+
+        // there should only be one paho endpoint
+        long count = context.getEndpoints().stream().filter(e -> e.getEndpointUri().startsWith("paho:")).count();
+        assertEquals(1, count, "There should only be 1 paho endpoint");
+
+        // and the messages should be in the queues
+        String out = consumer.receiveBody("paho:bar", 2000, String.class);
+        assertEquals("Hello bar", out);
+        out = consumer.receiveBody("paho:beer", 2000, String.class);
+        assertEquals("Hello beer", out);
+    }
+    
+    // *************************************
+    // Config
+    // *************************************
+
+    @Configuration
+    public class TestConfiguration {
+
+        @Bean
+        public RouteBuilder routeBuilder() {
+            return new RouteBuilder() {
+                @Override
+                public void configure() {
+                    PahoComponent paho = context.getComponent("paho", PahoComponent.class);
+                    paho.getConfiguration().setBrokerUrl("tcp://localhost:" + mqttPort);
+
+                    // route message dynamic using toD
+                    from("direct:start").toD("paho:${header.where}?retained=true");
+                }
+            };
+        }
+    }
+    
+   
+
+}
diff --git a/components-starter/camel-paho-starter/src/test/java/org/apache/camel/component/paho/springboot/PahoToDTest.java b/components-starter/camel-paho-starter/src/test/java/org/apache/camel/component/paho/springboot/PahoToDTest.java
new file mode 100644
index 00000000000..38cb5c5016b
--- /dev/null
+++ b/components-starter/camel-paho-starter/src/test/java/org/apache/camel/component/paho/springboot/PahoToDTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.paho.springboot;
+
+
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.paho.PahoComponent;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
+import org.apache.camel.test.AvailablePortFinder;
+import org.apache.camel.test.infra.activemq.services.ActiveMQEmbeddedService;
+import org.apache.camel.test.infra.activemq.services.ActiveMQEmbeddedServiceBuilder;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+
+
+@DirtiesContext
+@CamelSpringBootTest
+@SpringBootTest(
+    classes = {
+        CamelAutoConfiguration.class,
+        PahoToDTest.class,
+        PahoToDTest.TestConfiguration.class
+    }
+)
+public class PahoToDTest {
+    
+    static int mqttPort = AvailablePortFinder.getNextAvailable();
+
+    @RegisterExtension
+    public static ActiveMQEmbeddedService service = ActiveMQEmbeddedServiceBuilder
+            .bare()
+            .withPersistent(false)
+            .withMqttTransport(mqttPort)
+            .build();
+
+
+    
+    @Autowired
+    ProducerTemplate template;
+    
+    @Autowired
+    CamelContext context;
+    
+    @EndpointInject("mock:bar")
+    MockEndpoint mockBar;
+    
+    @EndpointInject("mock:beer")
+    MockEndpoint mockBeer;
+    
+    @Test
+    public void testToD() throws Exception {
+        mockBar.expectedBodiesReceived("Hello bar");
+        mockBeer.expectedBodiesReceived("Hello beer");
+
+        template.sendBodyAndHeader("direct:start", "Hello bar", "where", "bar");
+        template.sendBodyAndHeader("direct:start", "Hello beer", "where", "beer");
+
+        mockBar.assertIsSatisfied();
+        mockBeer.assertIsSatisfied();
+    }
+    
+    // *************************************
+    // Config
+    // *************************************
+
+    @Configuration
+    public class TestConfiguration {
+
+        @Bean
+        public RouteBuilder routeBuilder() {
+            return new RouteBuilder() {
+                @Override
+                public void configure() {
+                    PahoComponent paho = context.getComponent("paho", PahoComponent.class);
+                    paho.getConfiguration().setBrokerUrl("tcp://localhost:" + mqttPort);
+
+                    // route message dynamic using toD
+                    from("direct:start").toD("paho:${header.where}");
+
+                    from("paho:bar").to("mock:bar");
+                    from("paho:beer").to("mock:beer");
+                }
+            };
+        }
+    }
+    
+   
+
+}