You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2022/04/28 08:44:52 UTC
[camel-spring-boot] branch main updated: [CAMEL-18005]add tests in camel-paho-starter (#543)
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-spring-boot.git
The following commit(s) were added to refs/heads/main by this push:
new 5b027d2446f [CAMEL-18005]add tests in camel-paho-starter (#543)
5b027d2446f is described below
commit 5b027d2446f3c22574f22e9163263749aa1cc817
Author: Freeman(Yue) Fang <fr...@gmail.com>
AuthorDate: Thu Apr 28 04:44:48 2022 -0400
[CAMEL-18005]add tests in camel-paho-starter (#543)
---
components-starter/camel-paho-starter/pom.xml | 7 +
.../paho/springboot/PahoComponentTest.java | 228 +++++++++++++++++++++
.../PahoComponentVerifierExtensionTest.java | 105 ++++++++++
.../paho/springboot/PahoOverrideTopicTest.java | 112 ++++++++++
.../springboot/PahoReconnectAfterFailureTest.java | 203 ++++++++++++++++++
.../paho/springboot/PahoToDSendDynamicTest.java | 120 +++++++++++
.../component/paho/springboot/PahoToDTest.java | 116 +++++++++++
7 files changed, 891 insertions(+)
diff --git a/components-starter/camel-paho-starter/pom.xml b/components-starter/camel-paho-starter/pom.xml
index 5f25e12247a..c9982874704 100644
--- a/components-starter/camel-paho-starter/pom.xml
+++ b/components-starter/camel-paho-starter/pom.xml
@@ -39,6 +39,13 @@
<artifactId>camel-paho</artifactId>
<version>${camel-version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-test-infra-activemq</artifactId>
+ <version>${camel-version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
<!--START OF GENERATED CODE-->
<dependency>
<groupId>org.apache.camel.springboot</groupId>
diff --git a/components-starter/camel-paho-starter/src/test/java/org/apache/camel/component/paho/springboot/PahoComponentTest.java b/components-starter/camel-paho-starter/src/test/java/org/apache/camel/component/paho/springboot/PahoComponentTest.java
new file mode 100644
index 00000000000..86131bcd09b
--- /dev/null
+++ b/components-starter/camel-paho-starter/src/test/java/org/apache/camel/component/paho/springboot/PahoComponentTest.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.paho.springboot;
+
+
+
+import java.io.UnsupportedEncodingException;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.paho.PahoComponent;
+import org.apache.camel.component.paho.PahoConstants;
+import org.apache.camel.component.paho.PahoEndpoint;
+import org.apache.camel.component.paho.PahoMessage;
+import org.apache.camel.component.paho.PahoPersistence;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
+import org.apache.camel.test.AvailablePortFinder;
+import org.apache.camel.test.infra.activemq.services.ActiveMQEmbeddedService;
+import org.apache.camel.test.infra.activemq.services.ActiveMQEmbeddedServiceBuilder;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+
+
+@DirtiesContext
+@CamelSpringBootTest
+@SpringBootTest(
+ classes = {
+ CamelAutoConfiguration.class,
+ PahoComponentTest.class,
+ PahoComponentTest.TestConfiguration.class
+ }
+)
+public class PahoComponentTest {
+
+ static int mqttPort = AvailablePortFinder.getNextAvailable();
+
+ @RegisterExtension
+ public static ActiveMQEmbeddedService service = ActiveMQEmbeddedServiceBuilder
+ .bare()
+ .withPersistent(false)
+ .withMqttTransport(mqttPort)
+ .build();
+
+
+
+ @Autowired
+ ProducerTemplate template;
+
+ @Autowired
+ CamelContext context;
+
+ @EndpointInject("mock:test")
+ MockEndpoint mock;
+
+ @EndpointInject("mock:testCustomizedPaho")
+ MockEndpoint testCustomizedPahoMock;
+
+ @Test
+ public void checkOptions() {
+ String uri = "paho:/test/topic" + "?clientId=sampleClient" + "&brokerUrl=tcp://localhost:" + mqttPort + "&qos=2"
+ + "&persistence=file";
+
+ PahoEndpoint endpoint = context.getEndpoint(uri, PahoEndpoint.class);
+
+ // Then
+ assertEquals("/test/topic", endpoint.getTopic());
+ assertEquals("sampleClient", endpoint.getConfiguration().getClientId());
+ assertEquals("tcp://localhost:" + mqttPort, endpoint.getConfiguration().getBrokerUrl());
+ assertEquals(2, endpoint.getConfiguration().getQos());
+ assertEquals(PahoPersistence.FILE, endpoint.getConfiguration().getPersistence());
+ }
+
+ @Test
+ public void shouldReadMessageFromMqtt() throws InterruptedException {
+ mock.reset();
+ // Given
+ String msg = "msg";
+ mock.expectedBodiesReceived(msg);
+
+ // When
+ template.sendBody("direct:test", msg);
+
+ // Then
+ mock.assertIsSatisfied();
+ }
+
+ @Test
+ public void shouldNotReadMessageFromUnregisteredTopic() throws InterruptedException {
+ mock.reset();
+ // Given
+ mock.expectedMessageCount(0);
+
+ // When
+ template.sendBody("paho:someRandomQueue?brokerUrl=tcp://localhost:" + mqttPort, "msg");
+
+ // Then
+ mock.assertIsSatisfied();
+ }
+
+ @Test
+ public void shouldKeepDefaultMessageInHeader() throws InterruptedException, UnsupportedEncodingException {
+ mock.reset();
+ // Given
+ final String msg = "msg";
+ mock.expectedBodiesReceived(msg);
+
+ // When
+ template.sendBody("direct:test", msg);
+
+ // Then
+ mock.assertIsSatisfied();
+
+ Exchange exchange = mock.getExchanges().get(0);
+ String payload = new String((byte[]) exchange.getIn().getBody(), "utf-8");
+
+ assertEquals("queue", exchange.getIn().getHeader(PahoConstants.MQTT_TOPIC));
+ assertEquals(msg, payload);
+ }
+
+ @Test
+ public void shouldKeepOriginalMessageInHeader() throws InterruptedException {
+ mock.reset();
+ // Given
+ final String msg = "msg";
+ mock.expectedBodiesReceived(msg);
+
+ // When
+ template.sendBody("direct:test2", msg);
+
+ // Then
+ mock.assertIsSatisfied();
+ Exchange exchange = mock.getExchanges().get(0);
+
+ MqttMessage message = exchange.getIn(PahoMessage.class).getMqttMessage();
+ assertNotNull(message);
+ assertEquals(msg, new String(message.getPayload()));
+ }
+
+ @Test
+ public void shouldReadMessageFromCustomizedComponent() throws InterruptedException {
+ testCustomizedPahoMock.reset();
+ // Given
+ String msg = "msg";
+ testCustomizedPahoMock.expectedBodiesReceived(msg);
+
+ // When
+ template.sendBody("direct:testCustomizedPaho", msg);
+
+ // Then
+ testCustomizedPahoMock.assertIsSatisfied();
+ }
+
+ @Test
+ public void shouldNotSendMessageAuthIsNotValid() throws InterruptedException {
+ mock.reset();
+ // Given
+ mock.expectedMessageCount(0);
+
+ // When
+ template.sendBody("paho:someRandomQueue?brokerUrl=tcp://localhost:" + mqttPort + "&userName=test&password=test", "msg");
+
+ // Then
+ mock.assertIsSatisfied();
+ }
+
+
+ // *************************************
+ // Config
+ // *************************************
+
+ @Configuration
+ public class TestConfiguration {
+
+ @Bean
+ public RouteBuilder routeBuilder() {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+ PahoComponent customizedPaho = new PahoComponent();
+ context.addComponent("customizedPaho", customizedPaho);
+
+ from("direct:test").to("paho:queue?brokerUrl=tcp://localhost:" + mqttPort);
+ from("paho:queue?brokerUrl=tcp://localhost:" + mqttPort).to("mock:test");
+
+ from("direct:test2").to("paho:queue?brokerUrl=tcp://localhost:" + mqttPort);
+
+ from("paho:persistenceTest?persistence=FILE&brokerUrl=tcp://localhost:" + mqttPort).to("mock:persistenceTest");
+
+ from("direct:testCustomizedPaho").to("customizedPaho:testCustomizedPaho?brokerUrl=tcp://localhost:" + mqttPort);
+ from("paho:testCustomizedPaho?brokerUrl=tcp://localhost:" + mqttPort).to("mock:testCustomizedPaho");
+ }
+ };
+ }
+ }
+
+
+
+}
diff --git a/components-starter/camel-paho-starter/src/test/java/org/apache/camel/component/paho/springboot/PahoComponentVerifierExtensionTest.java b/components-starter/camel-paho-starter/src/test/java/org/apache/camel/component/paho/springboot/PahoComponentVerifierExtensionTest.java
new file mode 100644
index 00000000000..922ddceb366
--- /dev/null
+++ b/components-starter/camel-paho-starter/src/test/java/org/apache/camel/component/paho/springboot/PahoComponentVerifierExtensionTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.paho.springboot;
+
+
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Component;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.component.extension.ComponentVerifierExtension;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.annotation.DirtiesContext;
+import org.apache.camel.test.AvailablePortFinder;
+import org.apache.camel.test.infra.activemq.services.ActiveMQEmbeddedService;
+import org.apache.camel.test.infra.activemq.services.ActiveMQEmbeddedServiceBuilder;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+
+
+@DirtiesContext
+@CamelSpringBootTest
+@SpringBootTest(
+ classes = {
+ CamelAutoConfiguration.class,
+ PahoComponentVerifierExtensionTest.class
+ }
+)
+public class PahoComponentVerifierExtensionTest {
+
+ static int mqttPort = AvailablePortFinder.getNextAvailable();
+
+ @RegisterExtension
+ public static ActiveMQEmbeddedService service = ActiveMQEmbeddedServiceBuilder
+ .bare()
+ .withPersistent(false)
+ .withMqttTransport(mqttPort)
+ .build();
+
+
+
+ @Autowired
+ ProducerTemplate template;
+
+ @Autowired
+ CamelContext context;
+
+ @EndpointInject("mock:test")
+ MockEndpoint mock;
+
+ @Test
+ public void testParameters() {
+ Component component = context.getComponent("paho");
+
+ ComponentVerifierExtension verifier
+ = component.getExtension(ComponentVerifierExtension.class).orElseThrow(IllegalStateException::new);
+
+ Map<String, Object> parameters = new HashMap<>();
+ parameters.put("brokerUrl", "l");
+
+ ComponentVerifierExtension.Result result = verifier.verify(ComponentVerifierExtension.Scope.PARAMETERS, parameters);
+
+ assertEquals(ComponentVerifierExtension.Result.Status.OK, result.getStatus());
+ }
+
+ @Test
+ public void testConnectivity() {
+ Component component = context.getComponent("paho");
+ ComponentVerifierExtension verifier
+ = component.getExtension(ComponentVerifierExtension.class).orElseThrow(IllegalStateException::new);
+
+ Map<String, Object> parameters = new HashMap<>();
+ parameters.put("brokerUrl", "l");
+
+ ComponentVerifierExtension.Result result = verifier.verify(ComponentVerifierExtension.Scope.CONNECTIVITY, parameters);
+
+ assertEquals(ComponentVerifierExtension.Result.Status.ERROR, result.getStatus());
+ }
+
+}
diff --git a/components-starter/camel-paho-starter/src/test/java/org/apache/camel/component/paho/springboot/PahoOverrideTopicTest.java b/components-starter/camel-paho-starter/src/test/java/org/apache/camel/component/paho/springboot/PahoOverrideTopicTest.java
new file mode 100644
index 00000000000..319f82dd1e8
--- /dev/null
+++ b/components-starter/camel-paho-starter/src/test/java/org/apache/camel/component/paho/springboot/PahoOverrideTopicTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.paho.springboot;
+
+
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.paho.PahoComponent;
+import org.apache.camel.component.paho.PahoConstants;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
+import org.apache.camel.test.AvailablePortFinder;
+import org.apache.camel.test.infra.activemq.services.ActiveMQEmbeddedService;
+import org.apache.camel.test.infra.activemq.services.ActiveMQEmbeddedServiceBuilder;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+
+
+@DirtiesContext
+@CamelSpringBootTest
+@SpringBootTest(
+ classes = {
+ CamelAutoConfiguration.class,
+ PahoOverrideTopicTest.class,
+ PahoOverrideTopicTest.TestConfiguration.class
+ }
+)
+public class PahoOverrideTopicTest {
+
+ static int mqttPort = AvailablePortFinder.getNextAvailable();
+
+ @RegisterExtension
+ public static ActiveMQEmbeddedService service = ActiveMQEmbeddedServiceBuilder
+ .bare()
+ .withPersistent(false)
+ .withMqttTransport(mqttPort)
+ .build();
+
+
+
+ @Autowired
+ ProducerTemplate template;
+
+ @Autowired
+ CamelContext context;
+
+ @EndpointInject("mock:test")
+ MockEndpoint mock;
+
+ @Test
+ public void shouldOverride() throws InterruptedException {
+ // Given
+ mock.expectedMessageCount(1);
+
+ // When
+ template.sendBodyAndHeader("direct:test", "Hello World", PahoConstants.CAMEL_PAHO_OVERRIDE_TOPIC, "myoverride");
+
+ // Then
+ mock.assertIsSatisfied();
+ }
+
+ // *************************************
+ // Config
+ // *************************************
+
+ @Configuration
+ public class TestConfiguration {
+
+ @Bean
+ public RouteBuilder routeBuilder() {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+ PahoComponent paho = context.getComponent("paho", PahoComponent.class);
+ paho.getConfiguration().setBrokerUrl("tcp://localhost:" + mqttPort);
+
+ from("direct:test").to("paho:queue").log("Message sent");
+
+ from("paho:myoverride").log("Message received").to("mock:test");
+ }
+ };
+ }
+ }
+
+
+
+}
diff --git a/components-starter/camel-paho-starter/src/test/java/org/apache/camel/component/paho/springboot/PahoReconnectAfterFailureTest.java b/components-starter/camel-paho-starter/src/test/java/org/apache/camel/component/paho/springboot/PahoReconnectAfterFailureTest.java
new file mode 100644
index 00000000000..d479816dad4
--- /dev/null
+++ b/components-starter/camel-paho-starter/src/test/java/org/apache/camel/component/paho/springboot/PahoReconnectAfterFailureTest.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.paho.springboot;
+
+
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.camel.CamelContext;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.Route;
+import org.apache.camel.ServiceStatus;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.spi.RouteController;
+import org.apache.camel.spi.SupervisingRouteController;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.spring.boot.CamelContextConfiguration;
+import org.apache.camel.support.RoutePolicySupport;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
+import org.apache.camel.test.AvailablePortFinder;
+import org.apache.camel.test.infra.activemq.services.ActiveMQEmbeddedServiceBuilder;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+
+
+@DirtiesContext
+@CamelSpringBootTest
+@SpringBootTest(
+ classes = {
+ CamelAutoConfiguration.class,
+ PahoReconnectAfterFailureTest.class,
+ PahoReconnectAfterFailureTest.TestConfiguration.class
+ }
+)
+public class PahoReconnectAfterFailureTest {
+
+ public static final String TESTING_ROUTE_ID = "testingRoute";
+ BrokerService broker;
+
+ static int mqttPort = AvailablePortFinder.getNextAvailable();
+ CountDownLatch routeStartedLatch = new CountDownLatch(1);
+
+
+ @Autowired
+ ProducerTemplate template;
+
+ @Autowired
+ CamelContext context;
+
+ @EndpointInject("mock:test")
+ MockEndpoint mock;
+
+
+ @BeforeEach
+ public void doPreSetup() throws Exception {
+ broker = ActiveMQEmbeddedServiceBuilder
+ .bare()
+ .withPersistent(false)
+ .build().getBrokerService();
+
+ // Broker will be started later, after camel context is started,
+ // to ensure first consumer connection fails
+ }
+
+ @AfterEach
+ public void doCleanUp() throws Exception {
+ broker.stop();
+ }
+
+ @Bean
+ CamelContextConfiguration contextConfiguration() {
+ return new CamelContextConfiguration() {
+ @Override
+ public void beforeApplicationStart(CamelContext context) {
+ // Setup supervisor to restart routes because paho consumer
+ // is not able to recover automatically on startup
+ SupervisingRouteController supervising = context.getRouteController().supervising();
+ supervising.setBackOffDelay(500);
+ supervising.setIncludeRoutes("paho:*");
+ }
+
+ @Override
+ public void afterApplicationStart(CamelContext camelContext) {
+ // TODO Auto-generated method stub
+
+ }
+ };
+ }
+
+ @Test
+ public void startConsumerShouldReconnectMqttClientAfterFailures() throws Exception {
+ mock.reset();
+ RouteController routeController = context.getRouteController();
+
+ assertNotEquals(ServiceStatus.Started, routeController.getRouteStatus(TESTING_ROUTE_ID),
+ "Broker down, expecting route not to be started");
+
+ // Start broker and wait for supervisor to restart route
+ // consumer should now connect
+ startBroker();
+ routeStartedLatch.await(5, TimeUnit.SECONDS);
+ assertEquals(ServiceStatus.Started, routeController.getRouteStatus(TESTING_ROUTE_ID),
+ "Expecting consumer connected to broker and route started");
+
+ // Given
+ String msg = "msg";
+ mock.expectedBodiesReceived(msg);
+
+ // When
+ template.sendBody("paho:queue?lazyStartProducer=true&brokerUrl=tcp://localhost:" + mqttPort, msg);
+
+ // Then
+ mock.assertIsSatisfied();
+
+ }
+
+ @Test
+ public void startProducerShouldReconnectMqttClientAfterFailures() throws Exception {
+ mock.reset();
+ String msg = "msg";
+ mock.expectedBodiesReceived(msg);
+
+ try {
+ template.sendBody("direct:test", "notSentMessage");
+ fail("Broker is down, paho producer should fail");
+ } catch (Exception e) {
+ // ignore
+ }
+
+ startBroker();
+ routeStartedLatch.await(5, TimeUnit.SECONDS);
+
+ template.sendBody("direct:test", msg);
+
+ mock.assertIsSatisfied(20000);
+ }
+
+ private void startBroker() throws Exception {
+ broker.addConnector("mqtt://localhost:" + mqttPort);
+ broker.start();
+ }
+
+ // *************************************
+ // Config
+ // *************************************
+
+ @Configuration
+ public class TestConfiguration {
+
+ @Bean
+ public RouteBuilder routeBuilder() {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+
+ from("direct:test").to("paho:queue?lazyStartProducer=true&brokerUrl=tcp://localhost:" + mqttPort);
+ from("paho:queue?brokerUrl=tcp://localhost:" + mqttPort)
+ .id(TESTING_ROUTE_ID)
+ .routePolicy(new RoutePolicySupport() {
+ @Override
+ public void onStart(Route route) {
+ routeStartedLatch.countDown();
+ }
+ })
+ .to("mock:test");
+ }
+ };
+ }
+ }
+
+
+
+}
diff --git a/components-starter/camel-paho-starter/src/test/java/org/apache/camel/component/paho/springboot/PahoToDSendDynamicTest.java b/components-starter/camel-paho-starter/src/test/java/org/apache/camel/component/paho/springboot/PahoToDSendDynamicTest.java
new file mode 100644
index 00000000000..d7f20dbadd6
--- /dev/null
+++ b/components-starter/camel-paho-starter/src/test/java/org/apache/camel/component/paho/springboot/PahoToDSendDynamicTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.paho.springboot;
+
+
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.ConsumerTemplate;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.paho.PahoComponent;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
+import org.apache.camel.test.AvailablePortFinder;
+import org.apache.camel.test.infra.activemq.services.ActiveMQEmbeddedService;
+import org.apache.camel.test.infra.activemq.services.ActiveMQEmbeddedServiceBuilder;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+
+
+@DirtiesContext
+@CamelSpringBootTest
+@SpringBootTest(
+ classes = {
+ CamelAutoConfiguration.class,
+ PahoToDSendDynamicTest.class,
+ PahoToDSendDynamicTest.TestConfiguration.class
+ }
+)
+public class PahoToDSendDynamicTest {
+
+ static int mqttPort = AvailablePortFinder.getNextAvailable();
+
+ @RegisterExtension
+ public static ActiveMQEmbeddedService service = ActiveMQEmbeddedServiceBuilder
+ .bare()
+ .withPersistent(false)
+ .withMqttTransport(mqttPort)
+ .build();
+
+
+
+ @Autowired
+ ProducerTemplate template;
+
+ @Autowired
+ ConsumerTemplate consumer;
+
+ @Autowired
+ CamelContext context;
+
+ @EndpointInject("mock:test")
+ MockEndpoint mock;
+
+ @Test
+ public void testToD() {
+ template.sendBodyAndHeader("direct:start", "Hello bar", "where", "bar");
+ template.sendBodyAndHeader("direct:start", "Hello beer", "where", "beer");
+
+ // there should only be one paho endpoint
+ long count = context.getEndpoints().stream().filter(e -> e.getEndpointUri().startsWith("paho:")).count();
+ assertEquals(1, count, "There should only be 1 paho endpoint");
+
+ // and the messages should be in the queues
+ String out = consumer.receiveBody("paho:bar", 2000, String.class);
+ assertEquals("Hello bar", out);
+ out = consumer.receiveBody("paho:beer", 2000, String.class);
+ assertEquals("Hello beer", out);
+ }
+
+ // *************************************
+ // Config
+ // *************************************
+
+ @Configuration
+ public class TestConfiguration {
+
+ @Bean
+ public RouteBuilder routeBuilder() {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+ PahoComponent paho = context.getComponent("paho", PahoComponent.class);
+ paho.getConfiguration().setBrokerUrl("tcp://localhost:" + mqttPort);
+
+ // route message dynamic using toD
+ from("direct:start").toD("paho:${header.where}?retained=true");
+ }
+ };
+ }
+ }
+
+
+
+}
diff --git a/components-starter/camel-paho-starter/src/test/java/org/apache/camel/component/paho/springboot/PahoToDTest.java b/components-starter/camel-paho-starter/src/test/java/org/apache/camel/component/paho/springboot/PahoToDTest.java
new file mode 100644
index 00000000000..38cb5c5016b
--- /dev/null
+++ b/components-starter/camel-paho-starter/src/test/java/org/apache/camel/component/paho/springboot/PahoToDTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.paho.springboot;
+
+
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.paho.PahoComponent;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
+import org.apache.camel.test.AvailablePortFinder;
+import org.apache.camel.test.infra.activemq.services.ActiveMQEmbeddedService;
+import org.apache.camel.test.infra.activemq.services.ActiveMQEmbeddedServiceBuilder;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+
+
+@DirtiesContext
+@CamelSpringBootTest
+@SpringBootTest(
+ classes = {
+ CamelAutoConfiguration.class,
+ PahoToDTest.class,
+ PahoToDTest.TestConfiguration.class
+ }
+)
+public class PahoToDTest {
+
+ static int mqttPort = AvailablePortFinder.getNextAvailable();
+
+ @RegisterExtension
+ public static ActiveMQEmbeddedService service = ActiveMQEmbeddedServiceBuilder
+ .bare()
+ .withPersistent(false)
+ .withMqttTransport(mqttPort)
+ .build();
+
+
+
+ @Autowired
+ ProducerTemplate template;
+
+ @Autowired
+ CamelContext context;
+
+ @EndpointInject("mock:bar")
+ MockEndpoint mockBar;
+
+ @EndpointInject("mock:beer")
+ MockEndpoint mockBeer;
+
+ @Test
+ public void testToD() throws Exception {
+ mockBar.expectedBodiesReceived("Hello bar");
+ mockBeer.expectedBodiesReceived("Hello beer");
+
+ template.sendBodyAndHeader("direct:start", "Hello bar", "where", "bar");
+ template.sendBodyAndHeader("direct:start", "Hello beer", "where", "beer");
+
+ mockBar.assertIsSatisfied();
+ mockBeer.assertIsSatisfied();
+ }
+
+ // *************************************
+ // Config
+ // *************************************
+
+ @Configuration
+ public class TestConfiguration {
+
+ @Bean
+ public RouteBuilder routeBuilder() {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+ PahoComponent paho = context.getComponent("paho", PahoComponent.class);
+ paho.getConfiguration().setBrokerUrl("tcp://localhost:" + mqttPort);
+
+ // route message dynamic using toD
+ from("direct:start").toD("paho:${header.where}");
+
+ from("paho:bar").to("mock:bar");
+ from("paho:beer").to("mock:beer");
+ }
+ };
+ }
+ }
+
+
+
+}