You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2022/08/13 08:33:29 UTC

[camel] branch main updated: CAMEL-18376: camel-pulsar: add redelivery backoff for ack timeout, nack (#8147)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 6e1bdb2955f CAMEL-18376: camel-pulsar: add redelivery backoff for ack timeout, nack (#8147)
6e1bdb2955f is described below

commit 6e1bdb2955fb6de483d5ef95042ac33377865600
Author: Anthony Sam Wu <55...@users.noreply.github.com>
AuthorDate: Sat Aug 13 04:33:22 2022 -0400

    CAMEL-18376: camel-pulsar: add redelivery backoff for ack timeout, nack (#8147)
    
    * CAMEL-18376: camel-pulsar: add redelivery backoff for ack timeout, negative ack
    
    * Fix import order/linebreak
    
    * Apply formatting
    
    * Provide ack timeout in negative ack test
    
    * Rebind in test
    
    * Set backoff not in endpoint query param
    
    * Fix import
    
    * Fix tests
---
 .../pulsar/PulsarComponentConfigurer.java          | 12 +++++++
 .../component/pulsar/PulsarEndpointConfigurer.java | 12 +++++++
 .../component/pulsar/PulsarEndpointUriFactory.java |  4 ++-
 .../org/apache/camel/component/pulsar/pulsar.json  |  4 +++
 .../component/pulsar/PulsarConfiguration.java      | 27 +++++++++++++++
 .../consumers/CommonCreationStrategyImpl.java      | 10 ++++++
 .../PulsarConsumerDeadLetterPolicyIT.java          |  6 ++--
 ...> PulsarConsumerNegativeAcknowledgementIT.java} | 38 +++++++++++++++-------
 .../PulsarConsumerNoAcknowledgementIT.java         | 29 +++++++++++++++--
 9 files changed, 124 insertions(+), 18 deletions(-)

diff --git a/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarComponentConfigurer.java b/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarComponentConfigurer.java
index 86afa21c75a..97dab737df9 100644
--- a/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarComponentConfigurer.java
+++ b/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarComponentConfigurer.java
@@ -32,6 +32,8 @@ public class PulsarComponentConfigurer extends PropertyConfigurerSupport impleme
         case "ackGroupTimeMillis": getOrCreateConfiguration(target).setAckGroupTimeMillis(property(camelContext, long.class, value)); return true;
         case "acktimeoutmillis":
         case "ackTimeoutMillis": getOrCreateConfiguration(target).setAckTimeoutMillis(property(camelContext, long.class, value)); return true;
+        case "acktimeoutredeliverybackoff":
+        case "ackTimeoutRedeliveryBackoff": getOrCreateConfiguration(target).setAckTimeoutRedeliveryBackoff(property(camelContext, org.apache.pulsar.client.api.RedeliveryBackoff.class, value)); return true;
         case "allowmanualacknowledgement":
         case "allowManualAcknowledgement": getOrCreateConfiguration(target).setAllowManualAcknowledgement(property(camelContext, boolean.class, value)); return true;
         case "authenticationclass":
@@ -81,6 +83,8 @@ public class PulsarComponentConfigurer extends PropertyConfigurerSupport impleme
         case "messageRouter": getOrCreateConfiguration(target).setMessageRouter(property(camelContext, org.apache.pulsar.client.api.MessageRouter.class, value)); return true;
         case "messageroutingmode":
         case "messageRoutingMode": getOrCreateConfiguration(target).setMessageRoutingMode(property(camelContext, org.apache.pulsar.client.api.MessageRoutingMode.class, value)); return true;
+        case "negativeackredeliverybackoff":
+        case "negativeAckRedeliveryBackoff": getOrCreateConfiguration(target).setNegativeAckRedeliveryBackoff(property(camelContext, org.apache.pulsar.client.api.RedeliveryBackoff.class, value)); return true;
         case "negativeackredeliverydelaymicros":
         case "negativeAckRedeliveryDelayMicros": getOrCreateConfiguration(target).setNegativeAckRedeliveryDelayMicros(property(camelContext, long.class, value)); return true;
         case "numberofconsumerthreads":
@@ -125,6 +129,8 @@ public class PulsarComponentConfigurer extends PropertyConfigurerSupport impleme
         case "ackGroupTimeMillis": return long.class;
         case "acktimeoutmillis":
         case "ackTimeoutMillis": return long.class;
+        case "acktimeoutredeliverybackoff":
+        case "ackTimeoutRedeliveryBackoff": return org.apache.pulsar.client.api.RedeliveryBackoff.class;
         case "allowmanualacknowledgement":
         case "allowManualAcknowledgement": return boolean.class;
         case "authenticationclass":
@@ -174,6 +180,8 @@ public class PulsarComponentConfigurer extends PropertyConfigurerSupport impleme
         case "messageRouter": return org.apache.pulsar.client.api.MessageRouter.class;
         case "messageroutingmode":
         case "messageRoutingMode": return org.apache.pulsar.client.api.MessageRoutingMode.class;
+        case "negativeackredeliverybackoff":
+        case "negativeAckRedeliveryBackoff": return org.apache.pulsar.client.api.RedeliveryBackoff.class;
         case "negativeackredeliverydelaymicros":
         case "negativeAckRedeliveryDelayMicros": return long.class;
         case "numberofconsumerthreads":
@@ -214,6 +222,8 @@ public class PulsarComponentConfigurer extends PropertyConfigurerSupport impleme
         case "ackGroupTimeMillis": return getOrCreateConfiguration(target).getAckGroupTimeMillis();
         case "acktimeoutmillis":
         case "ackTimeoutMillis": return getOrCreateConfiguration(target).getAckTimeoutMillis();
+        case "acktimeoutredeliverybackoff":
+        case "ackTimeoutRedeliveryBackoff": return getOrCreateConfiguration(target).getAckTimeoutRedeliveryBackoff();
         case "allowmanualacknowledgement":
         case "allowManualAcknowledgement": return getOrCreateConfiguration(target).isAllowManualAcknowledgement();
         case "authenticationclass":
@@ -263,6 +273,8 @@ public class PulsarComponentConfigurer extends PropertyConfigurerSupport impleme
         case "messageRouter": return getOrCreateConfiguration(target).getMessageRouter();
         case "messageroutingmode":
         case "messageRoutingMode": return getOrCreateConfiguration(target).getMessageRoutingMode();
+        case "negativeackredeliverybackoff":
+        case "negativeAckRedeliveryBackoff": return getOrCreateConfiguration(target).getNegativeAckRedeliveryBackoff();
         case "negativeackredeliverydelaymicros":
         case "negativeAckRedeliveryDelayMicros": return getOrCreateConfiguration(target).getNegativeAckRedeliveryDelayMicros();
         case "numberofconsumerthreads":
diff --git a/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarEndpointConfigurer.java b/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarEndpointConfigurer.java
index f19746e95f0..08be79b9ea5 100644
--- a/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarEndpointConfigurer.java
+++ b/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarEndpointConfigurer.java
@@ -25,6 +25,8 @@ public class PulsarEndpointConfigurer extends PropertyConfigurerSupport implemen
         case "ackGroupTimeMillis": target.getPulsarConfiguration().setAckGroupTimeMillis(property(camelContext, long.class, value)); return true;
         case "acktimeoutmillis":
         case "ackTimeoutMillis": target.getPulsarConfiguration().setAckTimeoutMillis(property(camelContext, long.class, value)); return true;
+        case "acktimeoutredeliverybackoff":
+        case "ackTimeoutRedeliveryBackoff": target.getPulsarConfiguration().setAckTimeoutRedeliveryBackoff(property(camelContext, org.apache.pulsar.client.api.RedeliveryBackoff.class, value)); return true;
         case "allowmanualacknowledgement":
         case "allowManualAcknowledgement": target.getPulsarConfiguration().setAllowManualAcknowledgement(property(camelContext, boolean.class, value)); return true;
         case "authenticationclass":
@@ -73,6 +75,8 @@ public class PulsarEndpointConfigurer extends PropertyConfigurerSupport implemen
         case "messageRouter": target.getPulsarConfiguration().setMessageRouter(property(camelContext, org.apache.pulsar.client.api.MessageRouter.class, value)); return true;
         case "messageroutingmode":
         case "messageRoutingMode": target.getPulsarConfiguration().setMessageRoutingMode(property(camelContext, org.apache.pulsar.client.api.MessageRoutingMode.class, value)); return true;
+        case "negativeackredeliverybackoff":
+        case "negativeAckRedeliveryBackoff": target.getPulsarConfiguration().setNegativeAckRedeliveryBackoff(property(camelContext, org.apache.pulsar.client.api.RedeliveryBackoff.class, value)); return true;
         case "negativeackredeliverydelaymicros":
         case "negativeAckRedeliveryDelayMicros": target.getPulsarConfiguration().setNegativeAckRedeliveryDelayMicros(property(camelContext, long.class, value)); return true;
         case "numberofconsumerthreads":
@@ -108,6 +112,8 @@ public class PulsarEndpointConfigurer extends PropertyConfigurerSupport implemen
         case "ackGroupTimeMillis": return long.class;
         case "acktimeoutmillis":
         case "ackTimeoutMillis": return long.class;
+        case "acktimeoutredeliverybackoff":
+        case "ackTimeoutRedeliveryBackoff": return org.apache.pulsar.client.api.RedeliveryBackoff.class;
         case "allowmanualacknowledgement":
         case "allowManualAcknowledgement": return boolean.class;
         case "authenticationclass":
@@ -156,6 +162,8 @@ public class PulsarEndpointConfigurer extends PropertyConfigurerSupport implemen
         case "messageRouter": return org.apache.pulsar.client.api.MessageRouter.class;
         case "messageroutingmode":
         case "messageRoutingMode": return org.apache.pulsar.client.api.MessageRoutingMode.class;
+        case "negativeackredeliverybackoff":
+        case "negativeAckRedeliveryBackoff": return org.apache.pulsar.client.api.RedeliveryBackoff.class;
         case "negativeackredeliverydelaymicros":
         case "negativeAckRedeliveryDelayMicros": return long.class;
         case "numberofconsumerthreads":
@@ -192,6 +200,8 @@ public class PulsarEndpointConfigurer extends PropertyConfigurerSupport implemen
         case "ackGroupTimeMillis": return target.getPulsarConfiguration().getAckGroupTimeMillis();
         case "acktimeoutmillis":
         case "ackTimeoutMillis": return target.getPulsarConfiguration().getAckTimeoutMillis();
+        case "acktimeoutredeliverybackoff":
+        case "ackTimeoutRedeliveryBackoff": return target.getPulsarConfiguration().getAckTimeoutRedeliveryBackoff();
         case "allowmanualacknowledgement":
         case "allowManualAcknowledgement": return target.getPulsarConfiguration().isAllowManualAcknowledgement();
         case "authenticationclass":
@@ -240,6 +250,8 @@ public class PulsarEndpointConfigurer extends PropertyConfigurerSupport implemen
         case "messageRouter": return target.getPulsarConfiguration().getMessageRouter();
         case "messageroutingmode":
         case "messageRoutingMode": return target.getPulsarConfiguration().getMessageRoutingMode();
+        case "negativeackredeliverybackoff":
+        case "negativeAckRedeliveryBackoff": return target.getPulsarConfiguration().getNegativeAckRedeliveryBackoff();
         case "negativeackredeliverydelaymicros":
         case "negativeAckRedeliveryDelayMicros": return target.getPulsarConfiguration().getNegativeAckRedeliveryDelayMicros();
         case "numberofconsumerthreads":
diff --git a/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarEndpointUriFactory.java b/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarEndpointUriFactory.java
index 87e2015e5be..3bb04d4d8c6 100644
--- a/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarEndpointUriFactory.java
+++ b/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarEndpointUriFactory.java
@@ -21,9 +21,10 @@ public class PulsarEndpointUriFactory extends org.apache.camel.support.component
     private static final Set<String> SECRET_PROPERTY_NAMES;
     private static final Set<String> MULTI_VALUE_PREFIXES;
     static {
-        Set<String> props = new HashSet<>(42);
+        Set<String> props = new HashSet<>(44);
         props.add("ackGroupTimeMillis");
         props.add("ackTimeoutMillis");
+        props.add("ackTimeoutRedeliveryBackoff");
         props.add("allowManualAcknowledgement");
         props.add("authenticationClass");
         props.add("authenticationParams");
@@ -49,6 +50,7 @@ public class PulsarEndpointUriFactory extends org.apache.camel.support.component
         props.add("messageRouter");
         props.add("messageRoutingMode");
         props.add("namespace");
+        props.add("negativeAckRedeliveryBackoff");
         props.add("negativeAckRedeliveryDelayMicros");
         props.add("numberOfConsumerThreads");
         props.add("numberOfConsumers");
diff --git a/components/camel-pulsar/src/generated/resources/org/apache/camel/component/pulsar/pulsar.json b/components/camel-pulsar/src/generated/resources/org/apache/camel/component/pulsar/pulsar.json
index c28d2bb37a1..d2302601763 100644
--- a/components/camel-pulsar/src/generated/resources/org/apache/camel/component/pulsar/pulsar.json
+++ b/components/camel-pulsar/src/generated/resources/org/apache/camel/component/pulsar/pulsar.json
@@ -28,6 +28,7 @@
     "serviceUrl": { "kind": "property", "displayName": "Service Url", "group": "common", "label": "common", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "configuration", "description": "The Pulsar Service URL to point while creating the client from URI" },
     "ackGroupTimeMillis": { "kind": "property", "displayName": "Ack Group Time Millis", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 100, "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "configuration", "description": "Group the consumer acknowledgments for the specified time in milliseconds - defaults to 100" },
     "ackTimeoutMillis": { "kind": "property", "displayName": "Ack Timeout Millis", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 10000, "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "configuration", "description": "Timeout for unacknowledged messages in milliseconds - defaults to 10000" },
+    "ackTimeoutRedeliveryBackoff": { "kind": "property", "displayName": "Ack Timeout Redelivery Backoff", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "org.apache.pulsar.client.api.RedeliveryBackoff", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "configuration", "description": "RedeliveryBackoff to use for ack timeout redelivery b [...]
     "allowManualAcknowledgement": { "kind": "property", "displayName": "Allow Manual Acknowledgement", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "configuration", "description": "Whether to allow manual message acknowledgements. If this option is ena [...]
     "bridgeErrorHandler": { "kind": "property", "displayName": "Bridge Error Handler", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a me [...]
     "consumerName": { "kind": "property", "displayName": "Consumer Name", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "sole-consumer", "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "configuration", "description": "Name of the consumer when subscription is EXCLUSIVE" },
@@ -36,6 +37,7 @@
     "deadLetterTopic": { "kind": "property", "displayName": "Dead Letter Topic", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "configuration", "description": "Name of the topic where the messages which fail maxRedeliverCount times will be sent. Note: if not set, defa [...]
     "maxRedeliverCount": { "kind": "property", "displayName": "Max Redeliver Count", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "configuration", "description": "Maximum number of times that a message will be redelivered before being sent to the dead letter queue. [...]
     "messageListener": { "kind": "property", "displayName": "Message Listener", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "configuration", "description": "Whether to use the messageListener interface, or to receive messages using a separate thread pool" },
+    "negativeAckRedeliveryBackoff": { "kind": "property", "displayName": "Negative Ack Redelivery Backoff", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "org.apache.pulsar.client.api.RedeliveryBackoff", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "configuration", "description": "RedeliveryBackoff to use for negative ack redeliver [...]
     "negativeAckRedeliveryDelayMicros": { "kind": "property", "displayName": "Negative Ack Redelivery Delay Micros", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 60000000, "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "configuration", "description": "Set the negative acknowledgement delay" },
     "numberOfConsumers": { "kind": "property", "displayName": "Number Of Consumers", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 1, "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "configuration", "description": "Number of consumers - defaults to 1" },
     "numberOfConsumerThreads": { "kind": "property", "displayName": "Number Of Consumer Threads", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 1, "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "configuration", "description": "Number of threads to receive and handle messages when using a separate thread pool" },
@@ -90,6 +92,7 @@
     "serviceUrl": { "kind": "parameter", "displayName": "Service Url", "group": "common", "label": "common", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "pulsarConfiguration", "description": "The Pulsar Service URL to point while creating the client from URI" },
     "ackGroupTimeMillis": { "kind": "parameter", "displayName": "Ack Group Time Millis", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 100, "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "pulsarConfiguration", "description": "Group the consumer acknowledgments for the specified time in milliseconds - defaults [...]
     "ackTimeoutMillis": { "kind": "parameter", "displayName": "Ack Timeout Millis", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 10000, "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "pulsarConfiguration", "description": "Timeout for unacknowledged messages in milliseconds - defaults to 10000" },
+    "ackTimeoutRedeliveryBackoff": { "kind": "parameter", "displayName": "Ack Timeout Redelivery Backoff", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "org.apache.pulsar.client.api.RedeliveryBackoff", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "pulsarConfiguration", "description": "RedeliveryBackoff to use for ack timeout redel [...]
     "allowManualAcknowledgement": { "kind": "parameter", "displayName": "Allow Manual Acknowledgement", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "pulsarConfiguration", "description": "Whether to allow manual message acknowledgements. If this option [...]
     "consumerName": { "kind": "parameter", "displayName": "Consumer Name", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "sole-consumer", "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "pulsarConfiguration", "description": "Name of the consumer when subscription is EXCLUSIVE" },
     "consumerNamePrefix": { "kind": "parameter", "displayName": "Consumer Name Prefix", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "cons", "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "pulsarConfiguration", "description": "Prefix to add to consumer names when a SHARED or FAILOVER subscription  [...]
@@ -97,6 +100,7 @@
     "deadLetterTopic": { "kind": "parameter", "displayName": "Dead Letter Topic", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "pulsarConfiguration", "description": "Name of the topic where the messages which fail maxRedeliverCount times will be sent. Note: if not se [...]
     "maxRedeliverCount": { "kind": "parameter", "displayName": "Max Redeliver Count", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "pulsarConfiguration", "description": "Maximum number of times that a message will be redelivered before being sent to the dead letter [...]
     "messageListener": { "kind": "parameter", "displayName": "Message Listener", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "pulsarConfiguration", "description": "Whether to use the messageListener interface, or to receive messages using a separate th [...]
+    "negativeAckRedeliveryBackoff": { "kind": "parameter", "displayName": "Negative Ack Redelivery Backoff", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "org.apache.pulsar.client.api.RedeliveryBackoff", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "pulsarConfiguration", "description": "RedeliveryBackoff to use for negative ack re [...]
     "negativeAckRedeliveryDelayMicros": { "kind": "parameter", "displayName": "Negative Ack Redelivery Delay Micros", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 60000000, "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "pulsarConfiguration", "description": "Set the negative acknowledgement delay" },
     "numberOfConsumers": { "kind": "parameter", "displayName": "Number Of Consumers", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 1, "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "pulsarConfiguration", "description": "Number of consumers - defaults to 1" },
     "numberOfConsumerThreads": { "kind": "parameter", "displayName": "Number Of Consumer Threads", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 1, "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "pulsarConfiguration", "description": "Number of threads to receive and handle messages when using a separate thread [...]
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConfiguration.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConfiguration.java
index 5f22892bc5c..1d9aa490e71 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConfiguration.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConfiguration.java
@@ -27,6 +27,7 @@ import org.apache.pulsar.client.api.BatcherBuilder;
 import org.apache.pulsar.client.api.CompressionType;
 import org.apache.pulsar.client.api.MessageRouter;
 import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.RedeliveryBackoff;
 import org.apache.pulsar.client.api.RegexSubscriptionMode;
 
 import static org.apache.camel.component.pulsar.utils.consumers.SubscriptionInitialPosition.LATEST;
@@ -65,6 +66,10 @@ public class PulsarConfiguration implements Cloneable {
     private long ackTimeoutMillis = 10000;
     @UriParam(label = "consumer", defaultValue = "60000000")
     private long negativeAckRedeliveryDelayMicros = 60000000;
+    @UriParam(label = "consumer", description = "RedeliveryBackoff to use for ack timeout redelivery backoff.")
+    private RedeliveryBackoff ackTimeoutRedeliveryBackoff;
+    @UriParam(label = "consumer", description = "RedeliveryBackoff to use for negative ack redelivery backoff.")
+    private RedeliveryBackoff negativeAckRedeliveryBackoff;
     @UriParam(label = "consumer", defaultValue = "100")
     private long ackGroupTimeMillis = 100;
     @UriParam(label = "consumer", defaultValue = "LATEST")
@@ -455,6 +460,28 @@ public class PulsarConfiguration implements Cloneable {
         this.negativeAckRedeliveryDelayMicros = negativeAckRedeliveryDelayMicros;
     }
 
+    public RedeliveryBackoff getAckTimeoutRedeliveryBackoff() {
+        return ackTimeoutRedeliveryBackoff;
+    }
+
+    /**
+     * Set a RedeliveryBackoff to use for ack timeout redelivery backoff.
+     */
+    public void setAckTimeoutRedeliveryBackoff(RedeliveryBackoff redeliveryBackoff) {
+        this.ackTimeoutRedeliveryBackoff = redeliveryBackoff;
+    }
+
+    public RedeliveryBackoff getNegativeAckRedeliveryBackoff() {
+        return negativeAckRedeliveryBackoff;
+    }
+
+    /**
+     * Set a RedeliveryBackoff to use for negative ack redelivery backoff.
+     */
+    public void setNegativeAckRedeliveryBackoff(RedeliveryBackoff redeliveryBackoff) {
+        this.negativeAckRedeliveryBackoff = redeliveryBackoff;
+    }
+
     public Integer getMaxRedeliverCount() {
         return maxRedeliverCount;
     }
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/CommonCreationStrategyImpl.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/CommonCreationStrategyImpl.java
index 2cdfa62625b..32446880a28 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/CommonCreationStrategyImpl.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/CommonCreationStrategyImpl.java
@@ -22,6 +22,7 @@ import org.apache.camel.component.pulsar.PulsarConfiguration;
 import org.apache.camel.component.pulsar.PulsarConsumer;
 import org.apache.camel.component.pulsar.PulsarEndpoint;
 import org.apache.camel.component.pulsar.PulsarMessageListener;
+import org.apache.camel.util.ObjectHelper;
 import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.DeadLetterPolicy;
 import org.apache.pulsar.client.api.DeadLetterPolicy.DeadLetterPolicyBuilder;
@@ -66,6 +67,15 @@ public final class CommonCreationStrategyImpl {
 
             builder.deadLetterPolicy(policy.build());
         }
+
+        if (ObjectHelper.isNotEmpty(endpointConfiguration.getAckTimeoutRedeliveryBackoff())) {
+            builder.ackTimeoutRedeliveryBackoff(endpointConfiguration.getAckTimeoutRedeliveryBackoff());
+        }
+
+        if (ObjectHelper.isNotEmpty(endpointConfiguration.getNegativeAckRedeliveryBackoff())) {
+            builder.negativeAckRedeliveryBackoff(endpointConfiguration.getNegativeAckRedeliveryBackoff());
+        }
+
         return builder;
     }
 }
diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/integration/PulsarConsumerDeadLetterPolicyIT.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/integration/PulsarConsumerDeadLetterPolicyIT.java
index 2d45cb44503..8b2223a5796 100644
--- a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/integration/PulsarConsumerDeadLetterPolicyIT.java
+++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/integration/PulsarConsumerDeadLetterPolicyIT.java
@@ -111,7 +111,7 @@ public class PulsarConsumerDeadLetterPolicyIT extends PulsarITSupport {
     }
 
     @Test
-    public void givenMaxRedeliverCountverifyMessageGetsSentToDefaultDeadLetterTopicAfterCountExceeded()
+    public void givenMaxRedeliverCountVerifyMessageGetsSentToDefaultDeadLetterTopicAfterCountExceeded()
             throws Exception {
         PulsarComponent component = context.getComponent("pulsar", PulsarComponent.class);
 
@@ -136,7 +136,7 @@ public class PulsarConsumerDeadLetterPolicyIT extends PulsarITSupport {
     }
 
     @Test
-    public void givenMaxRedeliverCountAndDeadLetterTopicverifyMessageGetsSentToSpecifiedDeadLetterTopicAfterCountExceeded()
+    public void givenMaxRedeliverCountAndDeadLetterTopicVerifyMessageGetsSentToSpecifiedDeadLetterTopicAfterCountExceeded()
             throws Exception {
         PulsarComponent component = context.getComponent("pulsar", PulsarComponent.class);
 
@@ -162,7 +162,7 @@ public class PulsarConsumerDeadLetterPolicyIT extends PulsarITSupport {
     }
 
     @Test
-    public void givenOnlyDeadLetterTopicverifyMessageDoesNotGetSentToSpecifiedTopic() throws Exception {
+    public void givenOnlyDeadLetterTopicVerifyMessageDoesNotGetSentToSpecifiedTopic() throws Exception {
         PulsarComponent component = context.getComponent("pulsar", PulsarComponent.class);
 
         PulsarEndpoint from = (PulsarEndpoint) component
diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/integration/PulsarConsumerNoAcknowledgementIT.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/integration/PulsarConsumerNegativeAcknowledgementIT.java
similarity index 67%
copy from components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/integration/PulsarConsumerNoAcknowledgementIT.java
copy to components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/integration/PulsarConsumerNegativeAcknowledgementIT.java
index 026e0d2d41e..f5bb919fe0b 100644
--- a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/integration/PulsarConsumerNoAcknowledgementIT.java
+++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/integration/PulsarConsumerNegativeAcknowledgementIT.java
@@ -23,7 +23,9 @@ import org.apache.camel.EndpointInject;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.component.pulsar.PulsarComponent;
+import org.apache.camel.component.pulsar.PulsarMessageReceipt;
 import org.apache.camel.component.pulsar.utils.AutoConfiguration;
+import org.apache.camel.component.pulsar.utils.message.PulsarMessageHeaders;
 import org.apache.camel.spi.Registry;
 import org.apache.camel.support.SimpleRegistry;
 import org.apache.pulsar.client.api.Producer;
@@ -31,16 +33,16 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.ClientBuilderImpl;
+import org.apache.pulsar.client.impl.MultiplierRedeliveryBackoff;
 import org.junit.jupiter.api.Test;
 
-public class PulsarConsumerNoAcknowledgementIT extends PulsarITSupport {
+public class PulsarConsumerNegativeAcknowledgementIT extends PulsarITSupport {
 
-    private static final String TOPIC_URI = "persistent://public/default/camel-topic";
+    private static final String TOPIC_URI = "persistent://public/default/camel-topic-negative-ack";
     private static final String PRODUCER = "camel-producer-1";
 
-    @EndpointInject("pulsar:" + TOPIC_URI + "?numberOfConsumers=1&subscriptionType=Exclusive"
-                    + "&subscriptionName=camel-subscription&consumerQueueSize=1&consumerName=camel-consumer"
-                    + "&ackTimeoutMillis=1000")
+    @EndpointInject("pulsar:" + TOPIC_URI + "?numberOfConsumers=1&subscriptionType=Exclusive&batchingEnabled=false"
+                    + "&subscriptionName=camel-subscription&consumerQueueSize=1&consumerName=camel-consumer")
     private Endpoint from;
 
     @EndpointInject("mock:result")
@@ -51,8 +53,12 @@ public class PulsarConsumerNoAcknowledgementIT extends PulsarITSupport {
         return new RouteBuilder() {
             @Override
             public void configure() {
-                // Nothing in the route will ack the message.
-                from(from).to(to);
+                // This route will explicitly negative acknowledge the message.
+                from(from)
+                        .process(exchange -> exchange.getIn()
+                                .getHeader(PulsarMessageHeaders.MESSAGE_RECEIPT, PulsarMessageReceipt.class)
+                                .negativeAcknowledge())
+                        .to(to);
             }
         };
     }
@@ -74,7 +80,16 @@ public class PulsarConsumerNoAcknowledgementIT extends PulsarITSupport {
         PulsarComponent comp = new PulsarComponent(context);
         comp.setAutoConfiguration(autoConfiguration);
         comp.setPulsarClient(pulsarClient);
-        comp.getConfiguration().setAllowManualAcknowledgement(true); // Set to true here instead of the endpoint query parameter.
+        comp.getConfiguration()
+                .setAllowManualAcknowledgement(true); // Set to true here instead of the endpoint query parameter.
+        comp.getConfiguration().setAckTimeoutMillis(60_000L);
+        // Base nack delay is 1s (1_000_000 micros), so redeliveries occur at 1s + 0.01s, 1s + 1s, 1s + 100s, 1s + 100s...
+        comp.getConfiguration().setNegativeAckRedeliveryDelayMicros(1_000_000L);
+        comp.getConfiguration().setNegativeAckRedeliveryBackoff(MultiplierRedeliveryBackoff.builder()
+                .minDelayMs(10L)
+                .maxDelayMs(100_000L)
+                .multiplier(100.0)
+                .build());
         registry.bind("pulsar", comp);
     }
 
@@ -83,8 +98,8 @@ public class PulsarConsumerNoAcknowledgementIT extends PulsarITSupport {
     }
 
     @Test
-    public void testAMessageIsConsumedMultipleTimes() throws Exception {
-        to.expectedMinimumMessageCount(2);
+    public void testAMessageIsConsumedMultipleTimesWithNegativeAckBackoff() throws Exception {
+        to.expectedMessageCount(3);
 
         Producer<String> producer
                 = givenPulsarClient().newProducer(Schema.STRING).producerName(PRODUCER).topic(TOPIC_URI).create();
@@ -92,6 +107,7 @@ public class PulsarConsumerNoAcknowledgementIT extends PulsarITSupport {
         producer.send("Hello World!");
 
         MockEndpoint.assertIsSatisfied(10, TimeUnit.SECONDS, to);
-    }
 
+        producer.close();
+    }
 }
diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/integration/PulsarConsumerNoAcknowledgementIT.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/integration/PulsarConsumerNoAcknowledgementIT.java
index 026e0d2d41e..aed3a199f77 100644
--- a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/integration/PulsarConsumerNoAcknowledgementIT.java
+++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/integration/PulsarConsumerNoAcknowledgementIT.java
@@ -31,6 +31,7 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.ClientBuilderImpl;
+import org.apache.pulsar.client.impl.MultiplierRedeliveryBackoff;
 import org.junit.jupiter.api.Test;
 
 public class PulsarConsumerNoAcknowledgementIT extends PulsarITSupport {
@@ -39,8 +40,7 @@ public class PulsarConsumerNoAcknowledgementIT extends PulsarITSupport {
     private static final String PRODUCER = "camel-producer-1";
 
     @EndpointInject("pulsar:" + TOPIC_URI + "?numberOfConsumers=1&subscriptionType=Exclusive"
-                    + "&subscriptionName=camel-subscription&consumerQueueSize=1&consumerName=camel-consumer"
-                    + "&ackTimeoutMillis=1000")
+                    + "&subscriptionName=camel-subscription&consumerQueueSize=1&consumerName=camel-consumer")
     private Endpoint from;
 
     @EndpointInject("mock:result")
@@ -74,7 +74,15 @@ public class PulsarConsumerNoAcknowledgementIT extends PulsarITSupport {
         PulsarComponent comp = new PulsarComponent(context);
         comp.setAutoConfiguration(autoConfiguration);
         comp.setPulsarClient(pulsarClient);
-        comp.getConfiguration().setAllowManualAcknowledgement(true); // Set to true here instead of the endpoint query parameter.
+        comp.getConfiguration()
+                .setAllowManualAcknowledgement(true); // Set to true here instead of the endpoint query parameter.
+        // Ack timeout is 1s (1_000 millis), so redeliveries occur at 1s + 0.01s, 1s + 1s, 1s + 100s, 1s + 100s...
+        comp.getConfiguration().setAckTimeoutMillis(1_000L);
+        comp.getConfiguration().setAckTimeoutRedeliveryBackoff(MultiplierRedeliveryBackoff.builder()
+                .minDelayMs(10L)
+                .maxDelayMs(100_000L)
+                .multiplier(100.0)
+                .build());
         registry.bind("pulsar", comp);
     }
 
@@ -92,6 +100,21 @@ public class PulsarConsumerNoAcknowledgementIT extends PulsarITSupport {
         producer.send("Hello World!");
 
         MockEndpoint.assertIsSatisfied(10, TimeUnit.SECONDS, to);
+
+        producer.close();
     }
 
+    @Test
+    public void testAMessageIsConsumedMultipleTimesWithAckTimeoutBackoff() throws Exception {
+        to.expectedMessageCount(3);
+
+        Producer<String> producer
+                = givenPulsarClient().newProducer(Schema.STRING).producerName(PRODUCER).topic(TOPIC_URI).create();
+
+        producer.send("Hello World!");
+
+        MockEndpoint.assertIsSatisfied(10, TimeUnit.SECONDS, to);
+
+        producer.close();
+    }
 }