You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2019/08/13 04:23:29 UTC

[camel] 01/02: CAMEL-13841: Add the ability to do manual Pulsar message acknowledgements.

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit fabfa0f22cc7f55182ff9fc43ae9e2d5ddd58eaf
Author: Masa Horiyama <ma...@toasttab.com>
AuthorDate: Fri Aug 9 00:23:36 2019 -0400

    CAMEL-13841: Add the ability to do manual Pulsar message acknowledgements.
---
 .../src/main/docs/pulsar-component.adoc            |   9 +-
 .../pulsar/DefaultPulsarMessageReceipt.java        |  67 +++++++
 ...ava => DefaultPulsarMessageReceiptFactory.java} |  22 +--
 .../camel/component/pulsar/PulsarComponent.java    |  33 ++++
 .../camel/component/pulsar/PulsarEndpoint.java     |   5 +
 .../component/pulsar/PulsarMessageListener.java    |  11 +-
 ...ssageHeaders.java => PulsarMessageReceipt.java} |  31 ++--
 ...aders.java => PulsarMessageReceiptFactory.java} |  19 +-
 .../pulsar/configuration/PulsarConfiguration.java  |  43 +++++
 .../consumers/CommonCreationStrategyImpl.java      |   3 +
 .../pulsar/utils/message/PulsarMessageHeaders.java |   1 +
 .../component/pulsar/PulsarComponentTest.java      |  14 ++
 .../pulsar/PulsarConsumerAcknowledgementTest.java  | 200 +++++++++++++++++++++
 .../PulsarConsumerNoAcknowledgementTest.java       | 103 +++++++++++
 .../pulsar/PulsarCustomMessageReceiptTest.java     | 135 ++++++++++++++
 .../pulsar/PulsarNegativeAcknowledgementTest.java} |  25 +--
 .../endpoint/dsl/PulsarEndpointBuilderFactory.java |  86 +++++++++
 .../springboot/PulsarComponentConfiguration.java   |  32 ++++
 18 files changed, 789 insertions(+), 50 deletions(-)

diff --git a/components/camel-pulsar/src/main/docs/pulsar-component.adoc b/components/camel-pulsar/src/main/docs/pulsar-component.adoc
index e6d9ec5..f157413 100644
--- a/components/camel-pulsar/src/main/docs/pulsar-component.adoc
+++ b/components/camel-pulsar/src/main/docs/pulsar-component.adoc
@@ -28,7 +28,7 @@ pulsar:[persistent|non-persistent]://tenant/namespace/topic
 
 
 // component options: START
-The Apache Pulsar component supports 4 options, which are listed below.
+The Apache Pulsar component supports 6 options, which are listed below.
 
 
 
@@ -37,6 +37,8 @@ The Apache Pulsar component supports 4 options, which are listed below.
 | Name | Description | Default | Type
 | *autoConfiguration* (advanced) | The pulsar auto configuration |  | AutoConfiguration
 | *pulsarClient* (advanced) | The pulsar client |  | PulsarClient
+| *allowManual Acknowledgement* (consumer) | Whether to allow manual message acknowledgements. If this option is enabled, then messages are not immediately acknowledged after being consumed. Instead, an instance of PulsarMessageReceipt is stored as a header on the org.apache.camel.Exchange. Messages can then be acknowledged using PulsarMessageReceipt at any time before the ackTimeout occurs. | false | boolean
+| *pulsarMessageReceipt Factory* (consumer) | Provide a factory to create an alternate implementation of PulsarMessageReceipt. |  | PulsarMessageReceipt Factory
 | *resolveProperty Placeholders* (advanced) | Whether the component should resolve property placeholders on itself when starting. Only properties which are of String type can use property placeholders. | true | boolean
 | *basicPropertyBinding* (advanced) | Whether the component should use basic property binding (Camel 2.x) or the newer property binding with additional capabilities | false | boolean
 |===
@@ -68,12 +70,15 @@ with the following path and query parameters:
 |===
 
 
-=== Query Parameters (13 parameters):
+=== Query Parameters (16 parameters):
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
 |===
 | Name | Description | Default | Type
