You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2023/02/08 08:32:29 UTC

[camel] branch rabbitmq created (now 85152d3c460)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a change to branch rabbitmq
in repository https://gitbox.apache.org/repos/asf/camel.git


      at 85152d3c460 CAMEL-19008: rabbitmq - Add support for publisher confirms which allows to fail if sending to invalid destination.

This branch includes the following new commits:

     new 23b6c4768f0 rabbitmq confirm
     new 85152d3c460 CAMEL-19008: rabbitmq - Add support for publisher confirms which allows to fail if sending to invalid destination.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[camel] 02/02: CAMEL-19008: rabbitmq - Add support for publisher confirms which allows to fail if sending to invalid destination.

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch rabbitmq
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 85152d3c460961df311cb77932428efde3a74d38
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Wed Feb 8 09:32:12 2023 +0100

    CAMEL-19008: rabbitmq - Add support for publisher confirms which allows to fail if sending to invalid destination.
---
 .../SpringRabbitMQEndpointConfigurer.java          |  9 +++++++
 .../SpringRabbitMQEndpointUriFactory.java          |  4 +++-
 .../component/springrabbit/spring-rabbitmq.json    |  4 +++-
 .../springrabbit/SpringRabbitMQEndpoint.java       | 28 ++++++++++++++++++++--
 .../springrabbit/SpringRabbitMQProducer.java       | 25 +++++++++++--------
 .../integration/RabbitMQITSupport.java             | 12 +++++++---
 .../RabbitMQProducerInvalidExchangeIT.java         | 19 ++++++++-------
 7 files changed, 76 insertions(+), 25 deletions(-)

diff --git a/components/camel-spring-rabbitmq/src/generated/java/org/apache/camel/component/springrabbit/SpringRabbitMQEndpointConfigurer.java b/components/camel-spring-rabbitmq/src/generated/java/org/apache/camel/component/springrabbit/SpringRabbitMQEndpointConfigurer.java
index 71847e230f6..9b44a1eb362 100644
--- a/components/camel-spring-rabbitmq/src/generated/java/org/apache/camel/component/springrabbit/SpringRabbitMQEndpointConfigurer.java
+++ b/components/camel-spring-rabbitmq/src/generated/java/org/apache/camel/component/springrabbit/SpringRabbitMQEndpointConfigurer.java
@@ -34,6 +34,9 @@ public class SpringRabbitMQEndpointConfigurer extends PropertyConfigurerSupport
         case "bridgeErrorHandler": target.setBridgeErrorHandler(property(camelContext, boolean.class, value)); return true;
         case "concurrentconsumers":
         case "concurrentConsumers": target.setConcurrentConsumers(property(camelContext, java.lang.Integer.class, value)); return true;
+        case "confirm": target.setConfirm(property(camelContext, java.lang.String.class, value)); return true;
+        case "confirmtimeout":
+        case "confirmTimeout": target.setConfirmTimeout(property(camelContext, java.time.Duration.class, value).toMillis()); return true;
         case "connectionfactory":
         case "connectionFactory": target.setConnectionFactory(property(camelContext, org.springframework.amqp.rabbit.connection.ConnectionFactory.class, value)); return true;
         case "deadletterexchange":
@@ -104,6 +107,9 @@ public class SpringRabbitMQEndpointConfigurer extends PropertyConfigurerSupport
         case "bridgeErrorHandler": return boolean.class;
         case "concurrentconsumers":
         case "concurrentConsumers": return java.lang.Integer.class;
+        case "confirm": return java.lang.String.class;
+        case "confirmtimeout":
+        case "confirmTimeout": return long.class;
         case "connectionfactory":
         case "connectionFactory": return org.springframework.amqp.rabbit.connection.ConnectionFactory.class;
         case "deadletterexchange":
@@ -175,6 +181,9 @@ public class SpringRabbitMQEndpointConfigurer extends PropertyConfigurerSupport
         case "bridgeErrorHandler": return target.isBridgeErrorHandler();
         case "concurrentconsumers":
         case "concurrentConsumers": return target.getConcurrentConsumers();
