You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2023/03/29 07:32:54 UTC

[camel] 01/02: CAMEL-19210 - Added in the option for setting the deliverAt() on a (#9660)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-3.x
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 6121f8b845a1e70d28cc98b22b27ae6efbd1175a
Author: MrBlueMoo <al...@bluestar-software.co.uk>
AuthorDate: Wed Mar 29 08:29:37 2023 +0100

    CAMEL-19210 - Added in the option for setting the deliverAt() on a (#9660)
    
    Adds an option to set deliverAt() on the message built when sending
    Apache Pulsar messages, to allow for delayed delivery. The delivery time
    is read from the CamelPulsarProducerMessageDeliverAt header (a Long).
---
 .../resources/org/apache/camel/component/pulsar/pulsar.json    |  1 +
 .../java/org/apache/camel/component/pulsar/PulsarProducer.java |  5 +++++
 .../component/pulsar/utils/message/PulsarMessageHeaders.java   |  2 ++
 .../pulsar/integration/PulsarProducerHeadersInIT.java          | 10 ++++++++++
 4 files changed, 18 insertions(+)

diff --git a/components/camel-pulsar/src/generated/resources/org/apache/camel/component/pulsar/pulsar.json b/components/camel-pulsar/src/generated/resources/org/apache/camel/component/pulsar/pulsar.json
index 62756df64aa..498b8abe424 100644
--- a/components/camel-pulsar/src/generated/resources/org/apache/camel/component/pulsar/pulsar.json
+++ b/components/camel-pulsar/src/generated/resources/org/apache/camel/component/pulsar/pulsar.json
@@ -81,6 +81,7 @@
     "CamelPulsarProducerMessageKey": { "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The key of the message for routing policy.", "constantName": "org.apache.camel.component.pulsar.utils.message.PulsarMessageHeaders#KEY_OUT" },
     "CamelPulsarProducerMessageProperties": { "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "Map<String, String>", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The properties of the message to add.", "constantName": "org.apache.camel.component.pulsar.utils.message.PulsarMessageHeaders#PROPERTIES_OUT" },
     "CamelPulsarProducerMessageEventTime": { "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "Long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The event time of the message message.", "constantName": "org.apache.camel.component.pulsar.utils.message.PulsarMessageHeaders#EVENT_TIME_OUT" },
+    "CamelPulsarProducerMessageDeliverAt": { "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "Long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The time to deliver the message at.", "constantName": "org.apache.camel.component.pulsar.utils.message.PulsarMessageHeaders#DELIVER_AT_OUT" },
     "CamelPulsarRedeliveryCount": { "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "int", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The message redelivery count, redelivery count maintain in pulsar broker.", "constantName": "org.apache.camel.component.pulsar.utils.message.PulsarMessageHeaders#PULSAR_REDELIVERY_COUNT" }
   },
   "properties": {
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
index bc28f747efc..5637837ba7c 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
@@ -75,6 +75,11 @@ public class PulsarProducer extends DefaultAsyncProducer {
                 messageBuilder.eventTime(eventTime);
             }
 
+            Long deliverAt = exchange.getIn().getHeader(PulsarMessageHeaders.DELIVER_AT_OUT, Long.class);
+            if (deliverAt != null) {
+                messageBuilder.deliverAt(deliverAt);
+            }
+
             messageBuilder.sendAsync()
                     .thenAccept(r -> exchange.getIn().setBody(r))
                     .whenComplete(
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/message/PulsarMessageHeaders.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/message/PulsarMessageHeaders.java
index 204fd8e1c65..0ba85f00941 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/message/PulsarMessageHeaders.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/message/PulsarMessageHeaders.java
@@ -48,6 +48,8 @@ public interface PulsarMessageHeaders {
     String PROPERTIES_OUT = "CamelPulsarProducerMessageProperties";
     @Metadata(label = "producer", description = "The event time of the message message.", javaType = "Long")
     String EVENT_TIME_OUT = "CamelPulsarProducerMessageEventTime";
+    @Metadata(label = "producer", description = "The time to deliver the message at.", javaType = "Long")
+    String DELIVER_AT_OUT = "CamelPulsarProducerMessageDeliverAt";
     @Metadata(label = "consumer", description = "The message redelivery count, redelivery count maintain in pulsar broker.",
               javaType = "int")
     String PULSAR_REDELIVERY_COUNT = "CamelPulsarRedeliveryCount";
diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/integration/PulsarProducerHeadersInIT.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/integration/PulsarProducerHeadersInIT.java
index 33231f1da2c..1046b1d5ef7 100644
--- a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/integration/PulsarProducerHeadersInIT.java
+++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/integration/PulsarProducerHeadersInIT.java
@@ -116,6 +116,16 @@ public class PulsarProducerHeadersInIT extends PulsarITSupport {
         MockEndpoint.assertIsSatisfied(10, TimeUnit.SECONDS, mock);
     }
 
+    @Test
+    public void deliverAtHeaderSetsPulsarDeliverAt() throws InterruptedException {
+        long deliverAt = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(1);
+        mock.expectedMinimumMessageCount(1);
+
+        producerTemplate.sendBodyAndHeader("test", PulsarMessageHeaders.DELIVER_AT_OUT, deliverAt);
+
+        MockEndpoint.assertIsSatisfied(10, TimeUnit.SECONDS, mock);
+    }
+
     @Test
     public void keyHeaderSetsPulsarKey() throws InterruptedException {
         String key = "testKey";