+| *ackGroupTimeMillis* (consumer) | Group the consumer acknowledgments for the specified time in milliseconds - defaults to 100 | 100 | long
+| *ackTimeoutMillis* (consumer) | Timeout for unacknowledged messages in milliseconds - defaults to 10000 | 10000 | long
+| *allowManualAcknowledgement* (consumer) | Whether to allow manual message acknowledgements. If this option is enabled, then messages are not immediately acknowledged after being consumed. Instead, an instance of PulsarMessageReceipt is stored as a header on the org.apache.camel.Exchange. Messages can then be acknowledged using PulsarMessageReceipt at any time before the ackTimeout occurs. | false | boolean
 | *bridgeErrorHandler* (consumer) | Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be logged at WARN or ERROR level and ignored. | false | boolean
 | *consumerName* (consumer) | Name of the consumer when subscription is EXCLUSIVE | sole-consumer | String
 | *consumerNamePrefix* (consumer) | Prefix to add to consumer names when a SHARED or FAILOVER subscription is used | cons | String
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/DefaultPulsarMessageReceipt.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/DefaultPulsarMessageReceipt.java
new file mode 100644
index 0000000..f1322fc
--- /dev/null
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/DefaultPulsarMessageReceipt.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.pulsar;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+public class DefaultPulsarMessageReceipt implements PulsarMessageReceipt {
+
+    private final Consumer consumer;
+
+    private final MessageId messageId;
+
+    public DefaultPulsarMessageReceipt(Consumer consumer, MessageId messageId) {
+        this.consumer = consumer;
+        this.messageId = messageId;
+    }
+
+    @Override
+    public void acknowledge() throws PulsarClientException {
+        consumer.acknowledge(messageId);
+    }
+
+    @Override
+    public void acknowledgeCumulative() throws PulsarClientException {
+        consumer.acknowledgeCumulative(messageId);
+    }
+
+    @Override
+    public CompletableFuture<Void> acknowledgeAsync() {
+        return consumer.acknowledgeAsync(messageId);
+    }
+
+    @Override
+    public CompletableFuture<Void> acknowledgeCumulativeAsync() {
+        return consumer.acknowledgeCumulativeAsync(messageId);
+    }
+
+    @Override
+    public void negativeAcknowledge() {
+        throw new UnsupportedOperationException("Negative acknowledge is not supported in this version of the Pulsar client.");
+    }
+
+    public Consumer getConsumer() {
+        return consumer;
+    }
+
+    public MessageId getMessageId() {
+        return messageId;
+    }
+}
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/message/PulsarMessageHeaders.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/DefaultPulsarMessageReceiptFactory.java
similarity index 63%
copy from components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/message/PulsarMessageHeaders.java
copy to components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/DefaultPulsarMessageReceiptFactory.java
index f8b3267..96e0d39 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/message/PulsarMessageHeaders.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/DefaultPulsarMessageReceiptFactory.java
@@ -14,17 +14,17 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.camel.component.pulsar.utils.message;
+package org.apache.camel.component.pulsar;
 