+        case "confirm": return target.getConfirm();
+        case "confirmtimeout":
+        case "confirmTimeout": return target.getConfirmTimeout();
         case "connectionfactory":
         case "connectionFactory": return target.getConnectionFactory();
         case "deadletterexchange":
diff --git a/components/camel-spring-rabbitmq/src/generated/java/org/apache/camel/component/springrabbit/SpringRabbitMQEndpointUriFactory.java b/components/camel-spring-rabbitmq/src/generated/java/org/apache/camel/component/springrabbit/SpringRabbitMQEndpointUriFactory.java
index 59b7ddc3aa7..d3f7ebcade4 100644
--- a/components/camel-spring-rabbitmq/src/generated/java/org/apache/camel/component/springrabbit/SpringRabbitMQEndpointUriFactory.java
+++ b/components/camel-spring-rabbitmq/src/generated/java/org/apache/camel/component/springrabbit/SpringRabbitMQEndpointUriFactory.java
@@ -21,7 +21,7 @@ public class SpringRabbitMQEndpointUriFactory extends org.apache.camel.support.c
     private static final Set<String> SECRET_PROPERTY_NAMES;
     private static final Set<String> MULTI_VALUE_PREFIXES;
     static {
-        Set<String> props = new HashSet<>(35);
+        Set<String> props = new HashSet<>(37);
         props.add("acknowledgeMode");
         props.add("args");
         props.add("asyncConsumer");
@@ -29,6 +29,8 @@ public class SpringRabbitMQEndpointUriFactory extends org.apache.camel.support.c
         props.add("autoStartup");
         props.add("bridgeErrorHandler");
         props.add("concurrentConsumers");
+        props.add("confirm");
+        props.add("confirmTimeout");
         props.add("connectionFactory");
         props.add("deadLetterExchange");
         props.add("deadLetterExchangeType");
diff --git a/components/camel-spring-rabbitmq/src/generated/resources/org/apache/camel/component/springrabbit/spring-rabbitmq.json b/components/camel-spring-rabbitmq/src/generated/resources/org/apache/camel/component/springrabbit/spring-rabbitmq.json
index 1dbd0d0ebe0..b11bbef4c2f 100644
--- a/components/camel-spring-rabbitmq/src/generated/resources/org/apache/camel/component/springrabbit/spring-rabbitmq.json
+++ b/components/camel-spring-rabbitmq/src/generated/resources/org/apache/camel/component/springrabbit/spring-rabbitmq.json
@@ -85,7 +85,9 @@
     "messageListenerContainerType": { "kind": "parameter", "displayName": "Message Listener Container Type", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "string", "javaType": "java.lang.String", "enum": [ "DMLC", "SMLC" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "DMLC", "description": "The type of the MessageListenerContainer" },
     "prefetchCount": { "kind": "parameter", "displayName": "Prefetch Count", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "description": "Tell the broker how many messages to send in a single request. Often this can be set quite high to improve throughput." },
     "retry": { "kind": "parameter", "displayName": "Retry", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.springframework.retry.interceptor.RetryOperationsInterceptor", "deprecated": false, "autowired": false, "secret": false, "description": "Custom retry configuration to use. If this is configured then the other settings such as maximumRetryAttempts for retry are not in use." },
-    "replyTimeout": { "kind": "parameter", "displayName": "Reply Timeout", "group": "producer", "label": "producer", "required": false, "type": "duration", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "5000", "description": "Specify the timeout in milliseconds to be used when waiting for a reply message when doing request\/reply messaging. The default value is 5 seconds. A negative value indicates an indefinite timeout." },
+    "confirm": { "kind": "parameter", "displayName": "Confirm", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "enum": [ "auto", "enabled", "disabled" ], "deprecated": false, "autowired": false, "secret": false, "description": "Controls whether to wait for confirms. The connection factory must be configured for publisher confirms and this method.auto = Camel detects if the connection factory uses confirms or not. disabled =  [...]
+    "confirmTimeout": { "kind": "parameter", "displayName": "Confirm Timeout", "group": "producer", "label": "producer", "required": false, "type": "duration", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "5000", "description": "Specify the timeout in milliseconds to be used when waiting for a message sent to be confirmed by RabbitMQ when doing send only messaging (InOnly). The default value is 5 seconds. A negative value indicates an inde [...]
+    "replyTimeout": { "kind": "parameter", "displayName": "Reply Timeout", "group": "producer", "label": "producer", "required": false, "type": "duration", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "5000", "description": "Specify the timeout in milliseconds to be used when waiting for a reply message when doing request\/reply (InOut) messaging. The default value is 5 seconds. A negative value indicates an indefinite timeout." },
     "usePublisherConnection": { "kind": "parameter", "displayName": "Use Publisher Connection", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Use a separate connection for publishers and consumers" },
     "lazyStartProducer": { "kind": "parameter", "displayName": "Lazy Start Producer", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may other [...]
     "args": { "kind": "parameter", "displayName": "Args", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "java.util.Map<java.lang.String, java.lang.Object>", "prefix": "arg.", "multiValue": true, "deprecated": false, "autowired": false, "secret": false, "description": "Specify arguments for configuring the different RabbitMQ concepts, a different prefix is required for each element: arg.consumer. arg.exchange. arg.queue. arg.binding. arg.dlq.ex [...]
diff --git a/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/SpringRabbitMQEndpoint.java b/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/SpringRabbitMQEndpoint.java
index 47082029075..1a9caa5107e 100644
--- a/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/SpringRabbitMQEndpoint.java
+++ b/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/SpringRabbitMQEndpoint.java
@@ -43,7 +43,6 @@ import org.springframework.amqp.core.Queue;
 import org.springframework.amqp.core.QueueBuilder;
 import org.springframework.amqp.rabbit.AsyncRabbitTemplate;
 import org.springframework.amqp.rabbit.connection.ConnectionFactory;
-import org.springframework.amqp.rabbit.connection.CorrelationData;
 import org.springframework.amqp.rabbit.core.RabbitTemplate;
 import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;
 import org.springframework.amqp.support.converter.MessageConverter;
@@ -148,9 +147,18 @@ public class SpringRabbitMQEndpoint extends DefaultEndpoint implements AsyncEndp
                             + " message brokers and you want to route message from one system to another.")
     private boolean disableReplyTo;
     @UriParam(label = "producer", javaType = "java.time.Duration", defaultValue = "5000",
-              description = "Specify the timeout in milliseconds to be used when waiting for a reply message when doing request/reply messaging."
+              description = "Specify the timeout in milliseconds to be used when waiting for a reply message when doing request/reply (InOut) messaging."
                             + " The default value is 5 seconds. A negative value indicates an indefinite timeout.")
     private long replyTimeout = 5000;
+    @UriParam(label = "producer", javaType = "java.time.Duration", defaultValue = "5000",
+              description = "Specify the timeout in milliseconds to be used when waiting for a message sent to be confirmed by RabbitMQ when doing send only messaging (InOnly)."
+                            + " The default value is 5 seconds. A negative value indicates an indefinite timeout.")
+    private long confirmTimeout = 5000;
+    @UriParam(label = "producer", enums = "auto,enabled,disabled",
+              description = "Controls whether to wait for confirms. The connection factory must be configured for publisher confirms and this method."
+                            +
+                            "auto = Camel detects if the connection factory uses confirms or not. disabled = Confirms is disabled. enabled = Confirms is enabled.")
+    private String confirm = "auto";
     @UriParam(label = "producer", defaultValue = "false",
               description = "Use a separate connection for publishers and consumers")
     private boolean usePublisherConnection;
@@ -354,6 +362,22 @@ public class SpringRabbitMQEndpoint extends DefaultEndpoint implements AsyncEndp
         this.replyTimeout = replyTimeout;
     }
 
+    public long getConfirmTimeout() {
+        return confirmTimeout;
+    }
+
+    public void setConfirmTimeout(long confirmTimeout) {
+        this.confirmTimeout = confirmTimeout;
+    }
+
+    public String getConfirm() {
+        return confirm;
+    }
+
+    public void setConfirm(String confirm) {
+        this.confirm = confirm;
+    }
+
     public boolean isUsePublisherConnection() {
         return usePublisherConnection;
     }
diff --git a/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/SpringRabbitMQProducer.java b/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/SpringRabbitMQProducer.java
index 0c1690df20e..dbd09d74d57 100644
--- a/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/SpringRabbitMQProducer.java
+++ b/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/SpringRabbitMQProducer.java
@@ -32,7 +32,6 @@ import org.springframework.amqp.core.MessageProperties;
 import org.springframework.amqp.rabbit.AsyncRabbitTemplate;
 import org.springframework.amqp.rabbit.RabbitMessageFuture;
 import org.springframework.amqp.rabbit.connection.Connection;
-import org.springframework.amqp.rabbit.connection.CorrelationData;
 import org.springframework.amqp.rabbit.connection.RabbitUtils;
 import org.springframework.amqp.rabbit.core.RabbitTemplate;
 
@@ -205,20 +204,26 @@ public class SpringRabbitMQProducer extends DefaultAsyncProducer {
 
         final String ex = exchangeName;
         final String rk = routingKey;
+        boolean confirm;
+        if ("auto".equalsIgnoreCase(getEndpoint().getConfirm())) {
+            confirm = getEndpoint().getConnectionFactory().isPublisherConfirms();
+        } else if ("enabled".equalsIgnoreCase(getEndpoint().getConfirm())) {
+            confirm = true;
+        } else {
+            confirm = false;
+        }
+        final long timeout = getEndpoint().getConfirmTimeout() <= 0 ? Long.MAX_VALUE : getEndpoint().getConfirmTimeout();
         try {
-            getInOnlyTemplate().setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
-                @Override
-                public void confirm(CorrelationData correlationData, boolean ack, String cause) {
-
-                }
-            });
-
             Boolean sent = getInOnlyTemplate().invoke(t -> {
                 t.send(ex, rk, msg);
-                return t.waitForConfirms(5000);
+                if (confirm) {
+                    return t.waitForConfirms(timeout);
+                } else {
+                    return true;
+                }
             });
             if (Boolean.FALSE == sent) {
-                exchange.setException(new TimeoutException("Message not sent within " + 5000 + " millis"));
+                exchange.setException(new TimeoutException("Message not sent within " + timeout + " millis"));
             }
         } catch (Exception e) {
             exchange.setException(e);
diff --git a/components/camel-spring-rabbitmq/src/test/java/org/apache/camel/component/springrabbit/integration/RabbitMQITSupport.java b/components/camel-spring-rabbitmq/src/test/java/org/apache/camel/component/springrabbit/integration/RabbitMQITSupport.java
index c7225057048..ff80deec70b 100644
--- a/components/camel-spring-rabbitmq/src/test/java/org/apache/camel/component/springrabbit/integration/RabbitMQITSupport.java
+++ b/components/camel-spring-rabbitmq/src/test/java/org/apache/camel/component/springrabbit/integration/RabbitMQITSupport.java
@@ -36,17 +36,23 @@ public abstract class RabbitMQITSupport extends CamelTestSupport {
 
     protected Logger log = LoggerFactory.getLogger(getClass());
 
-    ConnectionFactory createConnectionFactory() {
+    ConnectionFactory createConnectionFactory(boolean confirm) {
         CachingConnectionFactory cf = new CachingConnectionFactory();
-        cf.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.SIMPLE);
+        if (confirm) {
+            cf.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
+        }
         cf.setUri(service.getAmqpUrl());
         return cf;
     }
 
+    protected boolean confirmEnabled() {
+        return false;
+    }
+
     @Override
     protected CamelContext createCamelContext() throws Exception {
         CamelContext context = super.createCamelContext();
-        context.getRegistry().bind("myCF", createConnectionFactory());
+        context.getRegistry().bind("myCF", createConnectionFactory(confirmEnabled()));
 
         SpringRabbitMQComponent rmq = context.getComponent("spring-rabbitmq", SpringRabbitMQComponent.class);
         // turn on auto declare
diff --git a/components/camel-spring-rabbitmq/src/test/java/org/apache/camel/component/springrabbit/integration/RabbitMQProducerInvalidExchangeIT.java b/components/camel-spring-rabbitmq/src/test/java/org/apache/camel/component/springrabbit/integration/RabbitMQProducerInvalidExchangeIT.java
index 5d1fc01bef8..694a5af2ec1 100644
--- a/components/camel-spring-rabbitmq/src/test/java/org/apache/camel/component/springrabbit/integration/RabbitMQProducerInvalidExchangeIT.java
+++ b/components/camel-spring-rabbitmq/src/test/java/org/apache/camel/component/springrabbit/integration/RabbitMQProducerInvalidExchangeIT.java
@@ -16,10 +16,9 @@
  */
 package org.apache.camel.component.springrabbit.integration;
 
-import org.apache.camel.CamelContext;
+import com.rabbitmq.client.ShutdownSignalException;
 import org.apache.camel.RoutesBuilder;
 import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.springrabbit.SpringRabbitMQComponent;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.springframework.amqp.core.AmqpAdmin;
@@ -29,14 +28,13 @@ import org.springframework.amqp.core.TopicExchange;
 import org.springframework.amqp.rabbit.connection.ConnectionFactory;
 import org.springframework.amqp.rabbit.core.RabbitAdmin;
 
+import static org.junit.Assert.fail;
+
 public class RabbitMQProducerInvalidExchangeIT extends RabbitMQITSupport {
 
     @Override
-    protected CamelContext createCamelContext() throws Exception {
-        CamelContext camelContext = super.createCamelContext();
-        SpringRabbitMQComponent rmq = camelContext.getComponent("spring-rabbitmq", SpringRabbitMQComponent.class);
-        rmq.setAllowNullBody(true);
-        return camelContext;
+    protected boolean confirmEnabled() {
+        return true;
     }
 
     @Test
@@ -51,7 +49,12 @@ public class RabbitMQProducerInvalidExchangeIT extends RabbitMQITSupport {
         admin.declareExchange(t);
         admin.declareBinding(BindingBuilder.bind(q).to(t).with("foo.bar.#"));
 
-        Assertions.assertDoesNotThrow(() -> template.sendBody("direct:start", null));
+        try {
+            template.sendBody("direct:start", "Hello World");
+            fail("Should fail");
+        } catch (Exception e) {
+            Assertions.assertInstanceOf(ShutdownSignalException.class, e.getCause());
+        }
     }
 
     @Override


[camel] 01/02: rabbitmq confirm

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch rabbitmq
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 23b6c4768f0f8f2a641acd5dfa1cd7e7744573d1
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Sun Feb 5 08:03:49 2023 +0100

    rabbitmq confirm
---
 .../apache/camel/catalog/components/pubnub.json    |  4 +-
 .../springrabbit/SpringRabbitMQEndpoint.java       |  1 +
 .../springrabbit/SpringRabbitMQProducer.java       | 19 +++++-
 .../integration/RabbitMQITSupport.java             |  1 +
 .../RabbitMQProducerInvalidExchangeIT.java         | 67 ++++++++++++++++++++++
 .../endpoint/dsl/PubNubEndpointBuilderFactory.java |  3 +
 6 files changed, 92 insertions(+), 3 deletions(-)

diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/pubnub.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/pubnub.json
index ab071bf687a..b2357991688 100644
--- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/pubnub.json
+++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/pubnub.json
@@ -23,7 +23,7 @@
   },
   "componentProperties": {
     "configuration": { "kind": "property", "displayName": "Configuration", "group": "common", "label": "", "required": false, "type": "object", "javaType": "org.apache.camel.component.pubnub.PubNubConfiguration", "deprecated": false, "autowired": false, "secret": false, "description": "The component configurations" },
-    "uuid": { "kind": "property", "displayName": "Uuid", "group": "common", "label": "", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.pubnub.PubNubConfiguration", "configurationField": "configuration", "description": "UUID to be used as a device identifier, a default UUID is generated if not passed." },
+    "uuid": { "kind": "property", "displayName": "Uuid", "group": "common", "label": "", "required": true, "type": "string", "javaType": "java.lang.String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.pubnub.PubNubConfiguration", "configurationField": "configuration", "description": "UUID to be used as a device identifier, a default UUID is generated if not passed." },
     "bridgeErrorHandler": { "kind": "property", "displayName": "Bridge Error Handler", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a me [...]
     "withPresence": { "kind": "property", "displayName": "With Presence", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.pubnub.PubNubConfiguration", "configurationField": "configuration", "description": "Also subscribe to related presence information" },
     "lazyStartProducer": { "kind": "property", "displayName": "Lazy Start Producer", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during star [...]
@@ -44,7 +44,7 @@
   },
   "properties": {
     "channel": { "kind": "path", "displayName": "Channel", "group": "common", "label": "", "required": true, "type": "string", "javaType": "java.lang.String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.pubnub.PubNubConfiguration", "configurationField": "configuration", "description": "The channel used for subscribing\/publishing events" },
-    "uuid": { "kind": "parameter", "displayName": "Uuid", "group": "common", "label": "", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.pubnub.PubNubConfiguration", "configurationField": "configuration", "description": "UUID to be used as a device identifier, a default UUID is generated if not passed." },
+    "uuid": { "kind": "parameter", "displayName": "Uuid", "group": "common", "label": "", "required": true, "type": "string", "javaType": "java.lang.String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.pubnub.PubNubConfiguration", "configurationField": "configuration", "description": "UUID to be used as a device identifier, a default UUID is generated if not passed." },
     "withPresence": { "kind": "parameter", "displayName": "With Presence", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.pubnub.PubNubConfiguration", "configurationField": "configuration", "description": "Also subscribe to related presence information" },
     "bridgeErrorHandler": { "kind": "parameter", "displayName": "Bridge Error Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now [...]
     "exceptionHandler": { "kind": "parameter", "displayName": "Exception Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.", "deprecated": false, "autowired": false, "secret": false, "description": "To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the con [...]
diff --git a/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/SpringRabbitMQEndpoint.java b/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/SpringRabbitMQEndpoint.java
index 84b5b49f1a8..47082029075 100644
--- a/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/SpringRabbitMQEndpoint.java
+++ b/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/SpringRabbitMQEndpoint.java
@@ -43,6 +43,7 @@ import org.springframework.amqp.core.Queue;
 import org.springframework.amqp.core.QueueBuilder;
 import org.springframework.amqp.rabbit.AsyncRabbitTemplate;
 import org.springframework.amqp.rabbit.connection.ConnectionFactory;
+import org.springframework.amqp.rabbit.connection.CorrelationData;
 import org.springframework.amqp.rabbit.core.RabbitTemplate;
 import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;
 import org.springframework.amqp.support.converter.MessageConverter;
diff --git a/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/SpringRabbitMQProducer.java b/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/SpringRabbitMQProducer.java
index 98e47629044..0c1690df20e 100644
--- a/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/SpringRabbitMQProducer.java
+++ b/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/SpringRabbitMQProducer.java
@@ -18,6 +18,7 @@ package org.apache.camel.component.springrabbit;
 
 import java.util.Map;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Endpoint;
@@ -31,6 +32,7 @@ import org.springframework.amqp.core.MessageProperties;
 import org.springframework.amqp.rabbit.AsyncRabbitTemplate;
 import org.springframework.amqp.rabbit.RabbitMessageFuture;
 import org.springframework.amqp.rabbit.connection.Connection;
+import org.springframework.amqp.rabbit.connection.CorrelationData;
 import org.springframework.amqp.rabbit.connection.RabbitUtils;
 import org.springframework.amqp.rabbit.core.RabbitTemplate;
 
@@ -201,8 +203,23 @@ public class SpringRabbitMQProducer extends DefaultAsyncProducer {
             msg = getEndpoint().getMessageConverter().toMessage(body, mp);
         }
 
+        final String ex = exchangeName;
+        final String rk = routingKey;
         try {
-            getInOnlyTemplate().send(exchangeName, routingKey, msg);
+            getInOnlyTemplate().setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
+                @Override
+                public void confirm(CorrelationData correlationData, boolean ack, String cause) {
+
+                }
+            });
+
+            Boolean sent = getInOnlyTemplate().invoke(t -> {
+                t.send(ex, rk, msg);
+                return t.waitForConfirms(5000);
+            });
+            if (Boolean.FALSE == sent) {
+                exchange.setException(new TimeoutException("Message not sent within " + 5000 + " millis"));
+            }
         } catch (Exception e) {
             exchange.setException(e);
         }
diff --git a/components/camel-spring-rabbitmq/src/test/java/org/apache/camel/component/springrabbit/integration/RabbitMQITSupport.java b/components/camel-spring-rabbitmq/src/test/java/org/apache/camel/component/springrabbit/integration/RabbitMQITSupport.java
index 5cc7526e1e0..c7225057048 100644
--- a/components/camel-spring-rabbitmq/src/test/java/org/apache/camel/component/springrabbit/integration/RabbitMQITSupport.java
+++ b/components/camel-spring-rabbitmq/src/test/java/org/apache/camel/component/springrabbit/integration/RabbitMQITSupport.java
@@ -38,6 +38,7 @@ public abstract class RabbitMQITSupport extends CamelTestSupport {
 
     ConnectionFactory createConnectionFactory() {
         CachingConnectionFactory cf = new CachingConnectionFactory();
+        cf.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.SIMPLE);
         cf.setUri(service.getAmqpUrl());
         return cf;
     }
diff --git a/components/camel-spring-rabbitmq/src/test/java/org/apache/camel/component/springrabbit/integration/RabbitMQProducerInvalidExchangeIT.java b/components/camel-spring-rabbitmq/src/test/java/org/apache/camel/component/springrabbit/integration/RabbitMQProducerInvalidExchangeIT.java
new file mode 100644
index 00000000000..5d1fc01bef8
--- /dev/null
+++ b/components/camel-spring-rabbitmq/src/test/java/org/apache/camel/component/springrabbit/integration/RabbitMQProducerInvalidExchangeIT.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.springrabbit.integration;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.springrabbit.SpringRabbitMQComponent;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.springframework.amqp.core.AmqpAdmin;
+import org.springframework.amqp.core.BindingBuilder;
+import org.springframework.amqp.core.Queue;
+import org.springframework.amqp.core.TopicExchange;
+import org.springframework.amqp.rabbit.connection.ConnectionFactory;
+import org.springframework.amqp.rabbit.core.RabbitAdmin;
+
+public class RabbitMQProducerInvalidExchangeIT extends RabbitMQITSupport {
+
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext camelContext = super.createCamelContext();
+        SpringRabbitMQComponent rmq = camelContext.getComponent("spring-rabbitmq", SpringRabbitMQComponent.class);
+        rmq.setAllowNullBody(true);
+        return camelContext;
+    }
+
+    @Test
+    public void testProducer() {
+        ConnectionFactory cf = context.getRegistry().lookupByNameAndType("myCF", ConnectionFactory.class);
+
+        Queue q = new Queue("myqueue");
+        TopicExchange t = new TopicExchange("foo");
+
+        AmqpAdmin admin = new RabbitAdmin(cf);
+        admin.declareQueue(q);
+        admin.declareExchange(t);
+        admin.declareBinding(BindingBuilder.bind(q).to(t).with("foo.bar.#"));
+
+        Assertions.assertDoesNotThrow(() -> template.sendBody("direct:start", null));
+    }
+
+    @Override
+    protected RoutesBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                        .to("spring-rabbitmq:unknown?routingKey=foo.bar");
+            }
+        };
+    }
+}
diff --git a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/PubNubEndpointBuilderFactory.java b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/PubNubEndpointBuilderFactory.java
index f48698741f8..c77e3d5e75d 100644
--- a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/PubNubEndpointBuilderFactory.java
+++ b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/PubNubEndpointBuilderFactory.java
@@ -50,6 +50,7 @@ public interface PubNubEndpointBuilderFactory {
          * 
          * The option is a: &lt;code&gt;java.lang.String&lt;/code&gt; type.
          * 
+         * Required: true
          * Group: common
          * 
          * @param uuid the value to set
@@ -368,6 +369,7 @@ public interface PubNubEndpointBuilderFactory {
          * 
          * The option is a: &lt;code&gt;java.lang.String&lt;/code&gt; type.
          * 
+         * Required: true
          * Group: common
          * 
          * @param uuid the value to set
@@ -615,6 +617,7 @@ public interface PubNubEndpointBuilderFactory {
          * 
          * The option is a: &lt;code&gt;java.lang.String&lt;/code&gt; type.
          * 
+         * Required: true
          * Group: common
          * 
          * @param uuid the value to set