You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2023/02/08 08:32:30 UTC

[camel] 01/02: rabbitmq confirm

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch rabbitmq
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 23b6c4768f0f8f2a641acd5dfa1cd7e7744573d1
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Sun Feb 5 08:03:49 2023 +0100

    rabbitmq confirm
---
 .../apache/camel/catalog/components/pubnub.json    |  4 +-
 .../springrabbit/SpringRabbitMQEndpoint.java       |  1 +
 .../springrabbit/SpringRabbitMQProducer.java       | 19 +++++-
 .../integration/RabbitMQITSupport.java             |  1 +
 .../RabbitMQProducerInvalidExchangeIT.java         | 67 ++++++++++++++++++++++
 .../endpoint/dsl/PubNubEndpointBuilderFactory.java |  3 +
 6 files changed, 92 insertions(+), 3 deletions(-)

diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/pubnub.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/pubnub.json
index ab071bf687a..b2357991688 100644
--- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/pubnub.json
+++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/pubnub.json
@@ -23,7 +23,7 @@
   },
   "componentProperties": {
     "configuration": { "kind": "property", "displayName": "Configuration", "group": "common", "label": "", "required": false, "type": "object", "javaType": "org.apache.camel.component.pubnub.PubNubConfiguration", "deprecated": false, "autowired": false, "secret": false, "description": "The component configurations" },
-    "uuid": { "kind": "property", "displayName": "Uuid", "group": "common", "label": "", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.pubnub.PubNubConfiguration", "configurationField": "configuration", "description": "UUID to be used as a device identifier, a default UUID is generated if not passed." },
+    "uuid": { "kind": "property", "displayName": "Uuid", "group": "common", "label": "", "required": true, "type": "string", "javaType": "java.lang.String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.pubnub.PubNubConfiguration", "configurationField": "configuration", "description": "UUID to be used as a device identifier, a default UUID is generated if not passed." },
     "bridgeErrorHandler": { "kind": "property", "displayName": "Bridge Error Handler", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a me [...]
     "withPresence": { "kind": "property", "displayName": "With Presence", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.pubnub.PubNubConfiguration", "configurationField": "configuration", "description": "Also subscribe to related presence information" },
     "lazyStartProducer": { "kind": "property", "displayName": "Lazy Start Producer", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during star [...]
@@ -44,7 +44,7 @@
   },
   "properties": {
     "channel": { "kind": "path", "displayName": "Channel", "group": "common", "label": "", "required": true, "type": "string", "javaType": "java.lang.String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.pubnub.PubNubConfiguration", "configurationField": "configuration", "description": "The channel used for subscribing\/publishing events" },
-    "uuid": { "kind": "parameter", "displayName": "Uuid", "group": "common", "label": "", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.pubnub.PubNubConfiguration", "configurationField": "configuration", "description": "UUID to be used as a device identifier, a default UUID is generated if not passed." },
+    "uuid": { "kind": "parameter", "displayName": "Uuid", "group": "common", "label": "", "required": true, "type": "string", "javaType": "java.lang.String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.pubnub.PubNubConfiguration", "configurationField": "configuration", "description": "UUID to be used as a device identifier, a default UUID is generated if not passed." },
     "withPresence": { "kind": "parameter", "displayName": "With Presence", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.pubnub.PubNubConfiguration", "configurationField": "configuration", "description": "Also subscribe to related presence information" },
     "bridgeErrorHandler": { "kind": "parameter", "displayName": "Bridge Error Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now [...]
     "exceptionHandler": { "kind": "parameter", "displayName": "Exception Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.", "deprecated": false, "autowired": false, "secret": false, "description": "To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the con [...]
diff --git a/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/SpringRabbitMQEndpoint.java b/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/SpringRabbitMQEndpoint.java
index 84b5b49f1a8..47082029075 100644
--- a/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/SpringRabbitMQEndpoint.java
+++ b/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/SpringRabbitMQEndpoint.java
@@ -43,6 +43,7 @@ import org.springframework.amqp.core.Queue;
 import org.springframework.amqp.core.QueueBuilder;
 import org.springframework.amqp.rabbit.AsyncRabbitTemplate;
 import org.springframework.amqp.rabbit.connection.ConnectionFactory;
+import org.springframework.amqp.rabbit.connection.CorrelationData;
 import org.springframework.amqp.rabbit.core.RabbitTemplate;
 import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;
 import org.springframework.amqp.support.converter.MessageConverter;
diff --git a/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/SpringRabbitMQProducer.java b/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/SpringRabbitMQProducer.java
index 98e47629044..0c1690df20e 100644
--- a/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/SpringRabbitMQProducer.java
+++ b/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/SpringRabbitMQProducer.java
@@ -18,6 +18,7 @@ package org.apache.camel.component.springrabbit;
 
 import java.util.Map;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Endpoint;
@@ -31,6 +32,7 @@ import org.springframework.amqp.core.MessageProperties;
 import org.springframework.amqp.rabbit.AsyncRabbitTemplate;
 import org.springframework.amqp.rabbit.RabbitMessageFuture;
 import org.springframework.amqp.rabbit.connection.Connection;
+import org.springframework.amqp.rabbit.connection.CorrelationData;
 import org.springframework.amqp.rabbit.connection.RabbitUtils;
 import org.springframework.amqp.rabbit.core.RabbitTemplate;
 
@@ -201,8 +203,23 @@ public class SpringRabbitMQProducer extends DefaultAsyncProducer {
             msg = getEndpoint().getMessageConverter().toMessage(body, mp);
         }
 
+        final String ex = exchangeName;
+        final String rk = routingKey;
         try {
-            getInOnlyTemplate().send(exchangeName, routingKey, msg);
+            getInOnlyTemplate().setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
+                @Override
+                public void confirm(CorrelationData correlationData, boolean ack, String cause) {
+
+                }
+            });
+
+            Boolean sent = getInOnlyTemplate().invoke(t -> {
+                t.send(ex, rk, msg);
+                return t.waitForConfirms(5000);
+            });
+            if (Boolean.FALSE == sent) {
+                exchange.setException(new TimeoutException("Message not sent within " + 5000 + " millis"));
+            }
         } catch (Exception e) {
             exchange.setException(e);
         }
diff --git a/components/camel-spring-rabbitmq/src/test/java/org/apache/camel/component/springrabbit/integration/RabbitMQITSupport.java b/components/camel-spring-rabbitmq/src/test/java/org/apache/camel/component/springrabbit/integration/RabbitMQITSupport.java
index 5cc7526e1e0..c7225057048 100644
--- a/components/camel-spring-rabbitmq/src/test/java/org/apache/camel/component/springrabbit/integration/RabbitMQITSupport.java
+++ b/components/camel-spring-rabbitmq/src/test/java/org/apache/camel/component/springrabbit/integration/RabbitMQITSupport.java
@@ -38,6 +38,7 @@ public abstract class RabbitMQITSupport extends CamelTestSupport {
 
     ConnectionFactory createConnectionFactory() {
         CachingConnectionFactory cf = new CachingConnectionFactory();
+        cf.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.SIMPLE);
         cf.setUri(service.getAmqpUrl());
         return cf;
     }
diff --git a/components/camel-spring-rabbitmq/src/test/java/org/apache/camel/component/springrabbit/integration/RabbitMQProducerInvalidExchangeIT.java b/components/camel-spring-rabbitmq/src/test/java/org/apache/camel/component/springrabbit/integration/RabbitMQProducerInvalidExchangeIT.java
new file mode 100644
index 00000000000..5d1fc01bef8
--- /dev/null
+++ b/components/camel-spring-rabbitmq/src/test/java/org/apache/camel/component/springrabbit/integration/RabbitMQProducerInvalidExchangeIT.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.springrabbit.integration;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.springrabbit.SpringRabbitMQComponent;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.springframework.amqp.core.AmqpAdmin;
+import org.springframework.amqp.core.BindingBuilder;
+import org.springframework.amqp.core.Queue;
+import org.springframework.amqp.core.TopicExchange;
+import org.springframework.amqp.rabbit.connection.ConnectionFactory;
+import org.springframework.amqp.rabbit.core.RabbitAdmin;
+
+public class RabbitMQProducerInvalidExchangeIT extends RabbitMQITSupport {
+    // Integration test: sends through a route whose endpoint names "unknown", a name this class never declares via RabbitAdmin.
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext camelContext = super.createCamelContext();
+        SpringRabbitMQComponent rmq = camelContext.getComponent("spring-rabbitmq", SpringRabbitMQComponent.class);
+        rmq.setAllowNullBody(true); // presumably needed because testProducer() sends a null body - verify
+        return camelContext;
+    }
+    // Declares queue "myqueue", topic exchange "foo" and a binding between them, then sends a null body to direct:start.
+    @Test
+    public void testProducer() {
+        ConnectionFactory cf = context.getRegistry().lookupByNameAndType("myCF", ConnectionFactory.class); // "myCF" is not registered in this class - presumably bound by RabbitMQITSupport; verify
+
+        Queue q = new Queue("myqueue");
+        TopicExchange t = new TopicExchange("foo");
+
+        AmqpAdmin admin = new RabbitAdmin(cf);
+        admin.declareQueue(q);
+        admin.declareExchange(t);
+        admin.declareBinding(BindingBuilder.bind(q).to(t).with("foo.bar.#"));
+        // Only checks that sendBody() does not throw; nothing about delivery or an exception on the exchange is asserted.
+        Assertions.assertDoesNotThrow(() -> template.sendBody("direct:start", null));
+    }
+    // Single route: direct:start -> spring-rabbitmq endpoint "unknown" with routingKey=foo.bar.
+    @Override
+    protected RoutesBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                        .to("spring-rabbitmq:unknown?routingKey=foo.bar"); // NOTE(review): "unknown" is presumably the exchange name (per the class name) and differs from the declared "foo" - confirm
+            }
+        };
+    }
+}
diff --git a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/PubNubEndpointBuilderFactory.java b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/PubNubEndpointBuilderFactory.java
index f48698741f8..c77e3d5e75d 100644
--- a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/PubNubEndpointBuilderFactory.java
+++ b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/PubNubEndpointBuilderFactory.java
@@ -50,6 +50,7 @@ public interface PubNubEndpointBuilderFactory {
          * 
          * The option is a: &lt;code&gt;java.lang.String&lt;/code&gt; type.
          * 
+         * Required: true
          * Group: common
          * 
          * @param uuid the value to set
@@ -368,6 +369,7 @@ public interface PubNubEndpointBuilderFactory {
          * 
          * The option is a: &lt;code&gt;java.lang.String&lt;/code&gt; type.
          * 
+         * Required: true
          * Group: common
          * 
          * @param uuid the value to set
@@ -615,6 +617,7 @@ public interface PubNubEndpointBuilderFactory {
          * 
          * The option is a: &lt;code&gt;java.lang.String&lt;/code&gt; type.
          * 
+         * Required: true
          * Group: common
          * 
          * @param uuid the value to set