-public interface PulsarMessageHeaders {
+import org.apache.camel.Exchange;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+
+public class DefaultPulsarMessageReceiptFactory implements PulsarMessageReceiptFactory {
+
+    @Override
+    public PulsarMessageReceipt newInstance(Exchange exchange, Message message, Consumer consumer) {
+        return new DefaultPulsarMessageReceipt(consumer, message.getMessageId());
+    }
 
-    String PROPERTIES = "properties";
-    String PRODUCER_NAME = "producer_name";
-    String SEQUENCE_ID = "sequence_id";
-    String PUBLISH_TIME = "publish_time";
-    String MESSAGE_ID = "message_id";
-    String EVENT_TIME = "event_time";
-    String KEY = "key";
-    String KEY_BYTES = "key_bytes";
-    String TOPIC_NAME = "topic_name";
 }
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarComponent.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarComponent.java
index 67f34ff..b43ab1d 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarComponent.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarComponent.java
@@ -35,6 +35,10 @@ public class PulsarComponent extends DefaultComponent {
     private AutoConfiguration autoConfiguration;
     @Metadata(label = "advanced")
     private PulsarClient pulsarClient;
+    @Metadata(label = "consumer", defaultValue = "false")
+    private boolean allowManualAcknowledgement;
+    @Metadata(label = "consumer,advanced")
+    private PulsarMessageReceiptFactory pulsarMessageReceiptFactory = new DefaultPulsarMessageReceiptFactory();
 
     public PulsarComponent() {
     }
@@ -46,6 +50,9 @@ public class PulsarComponent extends DefaultComponent {
     @Override
     protected Endpoint createEndpoint(final String uri, final String path, final Map<String, Object> parameters) throws Exception {
         final PulsarConfiguration configuration = new PulsarConfiguration();
+
+        configuration.setAllowManualAcknowledgement(isAllowManualAcknowledgement());
+
         setProperties(configuration, parameters);
         if (autoConfiguration != null) {
             setProperties(autoConfiguration, parameters);
@@ -93,4 +100,30 @@ public class PulsarComponent extends DefaultComponent {
     public void setPulsarClient(PulsarClient pulsarClient) {
         this.pulsarClient = pulsarClient;
     }
+
+    public boolean isAllowManualAcknowledgement() {
+        return allowManualAcknowledgement;
+    }
+
+    /**
+     * Whether to allow manual message acknowledgements.
+     * <p/>
+     * If this option is enabled, then messages are not immediately acknowledged after being consumed.
+     * Instead, an instance of {@link PulsarMessageReceipt} is stored as a header on the {@link org.apache.camel.Exchange}.
+     * Messages can then be acknowledged using {@link PulsarMessageReceipt} at any time before the ackTimeout occurs.
+     */
+    public void setAllowManualAcknowledgement(boolean allowManualAcknowledgement) {
+        this.allowManualAcknowledgement = allowManualAcknowledgement;
+    }
+
+    public PulsarMessageReceiptFactory getPulsarMessageReceiptFactory() {
+        return pulsarMessageReceiptFactory;
+    }
+
+    /**
+     * Provide a factory to create an alternate implementation of {@link PulsarMessageReceipt}.
+     */
+    public void setPulsarMessageReceiptFactory(PulsarMessageReceiptFactory pulsarMessageReceiptFactory) {
+        this.pulsarMessageReceiptFactory = pulsarMessageReceiptFactory;
+    }
 }
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarEndpoint.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarEndpoint.java
index 917bcf8..0330677 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarEndpoint.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarEndpoint.java
@@ -137,4 +137,9 @@ public class PulsarEndpoint extends DefaultEndpoint {
 
         uri = persistence + "://" + tenant + "/" + namespace + "/" + topic;
     }
+
+    @Override
+    public PulsarComponent getComponent() {
+        return (PulsarComponent) super.getComponent();
+    }
 }
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageListener.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageListener.java
index 9e0398f..55249fc 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageListener.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageListener.java
@@ -18,6 +18,7 @@ package org.apache.camel.component.pulsar;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
+import org.apache.camel.component.pulsar.utils.message.PulsarMessageHeaders;
 import org.apache.camel.component.pulsar.utils.message.PulsarMessageUtils;
 import org.apache.camel.spi.ExceptionHandler;
 import org.apache.pulsar.client.api.Consumer;
@@ -45,8 +46,14 @@ public class PulsarMessageListener implements MessageListener<byte[]> {
         final Exchange exchange = PulsarMessageUtils.updateExchange(message, endpoint.createExchange());
 
         try {
-            processor.process(exchange);
-            consumer.acknowledge(message.getMessageId());
+            if (endpoint.getPulsarConfiguration().isAllowManualAcknowledgement()) {
+                exchange.getIn().setHeader(PulsarMessageHeaders.MESSAGE_RECEIPT,
+                        endpoint.getComponent().getPulsarMessageReceiptFactory().newInstance(exchange, message, consumer));
+                processor.process(exchange);
+            } else {
+                processor.process(exchange);
+                consumer.acknowledge(message.getMessageId());
+            }
         } catch (Exception exception) {
             handleProcessorException(exchange, exception);
         }
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/message/PulsarMessageHeaders.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageReceipt.java
similarity index 64%
copy from components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/message/PulsarMessageHeaders.java
copy to components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageReceipt.java
index f8b3267..8f389ca 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/message/PulsarMessageHeaders.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageReceipt.java
@@ -14,17 +14,22 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.camel.component.pulsar.utils.message;
-
-public interface PulsarMessageHeaders {
-
-    String PROPERTIES = "properties";
-    String PRODUCER_NAME = "producer_name";
-    String SEQUENCE_ID = "sequence_id";
-    String PUBLISH_TIME = "publish_time";
-    String MESSAGE_ID = "message_id";
-    String EVENT_TIME = "event_time";
-    String KEY = "key";
-    String KEY_BYTES = "key_bytes";
-    String TOPIC_NAME = "topic_name";
+package org.apache.camel.component.pulsar;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+public interface PulsarMessageReceipt {
+
+    void acknowledge() throws PulsarClientException;
+
+    void acknowledgeCumulative() throws PulsarClientException;
+
+    CompletableFuture<Void> acknowledgeAsync();
+
+    CompletableFuture<Void> acknowledgeCumulativeAsync();
+
+    void negativeAcknowledge();
+
 }
+
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/message/PulsarMessageHeaders.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageReceiptFactory.java
similarity index 64%
copy from components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/message/PulsarMessageHeaders.java
copy to components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageReceiptFactory.java
index f8b3267..dcaed59 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/message/PulsarMessageHeaders.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageReceiptFactory.java
@@ -14,17 +14,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.camel.component.pulsar.utils.message;
+package org.apache.camel.component.pulsar;
 
-public interface PulsarMessageHeaders {
+import org.apache.camel.Exchange;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+
+public interface PulsarMessageReceiptFactory {
+
+    PulsarMessageReceipt newInstance(Exchange exchange, Message message, Consumer consumer);
 
-    String PROPERTIES = "properties";
-    String PRODUCER_NAME = "producer_name";
-    String SEQUENCE_ID = "sequence_id";
-    String PUBLISH_TIME = "publish_time";
-    String MESSAGE_ID = "message_id";
-    String EVENT_TIME = "event_time";
-    String KEY = "key";
-    String KEY_BYTES = "key_bytes";
-    String TOPIC_NAME = "topic_name";
 }
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java
index 8834164..6b160fd 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java
@@ -39,6 +39,12 @@ public class PulsarConfiguration {
     private String producerName = "default-producer";
     @UriParam(label = "consumer", defaultValue = "cons")
     private String consumerNamePrefix = "cons";
+    @UriParam(label = "consumer", defaultValue = "false")
+    private boolean allowManualAcknowledgement;
+    @UriParam(label = "consumer", defaultValue = "10000")
+    private long ackTimeoutMillis = 10000;
+    @UriParam(label = "consumer", defaultValue = "100")
+    private long ackGroupTimeMillis = 100;
 
     public String getSubscriptionName() {
         return subscriptionName;
@@ -116,4 +122,41 @@ public class PulsarConfiguration {
     public void setConsumerNamePrefix(String consumerNamePrefix) {
         this.consumerNamePrefix = consumerNamePrefix;
     }
+
+    public boolean isAllowManualAcknowledgement() {
+        return allowManualAcknowledgement;
+    }
+
+    /**
+     * Whether to allow manual message acknowledgements.
+     * <p/>
+     * If this option is enabled, then messages are not immediately acknowledged after being consumed.
+     * Instead, an instance of {@link PulsarMessageReceipt} is stored as a header on the {@link org.apache.camel.Exchange}.
+     * Messages can then be acknowledged using {@link PulsarMessageReceipt} at any time before the ackTimeout occurs.
+     */
+    public void setAllowManualAcknowledgement(boolean allowManualAcknowledgement) {
+        this.allowManualAcknowledgement = allowManualAcknowledgement;
+    }
+
+    public long getAckTimeoutMillis() {
+        return ackTimeoutMillis;
+    }
+
+    /**
+     * Timeout for unacknowledged messages in milliseconds - defaults to 10000
+     */
+    public void setAckTimeoutMillis(long ackTimeoutMillis) {
+        this.ackTimeoutMillis = ackTimeoutMillis;
+    }
+
+    public long getAckGroupTimeMillis() {
+        return ackGroupTimeMillis;
+    }
+
+    /**
+     * Group the consumer acknowledgments for the specified time in milliseconds - defaults to 100
+     */
+    public void setAckGroupTimeMillis(long ackGroupTimeMillis) {
+        this.ackGroupTimeMillis = ackGroupTimeMillis;
+    }
 }
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/CommonCreationStrategyImpl.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/CommonCreationStrategyImpl.java
index 740ee76..a96e369 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/CommonCreationStrategyImpl.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/CommonCreationStrategyImpl.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.pulsar.utils.consumers;
 
+import java.util.concurrent.TimeUnit;
 import org.apache.camel.component.pulsar.PulsarConsumer;
 import org.apache.camel.component.pulsar.PulsarEndpoint;
 import org.apache.camel.component.pulsar.PulsarMessageListener;
@@ -32,6 +33,8 @@ public final class CommonCreationStrategyImpl {
 
         return pulsarEndpoint.getPulsarClient().newConsumer().topic(pulsarEndpoint.getUri()).subscriptionName(endpointConfiguration.getSubscriptionName())
             .receiverQueueSize(endpointConfiguration.getConsumerQueueSize()).consumerName(name)
+            .ackTimeout(endpointConfiguration.getAckTimeoutMillis(), TimeUnit.MILLISECONDS)
+            .acknowledgmentGroupTime(endpointConfiguration.getAckGroupTimeMillis(), TimeUnit.MILLISECONDS)
             .messageListener(new PulsarMessageListener(pulsarEndpoint, pulsarConsumer.getExceptionHandler(), pulsarConsumer.getProcessor()));
     }
 }
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/message/PulsarMessageHeaders.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/message/PulsarMessageHeaders.java
index f8b3267..a983564 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/message/PulsarMessageHeaders.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/message/PulsarMessageHeaders.java
@@ -27,4 +27,5 @@ public interface PulsarMessageHeaders {
     String KEY = "key";
     String KEY_BYTES = "key_bytes";
     String TOPIC_NAME = "topic_name";
+    String MESSAGE_RECEIPT = "message_receipt";
 }
diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarComponentTest.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarComponentTest.java
index 9d46cec..258df10 100644
--- a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarComponentTest.java
+++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarComponentTest.java
@@ -63,6 +63,7 @@ public class PulsarComponentTest extends CamelTestSupport {
         assertEquals("default-producer", endpoint.getPulsarConfiguration().getProducerName());
         assertEquals("subs", endpoint.getPulsarConfiguration().getSubscriptionName());
         assertEquals(SubscriptionType.EXCLUSIVE, endpoint.getPulsarConfiguration().getSubscriptionType());
+        assertFalse(endpoint.getPulsarConfiguration().isAllowManualAcknowledgement());
     }
 
     @Test
@@ -75,4 +76,17 @@ public class PulsarComponentTest extends CamelTestSupport {
 
         verify(autoConfiguration).ensureNameSpaceAndTenant(Matchers.anyString());
     }
+
+    @Test
+    public void testPulsarEndpointAllowManualAcknowledgementDefaultTrue() throws Exception {
+        PulsarComponent component = new PulsarComponent(context);
+        component.setAllowManualAcknowledgement(true);
+
+        // allowManualAcknowledgement is absent as a query parameter.
+        PulsarEndpoint endpoint = (PulsarEndpoint)component.createEndpoint("pulsar://persistent/test/foobar/BatchCreated");
+
+        assertNotNull(endpoint);
+        assertTrue(endpoint.getPulsarConfiguration().isAllowManualAcknowledgement());
+    }
+
 }
diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerAcknowledgementTest.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerAcknowledgementTest.java
new file mode 100644
index 0000000..5f45684
--- /dev/null
+++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerAcknowledgementTest.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.pulsar;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.camel.Endpoint;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.pulsar.utils.AutoConfiguration;
+import org.apache.camel.component.pulsar.utils.message.PulsarMessageHeaders;
+import org.apache.camel.impl.JndiRegistry;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.ClientBuilderImpl;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PulsarConsumerAcknowledgementTest extends PulsarTestSupport {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(PulsarConsumerAcknowledgementTest.class);
+
+    private static final String TOPIC_URI = "persistent://public/default/camel-topic";
+    private static final String PRODUCER = "camel-producer-1";
+
+    @EndpointInject(uri = "pulsar:" + TOPIC_URI
+            + "?numberOfConsumers=1&subscriptionType=Exclusive"
+            + "&subscriptionName=camel-subscription&consumerQueueSize=1&consumerName=camel-consumer"
+            + "&allowManualAcknowledgement=true"
+            + "&ackTimeoutMillis=1000"
+    )
+    private Endpoint from;
+
+    @EndpointInject(uri = "mock:result")
+    private MockEndpoint to;
+
+    Producer<String> producer;
+
+    @Before
+    public void setup() throws Exception {
+        context.removeRoute("myRoute");
+        producer = givenPulsarClient().newProducer(Schema.STRING)
+                .producerName(PRODUCER)
+                .topic(TOPIC_URI)
+                .create();
+    }
+
+    @Override
+    protected JndiRegistry createRegistry() throws Exception {
+        JndiRegistry jndi = super.createRegistry();
+
+        registerPulsarBeans(jndi);
+
+        return jndi;
+    }
+
+    private void registerPulsarBeans(final JndiRegistry jndi) throws PulsarClientException {
+        PulsarClient pulsarClient = givenPulsarClient();
+        AutoConfiguration autoConfiguration = new AutoConfiguration(null, null);
+
+        jndi.bind("pulsarClient", pulsarClient);
+        PulsarComponent comp = new PulsarComponent(context);
+        comp.setAutoConfiguration(autoConfiguration);
+        comp.setPulsarClient(pulsarClient);
+        jndi.bind("pulsar", comp);
+    }
+
+    private PulsarClient givenPulsarClient() throws PulsarClientException {
+        return new ClientBuilderImpl()
+                .serviceUrl(getPulsarBrokerUrl())
+                .ioThreads(1)
+                .listenerThreads(1)
+                .build();
+    }
+
+    @Test
+    public void testAcknowledge() throws Exception {
+        to.expectsNoDuplicates(body());
+
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() {
+                from(from).routeId("myRoute").to(to).process(exchange -> {
+                    LOGGER.info("Processing message {}", exchange.getIn().getBody());
+
+                    PulsarMessageReceipt receipt = (PulsarMessageReceipt) exchange.getIn()
+                            .getHeader(PulsarMessageHeaders.MESSAGE_RECEIPT);
+                    receipt.acknowledge();
+                });
+            }
+        });
+
+        producer.send("Hello World!");
+
+        MockEndpoint.assertIsSatisfied(10, TimeUnit.SECONDS, to);
+    }
+
+    @Test
+    public void testAcknowledgeAsync() throws Exception {
+        to.expectsNoDuplicates(body());
+
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() {
+                from(from).routeId("myRoute").to(to).process(exchange -> {
+                    LOGGER.info("Processing message {}", exchange.getIn().getBody());
+
+                    PulsarMessageReceipt receipt = (PulsarMessageReceipt) exchange.getIn()
+                            .getHeader(PulsarMessageHeaders.MESSAGE_RECEIPT);
+                    try {
+                        CompletableFuture<Void> f = receipt.acknowledgeAsync();
+                        f.get();
+                    } catch (Exception e) {
+                        LOGGER.error(e.getMessage());
+                    }
+                });
+            }
+        });
+
+        producer.send("Hello World!");
+
+        MockEndpoint.assertIsSatisfied(10, TimeUnit.SECONDS, to);
+    }
+
+    @Test
+    public void testAcknowledgeCumulative() throws Exception {
+        to.expectsNoDuplicates(body());
+
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() {
+                from(from).routeId("myRoute").to(to).process(exchange -> {
+                    LOGGER.info("Processing message {}", exchange.getIn().getBody());
+
+                    PulsarMessageReceipt receipt = (PulsarMessageReceipt) exchange.getIn()
+                            .getHeader(PulsarMessageHeaders.MESSAGE_RECEIPT);
+                    // Ack the second message. The first will also be acked.
+                    if (exchange.getIn().getBody().equals("Hello World Again!")) {
+                        receipt.acknowledgeCumulative();
+                    }
+                });
+            }
+        });
+
+        producer.send("Hello World!");
+        producer.send("Hello World Again!");
+
+        MockEndpoint.assertIsSatisfied(10, TimeUnit.SECONDS, to);
+    }
+
+    @Test
+    public void testAcknowledgeCumulativeAsync() throws Exception {
+        to.expectsNoDuplicates(body());
+
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() {
+                from(from).routeId("myRoute").to(to).process(exchange -> {
+                    LOGGER.info("Processing message {}", exchange.getIn().getBody());
+
+                    PulsarMessageReceipt receipt = (PulsarMessageReceipt) exchange.getIn()
+                            .getHeader(PulsarMessageHeaders.MESSAGE_RECEIPT);
+                    // Ack the second message. The first will also be acked.
+                    if (exchange.getIn().getBody().equals("Hello World Again!")) {
+                        try {
+                            CompletableFuture<Void> f = receipt.acknowledgeCumulativeAsync();
+                            f.get();
+                        } catch (Exception e) {
+                            LOGGER.error(e.getMessage());
+                        }
+                    }
+                });
+            }
+        });
+
+        producer.send("Hello World!");
+        producer.send("Hello World Again!");
+
+        MockEndpoint.assertIsSatisfied(10, TimeUnit.SECONDS, to);
+    }
+}
diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerNoAcknowledgementTest.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerNoAcknowledgementTest.java
new file mode 100644
index 0000000..6f68fc1
--- /dev/null
+++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerNoAcknowledgementTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.pulsar;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.pulsar.utils.AutoConfiguration;
+import org.apache.camel.impl.JndiRegistry;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.ClientBuilderImpl;
+import org.junit.Test;
+
+public class PulsarConsumerNoAcknowledgementTest extends PulsarTestSupport {
+
+    private static final String TOPIC_URI = "persistent://public/default/camel-topic";
+    private static final String PRODUCER = "camel-producer-1";
+
+    @EndpointInject(uri = "pulsar:" + TOPIC_URI
+            + "?numberOfConsumers=1&subscriptionType=Exclusive"
+            + "&subscriptionName=camel-subscription&consumerQueueSize=1&consumerName=camel-consumer"
+            + "&ackTimeoutMillis=1000"
+    )
+    private Endpoint from;
+
+    @EndpointInject(uri = "mock:result")
+    private MockEndpoint to;
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                // Nothing in the route will ack the message.
+                from(from).to(to);
+            }
+        };
+    }
+
+    @Override
+    protected JndiRegistry createRegistry() throws Exception {
+        JndiRegistry jndi = super.createRegistry();
+
+        registerPulsarBeans(jndi);
+
+        return jndi;
+    }
+
+    private void registerPulsarBeans(final JndiRegistry jndi) throws PulsarClientException {
+        PulsarClient pulsarClient = givenPulsarClient();
+        AutoConfiguration autoConfiguration = new AutoConfiguration(null, null);
+
+        jndi.bind("pulsarClient", pulsarClient);
+        PulsarComponent comp = new PulsarComponent(context);
+        comp.setAutoConfiguration(autoConfiguration);
+        comp.setPulsarClient(pulsarClient);
+        comp.setAllowManualAcknowledgement(true); // Set to true here instead of the endpoint query parameter.
+        jndi.bind("pulsar", comp);
+    }
+
+    private PulsarClient givenPulsarClient() throws PulsarClientException {
+        return new ClientBuilderImpl()
+                .serviceUrl(getPulsarBrokerUrl())
+                .ioThreads(1)
+                .listenerThreads(1)
+                .build();
+    }
+
+    @Test
+    public void testAMessageIsConsumedMultipleTimes() throws Exception {
+        to.expectedMinimumMessageCount(2);
+
+        Producer<String> producer = givenPulsarClient().newProducer(Schema.STRING)
+                .producerName(PRODUCER)
+                .topic(TOPIC_URI)
+                .create();
+
+        producer.send("Hello World!");
+
+        MockEndpoint.assertIsSatisfied(10, TimeUnit.SECONDS, to);
+    }
+
+}
diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarCustomMessageReceiptTest.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarCustomMessageReceiptTest.java
new file mode 100644
index 0000000..e579b59
--- /dev/null
+++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarCustomMessageReceiptTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.pulsar;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.camel.Endpoint;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.pulsar.utils.AutoConfiguration;
+import org.apache.camel.component.pulsar.utils.message.PulsarMessageHeaders;
+import org.apache.camel.impl.JndiRegistry;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.ClientBuilderImpl;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PulsarCustomMessageReceiptTest extends PulsarTestSupport {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(PulsarCustomMessageReceiptTest.class);
+
+    private static final String TOPIC_URI = "persistent://public/default/camel-topic";
+    private static final String PRODUCER = "camel-producer-1";
+
+    public PulsarMessageReceiptFactory mockPulsarMessageReceiptFactory = mock(PulsarMessageReceiptFactory.class);
+
+    public PulsarMessageReceipt mockPulsarMessageReceipt = mock(PulsarMessageReceipt.class);
+
+    @EndpointInject(uri = "pulsar:" + TOPIC_URI
+            + "?numberOfConsumers=1&subscriptionType=Exclusive"
+            + "&subscriptionName=camel-subscription&consumerQueueSize=1&consumerName=camel-consumer"
+            + "&allowManualAcknowledgement=true"
+            + "&ackTimeoutMillis=1000"
+    )
+    private Endpoint from;
+
+    @EndpointInject(uri = "mock:result")
+    private MockEndpoint to;
+
+    Producer<String> producer;
+
+    @Before
+    public void setup() throws Exception {
+        producer = givenPulsarClient().newProducer(Schema.STRING)
+                .producerName(PRODUCER)
+                .topic(TOPIC_URI)
+                .create();
+    }
+
+    @Override
+    protected JndiRegistry createRegistry() throws Exception {
+        JndiRegistry jndi = super.createRegistry();
+
+        registerPulsarBeans(jndi);
+
+        return jndi;
+    }
+
+    private void registerPulsarBeans(final JndiRegistry jndi) throws PulsarClientException {
+        PulsarClient pulsarClient = givenPulsarClient();
+        AutoConfiguration autoConfiguration = new AutoConfiguration(null, null);
+
+        jndi.bind("pulsarClient", pulsarClient);
+        PulsarComponent comp = new PulsarComponent(context);
+        comp.setAutoConfiguration(autoConfiguration);
+        comp.setPulsarClient(pulsarClient);
+        // Test adding a custom PulsarMessageReceiptFactory
+        comp.setPulsarMessageReceiptFactory(mockPulsarMessageReceiptFactory);
+        jndi.bind("pulsar", comp);
+    }
+
+    private PulsarClient givenPulsarClient() throws PulsarClientException {
+        return new ClientBuilderImpl()
+                .serviceUrl(getPulsarBrokerUrl())
+                .ioThreads(1)
+                .listenerThreads(1)
+                .build();
+    }
+
+    @Test
+    public void testAcknowledgeWithCustomMessageReceipt() throws Exception {
+        to.expectedMinimumMessageCount(2);
+
+        when(mockPulsarMessageReceiptFactory.newInstance(any(), any(), any())).thenReturn(mockPulsarMessageReceipt);
+
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() {
+                from(from).routeId("myRoute").to(to).process(exchange -> {
+                    LOGGER.info("Processing message {}", exchange.getIn().getBody());
+
+                    PulsarMessageReceipt receipt = (PulsarMessageReceipt) exchange.getIn()
+                            .getHeader(PulsarMessageHeaders.MESSAGE_RECEIPT);
+                    receipt.acknowledge();
+                });
+            }
+        });
+
+        producer.send("Hello World!");
+
+        MockEndpoint.assertIsSatisfied(10, TimeUnit.SECONDS, to);
+
+        // The mock does not actually acknowledge using the pulsar consumer, so the message will be re-consumed
+        // after the ackTimeout.
+        verify(mockPulsarMessageReceipt, atLeast(2)).acknowledge();
+        verifyNoMoreInteractions(mockPulsarMessageReceipt);
+    }
+    
+}
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/message/PulsarMessageHeaders.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarNegativeAcknowledgementTest.java
similarity index 60%
copy from components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/message/PulsarMessageHeaders.java
copy to components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarNegativeAcknowledgementTest.java
index f8b3267..ebee09e 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/message/PulsarMessageHeaders.java
+++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarNegativeAcknowledgementTest.java
@@ -14,17 +14,20 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.camel.component.pulsar.utils.message;
+package org.apache.camel.component.pulsar;
 
