You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2023/03/29 07:32:54 UTC
[camel] 01/02: CAMEL-19210 - Added in the option for setting the deliverAt() on a (#9660)
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch camel-3.x
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 6121f8b845a1e70d28cc98b22b27ae6efbd1175a
Author: MrBlueMoo <al...@bluestar-software.co.uk>
AuthorDate: Wed Mar 29 08:29:37 2023 +0100
CAMEL-19210 - Added the option for setting deliverAt() on Pulsar producer messages (#9660)
This allows delayed delivery when sending Apache Pulsar messages. The
delivery time is taken from the CamelPulsarProducerMessageDeliverAt
header.
---
.../resources/org/apache/camel/component/pulsar/pulsar.json | 1 +
.../java/org/apache/camel/component/pulsar/PulsarProducer.java | 5 +++++
.../component/pulsar/utils/message/PulsarMessageHeaders.java | 2 ++
.../pulsar/integration/PulsarProducerHeadersInIT.java | 10 ++++++++++
4 files changed, 18 insertions(+)
diff --git a/components/camel-pulsar/src/generated/resources/org/apache/camel/component/pulsar/pulsar.json b/components/camel-pulsar/src/generated/resources/org/apache/camel/component/pulsar/pulsar.json
index 62756df64aa..498b8abe424 100644
--- a/components/camel-pulsar/src/generated/resources/org/apache/camel/component/pulsar/pulsar.json
+++ b/components/camel-pulsar/src/generated/resources/org/apache/camel/component/pulsar/pulsar.json
@@ -81,6 +81,7 @@
"CamelPulsarProducerMessageKey": { "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The key of the message for routing policy.", "constantName": "org.apache.camel.component.pulsar.utils.message.PulsarMessageHeaders#KEY_OUT" },
"CamelPulsarProducerMessageProperties": { "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "Map<String, String>", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The properties of the message to add.", "constantName": "org.apache.camel.component.pulsar.utils.message.PulsarMessageHeaders#PROPERTIES_OUT" },
"CamelPulsarProducerMessageEventTime": { "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "Long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The event time of the message message.", "constantName": "org.apache.camel.component.pulsar.utils.message.PulsarMessageHeaders#EVENT_TIME_OUT" },
+ "CamelPulsarProducerMessageDeliverAt": { "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "Long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The time to deliver the message at.", "constantName": "org.apache.camel.component.pulsar.utils.message.PulsarMessageHeaders#DELIVER_AT_OUT" },
"CamelPulsarRedeliveryCount": { "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "int", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The message redelivery count, redelivery count maintain in pulsar broker.", "constantName": "org.apache.camel.component.pulsar.utils.message.PulsarMessageHeaders#PULSAR_REDELIVERY_COUNT" }
},
"properties": {
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
index bc28f747efc..5637837ba7c 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
@@ -75,6 +75,11 @@ public class PulsarProducer extends DefaultAsyncProducer {
messageBuilder.eventTime(eventTime);
}
+ Long deliverAt = exchange.getIn().getHeader(PulsarMessageHeaders.DELIVER_AT_OUT, Long.class);
+ if (deliverAt != null) {
+ messageBuilder.deliverAt(deliverAt);
+ }
+
messageBuilder.sendAsync()
.thenAccept(r -> exchange.getIn().setBody(r))
.whenComplete(
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/message/PulsarMessageHeaders.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/message/PulsarMessageHeaders.java
index 204fd8e1c65..0ba85f00941 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/message/PulsarMessageHeaders.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/message/PulsarMessageHeaders.java
@@ -48,6 +48,8 @@ public interface PulsarMessageHeaders {
String PROPERTIES_OUT = "CamelPulsarProducerMessageProperties";
@Metadata(label = "producer", description = "The event time of the message message.", javaType = "Long")
String EVENT_TIME_OUT = "CamelPulsarProducerMessageEventTime";
+ @Metadata(label = "producer", description = "The time to deliver the message at.", javaType = "Long")
+ String DELIVER_AT_OUT = "CamelPulsarProducerMessageDeliverAt";
@Metadata(label = "consumer", description = "The message redelivery count, redelivery count maintain in pulsar broker.",
javaType = "int")
String PULSAR_REDELIVERY_COUNT = "CamelPulsarRedeliveryCount";
diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/integration/PulsarProducerHeadersInIT.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/integration/PulsarProducerHeadersInIT.java
index 33231f1da2c..1046b1d5ef7 100644
--- a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/integration/PulsarProducerHeadersInIT.java
+++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/integration/PulsarProducerHeadersInIT.java
@@ -116,6 +116,16 @@ public class PulsarProducerHeadersInIT extends PulsarITSupport {
MockEndpoint.assertIsSatisfied(10, TimeUnit.SECONDS, mock);
}
+ @Test
+ public void deliverAtHeaderSetsPulsarDeliverAt() throws InterruptedException {
+ long deliverAt = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(1);
+ mock.expectedMinimumMessageCount(1);
+
+ producerTemplate.sendBodyAndHeader("test", PulsarMessageHeaders.DELIVER_AT_OUT, deliverAt);
+
+ MockEndpoint.assertIsSatisfied(10, TimeUnit.SECONDS, mock);
+ }
+
@Test
public void keyHeaderSetsPulsarKey() throws InterruptedException {
String key = "testKey";