-public interface PulsarMessageHeaders {
+import static org.mockito.Mockito.mock;
+
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageId;
+import org.junit.Test;
+
+public class PulsarNegativeAcknowledgementTest {
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testNegativeAcknowledgement() {
+        PulsarMessageReceipt receipt = new DefaultPulsarMessageReceipt(mock(Consumer.class), mock(MessageId.class));
+        receipt.negativeAcknowledge();
+    }
 
-    String PROPERTIES = "properties";
-    String PRODUCER_NAME = "producer_name";
-    String SEQUENCE_ID = "sequence_id";
-    String PUBLISH_TIME = "publish_time";
-    String MESSAGE_ID = "message_id";
-    String EVENT_TIME = "event_time";
-    String KEY = "key";
-    String KEY_BYTES = "key_bytes";
-    String TOPIC_NAME = "topic_name";
 }
diff --git a/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/PulsarEndpointBuilderFactory.java b/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/PulsarEndpointBuilderFactory.java
index 8368bce..82f0232 100644
--- a/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/PulsarEndpointBuilderFactory.java
+++ b/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/PulsarEndpointBuilderFactory.java
@@ -42,6 +42,92 @@ public interface PulsarEndpointBuilderFactory {
             return (AdvancedPulsarEndpointConsumerBuilder) this;
         }
         /**
+         * Group the consumer acknowledgments for the specified time in
+         * milliseconds - defaults to 100.
+         * 
+         * The option is a: <code>long</code> type.
+         * 
+         * Group: consumer
+         */
+        default PulsarEndpointConsumerBuilder ackGroupTimeMillis(
+                long ackGroupTimeMillis) {
+            setProperty("ackGroupTimeMillis", ackGroupTimeMillis);
+            return this;
+        }
+        /**
+         * Group the consumer acknowledgments for the specified time in
+         * milliseconds - defaults to 100.
+         * 
+         * The option will be converted to a <code>long</code> type.
+         * 
+         * Group: consumer
+         */
+        default PulsarEndpointConsumerBuilder ackGroupTimeMillis(
+                String ackGroupTimeMillis) {
+            setProperty("ackGroupTimeMillis", ackGroupTimeMillis);
+            return this;
+        }
+        /**
+         * Timeout for unacknowledged messages in milliseconds - defaults to
+         * 10000.
+         * 
+         * The option is a: <code>long</code> type.
+         * 
+         * Group: consumer
+         */
+        default PulsarEndpointConsumerBuilder ackTimeoutMillis(
+                long ackTimeoutMillis) {
+            setProperty("ackTimeoutMillis", ackTimeoutMillis);
+            return this;
+        }
+        /**
+         * Timeout for unacknowledged messages in milliseconds - defaults to
+         * 10000.
+         * 
+         * The option will be converted to a <code>long</code> type.
+         * 
+         * Group: consumer
+         */
+        default PulsarEndpointConsumerBuilder ackTimeoutMillis(
+                String ackTimeoutMillis) {
+            setProperty("ackTimeoutMillis", ackTimeoutMillis);
+            return this;
+        }
+        /**
+         * Whether to allow manual message acknowledgements. If this option is
+         * enabled, then messages are not immediately acknowledged after being
+         * consumed. Instead, an instance of PulsarMessageReceipt is stored as a
+         * header on the org.apache.camel.Exchange. Messages can then be
+         * acknowledged using PulsarMessageReceipt at any time before the
+         * ackTimeout occurs.
+         * 
+         * The option is a: <code>boolean</code> type.
+         * 
+         * Group: consumer
+         */
+        default PulsarEndpointConsumerBuilder allowManualAcknowledgement(
+                boolean allowManualAcknowledgement) {
+            setProperty("allowManualAcknowledgement", allowManualAcknowledgement);
+            return this;
+        }
+        /**
+         * Whether to allow manual message acknowledgements. If this option is
+         * enabled, then messages are not immediately acknowledged after being
+         * consumed. Instead, an instance of PulsarMessageReceipt is stored as a
+         * header on the org.apache.camel.Exchange. Messages can then be
+         * acknowledged using PulsarMessageReceipt at any time before the
+         * ackTimeout occurs.
+         * 
+         * The option will be converted to a <code>boolean</code> type.
+         * 
+         * Group: consumer
+         */
+        default PulsarEndpointConsumerBuilder allowManualAcknowledgement(
+                String allowManualAcknowledgement) {
+            setProperty("allowManualAcknowledgement", allowManualAcknowledgement);
+            return this;
+        }
+        /**
          * Allows for bridging the consumer to the Camel routing Error Handler,
          * which mean any exceptions occurred while the consumer is trying to
          * pickup incoming messages, or the likes, will now be processed as a
diff --git a/platforms/spring-boot/components-starter/camel-pulsar-starter/src/main/java/org/apache/camel/component/pulsar/springboot/PulsarComponentConfiguration.java b/platforms/spring-boot/components-starter/camel-pulsar-starter/src/main/java/org/apache/camel/component/pulsar/springboot/PulsarComponentConfiguration.java
index c36f30a..c34dc8d 100644
--- a/platforms/spring-boot/components-starter/camel-pulsar-starter/src/main/java/org/apache/camel/component/pulsar/springboot/PulsarComponentConfiguration.java
+++ b/platforms/spring-boot/components-starter/camel-pulsar-starter/src/main/java/org/apache/camel/component/pulsar/springboot/PulsarComponentConfiguration.java
@@ -47,6 +47,21 @@ public class PulsarComponentConfiguration
      */
     private String pulsarClient;
     /**
+     * Whether to allow manual message acknowledgements. If this option is
+     * enabled, then messages are not immediately acknowledged after being
+     * consumed. Instead, an instance of PulsarMessageReceipt is stored as a
+     * header on the org.apache.camel.Exchange. Messages can then be
+     * acknowledged using PulsarMessageReceipt at any time before the ackTimeout
+     * occurs.
+     */
+    private Boolean allowManualAcknowledgement = false;
+    /**
+     * Provide a factory to create an alternate implementation of
+     * PulsarMessageReceipt. The option is a
+     * org.apache.camel.component.pulsar.PulsarMessageReceiptFactory type.
+     */
+    private String pulsarMessageReceiptFactory;
+    /**
      * Whether the component should resolve property placeholders on itself when
      * starting. Only properties which are of String type can use property
      * placeholders.
@@ -74,6 +89,23 @@ public class PulsarComponentConfiguration
         this.pulsarClient = pulsarClient;
     }
 
+    public Boolean getAllowManualAcknowledgement() {
+        return allowManualAcknowledgement;
+    }
+
+    public void setAllowManualAcknowledgement(Boolean allowManualAcknowledgement) {
+        this.allowManualAcknowledgement = allowManualAcknowledgement;
+    }
+
+    public String getPulsarMessageReceiptFactory() {
+        return pulsarMessageReceiptFactory;
+    }
+
+    public void setPulsarMessageReceiptFactory(
+            String pulsarMessageReceiptFactory) {
+        this.pulsarMessageReceiptFactory = pulsarMessageReceiptFactory;
+    }
+
     public Boolean getResolvePropertyPlaceholders() {
         return resolvePropertyPlaceholders;
     }